[ https://issues.apache.org/jira/browse/BEAM-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383736#comment-16383736 ]
Pawel Bartoszek commented on BEAM-3726:
---------------------------------------

[~iemejia] I have looked into the PubsubIO implementation and can see that it is far more advanced when it comes to the getWatermark method in the PubsubUnboundedSource class. Maybe the comments would shed some light?

{code:java}
@Override
public Instant getWatermark() {
  if (pubsubClient.get().isEOF() && notYetRead.isEmpty()) {
    // For testing only: Advance the watermark to the end of time to signal
    // the test is complete.
    return BoundedWindow.TIMESTAMP_MAX_VALUE;
  }

  // NOTE: We'll allow the watermark to go backwards. The underlying runner is responsible
  // for aggregating all reported watermarks and ensuring the aggregate is latched.
  // If we attempt to latch locally then it is possible a temporary starvation of one reader
  // could cause its estimated watermark to fast forward to current system time. Then when
  // the reader resumes its watermark would be unable to resume tracking.
  // By letting the underlying runner latch we avoid any problems due to localized starvation.
  long nowMsSinceEpoch = now();
  long readMin = minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
  long unreadMin = minUnreadTimestampMsSinceEpoch.get();
  if (readMin == Long.MAX_VALUE
      && unreadMin == Long.MAX_VALUE
      && lastReceivedMsSinceEpoch >= 0
      && nowMsSinceEpoch > lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
    // We don't currently have any unread messages pending, we have not had any messages
    // read for a while, and we have not received any new messages from Pubsub for a while.
    // Advance watermark to current time.
    // TODO: Estimate a timestamp lag.
    lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
  } else if (minReadTimestampMsSinceEpoch.isSignificant()
      || minUnreadTimestampMsSinceEpoch.isSignificant()) {
    // Take minimum of the timestamps in all unread messages and recently read messages.
    lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
  }
  // else: We're not confident enough to estimate a new watermark. Stick with the old one.
  minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
  maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, lastWatermarkMsSinceEpoch);
  return new Instant(lastWatermarkMsSinceEpoch);
}
{code}

{code:java}
/**
 * BLOCKING
 * Return {@literal true} if a Pubsub message is available, {@literal false} if
 * none is available at this time or we are over-subscribed. May BLOCK while extending
 * ACKs or fetching available messages. Will not block waiting for messages.
 */
@Override
public boolean advance() throws IOException {
  // Emit stats.
  stats();

  if (current != null) {
    // Current is consumed. It can no longer contribute to holding back the watermark.
    minUnreadTimestampMsSinceEpoch.remove(current.requestTimeMsSinceEpoch);
    current = null;
  }

  // Retire state associated with ACKed messages.
  retire();

  // Extend all pressing deadlines.
  // Will BLOCK until done.
  // If the system is pulling messages only to let them sit in a downstream queue then
  // this will have the effect of slowing down the pull rate.
  // However, if the system is genuinely taking longer to process each message then
  // the work to extend ACKs would be better done in the background.
  extend();

  if (notYetRead.isEmpty()) {
    // Pull another batch.
    // Will BLOCK until fetch returns, but will not block until a message is available.
    pull();
  }

  // Take one message from queue.
  current = notYetRead.poll();
  if (current == null) {
    // Try again later.
    return false;
  }
  notYetReadBytes -= current.elementBytes.length;
  checkState(notYetReadBytes >= 0);
  long nowMsSinceEpoch = now();
  numReadBytes.add(nowMsSinceEpoch, current.elementBytes.length);
  minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, current.timestampMsSinceEpoch);
  if (current.timestampMsSinceEpoch < lastWatermarkMsSinceEpoch) {
    numLateMessages.add(nowMsSinceEpoch, 1L);
  }

  // Current message can be considered 'read' and will be persisted by the next
  // checkpoint. So it is now safe to ACK back to Pubsub.
  safeToAckIds.add(current.ackId);
  return true;
}
{code}

> Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move backwards
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-3726
>                 URL: https://issues.apache.org/jira/browse/BEAM-3726
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>
> When the job is restored from a savepoint, the Kinesis Reader almost always throws
> {{java.lang.IllegalArgumentException: Attempting to move backwards}}
> After a few job restarts, caused again by the same exception, the job finally
> starts up and continues to run with no further problems.
> The Beam job is reading from 32 shards with parallelism set to 32. Using Flink
> 1.3.2. But I have also seen this exception when using Beam 2.2, when the Kinesis
> client was refactored to use MovingFunction. I think this is a serious
> regression bug introduced in Beam 2.2.
>
> {code:java}
> java.lang.IllegalArgumentException: Attempting to move backwards
>     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>     at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
>     at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
>     at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>     at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}
>
> Kinesis Reader transformation configuration:
> {code:java}
> pipeline.apply("KINESIS READER", KinesisIO.read()
>     .withStreamName(streamName)
>     .withInitialPositionInStream(InitialPositionInStream.LATEST)
>     .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1));
> {code}
>
> When testing locally I managed to catch this exception. Just before executing
> this
> [link|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
> that threw the exception, I captured the state of the class so that you can
> replicate the issue:
> {code:java}
> org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]
> {code}
>
> The add function of MovingFunction was called with nowMsSinceEpoch =
> 1519315583591
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
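
For readers trying to reproduce the report, the captured values already show the mismatch: currentMsSinceEpoch (1519315585000) is 1409 ms ahead of the nowMsSinceEpoch passed to add (1519315583591). The Java sketch below illustrates that condition in isolation. MovingBackwardsSketch and its methods are hypothetical names written for this illustration, not Beam's MovingFunction. It assumes that the precondition is simply "the supplied time must not precede the tracked time" and that the tracked time advances in whole sampleUpdateMs steps; both are consistent with the dump but are not confirmed by it.

```java
/**
 * Hypothetical stand-in for the time check behind the reported exception.
 * This is NOT Beam's MovingFunction; the names and the logic are assumptions.
 */
final class MovingBackwardsSketch {
    private final long sampleUpdateMs;
    private long currentMsSinceEpoch;

    MovingBackwardsSketch(long sampleUpdateMs, long currentMsSinceEpoch) {
        this.sampleUpdateMs = sampleUpdateMs;
        this.currentMsSinceEpoch = currentMsSinceEpoch;
    }

    /** Assumed precondition: the supplied clock reading must not precede the tracked time. */
    void add(long nowMsSinceEpoch) {
        if (nowMsSinceEpoch < currentMsSinceEpoch) {
            throw new IllegalArgumentException("Attempting to move backwards");
        }
        // Assumption: the tracked time only advances in whole sampleUpdateMs steps.
        long steps = (nowMsSinceEpoch - currentMsSinceEpoch) / sampleUpdateMs;
        currentMsSinceEpoch += steps * sampleUpdateMs;
    }

    long currentMsSinceEpoch() {
        return currentMsSinceEpoch;
    }

    /** Returns true if add(now) is rejected for the given tracked time. */
    static boolean rejects(long sampleUpdateMs, long current, long now) {
        try {
            new MovingBackwardsSketch(sampleUpdateMs, current).add(now);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long current = 1519315585000L; // currentMsSinceEpoch from the captured state
        long now = 1519315583591L;     // nowMsSinceEpoch passed to add in the report
        System.out.println("now lags current by " + (current - now) + " ms");
        System.out.println("rejected: " + rejects(5000L, current, now));
    }
}
```

Under these assumptions, any clock reading below the tracked time trips the check. The sketch does not establish why the reader supplied such a value after the savepoint restore.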