Pawel Bartoszek commented on BEAM-3726:

[~iemejia]  I have looked into PubSubIO implementation and can see that it's 
far more advanced when it comes to getWarermark method in PubSubUnboundedSource 
class.Maybe comments would shed some light?

public Instant getWatermark() {
if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) {
// For testing only: Advance the watermark to the end of time to signal
// the test is complete.
return BoundedWindow.TIMESTAMP_MAX_VALUE;

// NOTE: We'll allow the watermark to go backwards. The underlying runner is 
// for aggregating all reported watermarks and ensuring the aggregate is 
// If we attempt to latch locally then it is possible a temporary starvation of 
one reader
// could cause its estimated watermark to fast forward to current system time. 
Then when
// the reader resumes its watermark would be unable to resume tracking.
// By letting the underlying runner latch we avoid any problems due to 
localized starvation.
long nowMsSinceEpoch = now();
long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
long unreadMin = minUnreadTimestampMsSinceEpoch.get();
if (readMin == Long.MAX_VALUE
&& unreadMin == Long.MAX_VALUE
&& lastReceivedMsSinceEpoch >= 0
&& nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
// We don't currently have any unread messages pending, we have not had any 
// read for a while, and we have not received any new messages from Pubsub for 
a while.
// Advance watermark to current time.
// TODO: Estimate a timestamp lag.
lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
} else if (minReadTimestampMsSinceEpoch.isSignificant()
|| minUnreadTimestampMsSinceEpoch.isSignificant()) {
// Take minimum of the timestamps in all unread messages and recently read 
lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
// else: We're not confident enough to estimate a new watermark. Stick with the 
old one.
minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
return new Instant(lastWatermarkMsSinceEpoch);
 * Return {@literal true} if a Pubsub messaage is available, {@literal false} if
 * none is available at this time or we are over-subscribed. May BLOCK while 
 * ACKs or fetching available messages. Will not block waiting for messages.
public boolean advance() throws IOException {
  // Emit stats.

  if (current != null) {
    // Current is consumed. It can no longer contribute to holding back the 
    current = null;

  // Retire state associated with ACKed messages.

  // Extend all pressing deadlines.
  // Will BLOCK until done.
  // If the system is pulling messages only to let them sit in a downsteam 
queue then
  // this will have the effect of slowing down the pull rate.
  // However, if the system is genuinely taking longer to process each message 
  // the work to extend ACKs would be better done in the background.

  if (notYetRead.isEmpty()) {
    // Pull another batch.
    // Will BLOCK until fetch returns, but will not block until a message is 

  // Take one message from queue.
  current = notYetRead.poll();
  if (current == null) {
    // Try again later.
    return false;
  notYetReadBytes -= current.elementBytes.length;
  checkState(notYetReadBytes >= 0);
  long nowMsSinceEpoch = now();
  numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
  if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
    numLateMessages.add(nowMsSinceEpoch, 1L);

  // Current message can be considered 'read' and will be persisted by the next
  // checkpoint. So it is now safe to ACK back to Pubsub.
  return true;

> Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move 
> backwards
> --------------------------------------------------------------------------------
>                 Key: BEAM-3726
>                 URL: https://issues.apache.org/jira/browse/BEAM-3726
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
> When the job is restored from savepoint Kinesis Reader throws almost always 
> {{java.lang.IllegalArgumentException: Attempting to move backwards}}
> After a few job restarts caused again by the same exception, job finally 
> starts up and continues to run with no further problems.
> Beam job is reading from 32 shards with parallelism set to 32. Using Flink 
> 1.3.2. But I have seen this exception also when using Beam 2.2 when Kinesis 
> client was refactored to use MovingFunction. I think this is a serious 
> regression bug introduced in Beam 2.2. 
> {code:java}
> java.lang.IllegalArgumentException: Attempting to move backwards
> at 
> org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
> at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
> at 
> org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
> at 
> org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
> Kinesis Reader transformation configuration:
> {code:java}
> pipeline.apply("KINESIS READER", KinesisIO.read()
> .withStreamName(streamName)
> .withInitialPositionInStream(InitialPositionInStream.LATEST)
> .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1)){code}
> When testing locally I managed to catch this exception. Just before executing 
> this 
> [link|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
>  that threw exception I captured the state of the class so that you can 
> replicate the issue
> {code:java}
> org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]{code}
> the add function of MovingFunction was called with nowMsSinceEpoch = 
> 1519315583591

This message was sent by Atlassian JIRA

Reply via email to