[ https://issues.apache.org/jira/browse/BEAM-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-3726:
----------------------------------
    Description: 
When the job is restored from a savepoint, the Kinesis reader almost always
throws {{java.lang.IllegalArgumentException: Attempting to move backwards}}.

After a few more restarts, each caused by the same exception, the job finally
starts up and continues to run with no further problems.

The Beam job reads from 32 shards with parallelism set to 32, running on Flink
1.3.2. I have also seen this exception with Beam 2.2, after the Kinesis client
was refactored to use MovingFunction, so I think this is a serious regression
introduced in Beam 2.2.

 
{code:java}
java.lang.IllegalArgumentException: Attempting to move backwards
at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
 

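Kinesis reader transform configuration:
{code:java}
import static com.amazonaws.regions.Regions.EU_WEST_1;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.beam.sdk.io.kinesis.KinesisIO;

pipeline.apply("KINESIS READER", KinesisIO.read()
    .withStreamName(streamName)
    .withInitialPositionInStream(InitialPositionInStream.LATEST)
    .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1));{code}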
 

While testing locally, I managed to catch this exception. Just before the
[line|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
that throws the exception was executed, I captured the state of the
{{MovingFunction}} instance so that you can replicate the issue:
{code:java}
org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]{code}
 

The {{add}} method of {{MovingFunction}} was called with
{{nowMsSinceEpoch = 1519315583591}}, which is 1409 ms earlier than the
instance's {{currentMsSinceEpoch}} (1519315585000).
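To make the failure easier to replicate without a Kinesis stream, here is a
minimal stand-alone Java model of the failing check, fed with the values
captured above. It assumes that the precondition at MovingFunction.java#L97
rejects any {{nowMsSinceEpoch}} earlier than {{currentMsSinceEpoch}} (the start
of the current {{sampleUpdateMs}} bucket). The {{MoveBackwardsRepro}} and
{{BucketClock}} classes are hypothetical and are not Beam's implementation,
which also maintains the buckets and sample counts.

{code:java}
// Hypothetical, simplified model of the timestamp check in MovingFunction.flush().
// This is NOT Beam's implementation: it only mirrors the assumed precondition
// that the incoming timestamp must not be earlier than the current bucket start.
public class MoveBackwardsRepro {

  /** Hypothetical stand-in for the bucket clock of a time-bucketed moving function. */
  static final class BucketClock {
    private final long sampleUpdateMs;
    private long currentMsSinceEpoch; // start of the current bucket, aligned to sampleUpdateMs

    BucketClock(long sampleUpdateMs, long currentMsSinceEpoch) {
      this.sampleUpdateMs = sampleUpdateMs;
      this.currentMsSinceEpoch = currentMsSinceEpoch;
    }

    /** Moves to the bucket containing nowMsSinceEpoch; rejects timestamps before the current bucket. */
    void flush(long nowMsSinceEpoch) {
      if (nowMsSinceEpoch < currentMsSinceEpoch) {
        throw new IllegalArgumentException("Attempting to move backwards");
      }
      currentMsSinceEpoch = nowMsSinceEpoch - (nowMsSinceEpoch % sampleUpdateMs);
    }

    long currentMsSinceEpoch() {
      return currentMsSinceEpoch;
    }
  }

  public static void main(String[] args) {
    // Values taken from the captured MovingFunction state and the add() argument.
    BucketClock clock = new BucketClock(5000L, 1519315585000L);
    long nowMsSinceEpoch = 1519315583591L;

    System.out.println("nowMsSinceEpoch is "
        + (clock.currentMsSinceEpoch() - nowMsSinceEpoch)
        + " ms before currentMsSinceEpoch");
    try {
      clock.flush(nowMsSinceEpoch);
      System.out.println("no exception");
    } catch (IllegalArgumentException e) {
      System.out.println(e); // prints the exception class name and message
    }
  }
}
{code}

Running {{main}} prints {{nowMsSinceEpoch is 1409 ms before currentMsSinceEpoch}}
followed by {{java.lang.IllegalArgumentException: Attempting to move backwards}}:
the current bucket starts at 1519315585000, so a timestamp that falls in the
preceding 5000 ms bucket is rejected.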

 

 



> Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move backwards
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-3726
>                 URL: https://issues.apache.org/jira/browse/BEAM-3726
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
