[ 
https://issues.apache.org/jira/browse/BEAM-3726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383882#comment-16383882
 ] 

Pawel Bartoszek edited comment on BEAM-3726 at 3/2/18 6:06 PM:
---------------------------------------------------------------

Another thing I noticed (I might be completely wrong here) is that the 
UnboundedSource.getWatermark() and UnboundedSource.advance() methods are called 
by two different threads. 

However, I don't see any synchronisation code in KinesisReader that would 
prevent concurrent access to the MovingFunction class, which is not thread 
safe. Both MovingFunction.add(...) and MovingFunction.get(...) call a private 
flush method that modifies the internal buckets in MovingFunction.
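
One way to rule this out would be to route both entry points through a single 
lock. The sketch below is only an illustration under stated assumptions: 
GuardedWindow is a hypothetical, simplified stand-in (it is not Beam's 
MovingFunction or KinesisReader), the timestamps are made up, and the bucket 
array is reduced to a single running minimum.

```java
// Hypothetical stand-in for a MovingFunction-like class; NOT the Beam API.
final class GuardedWindow {
    private static final long SAMPLE_UPDATE_MS = 5000;

    private final Object lock = new Object();
    private long currentMsSinceEpoch;
    private long min = Long.MAX_VALUE; // stands in for the bucket contents

    GuardedWindow(long startMs) {
        this.currentMsSinceEpoch = startMs;
    }

    void add(long nowMs, long value) {
        synchronized (lock) {
            flush(nowMs);
            min = Math.min(min, value);
        }
    }

    long get(long nowMs) {
        synchronized (lock) {
            flush(nowMs);
            return min;
        }
    }

    long clock() {
        synchronized (lock) {
            return currentMsSinceEpoch;
        }
    }

    // Only called with the lock held, so the check and the advance form one atomic step.
    private void flush(long nowMs) {
        if (nowMs < currentMsSinceEpoch) {
            throw new IllegalArgumentException("Attempting to move backwards");
        }
        long newBuckets = (nowMs - currentMsSinceEpoch) / SAMPLE_UPDATE_MS;
        currentMsSinceEpoch += newBuckets * SAMPLE_UPDATE_MS;
    }

    // Calls add and get from two threads with the same timestamp and returns the final clock.
    static long clockAfterConcurrentUse(long startMs, long nowMs) {
        GuardedWindow w = new GuardedWindow(startMs);
        Thread adder = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                w.add(nowMs, i);
            }
        });
        Thread getter = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                w.get(nowMs);
            }
        });
        adder.start();
        getter.start();
        try {
            adder.join();
            getter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return w.clock();
    }

    public static void main(String[] args) {
        long start = 1_000_000_000L; // made-up timestamp, aligned to SAMPLE_UPDATE_MS
        long now = start + 12_000;   // two whole buckets later
        System.out.println("clock after concurrent use: " + clockAfterConcurrentUse(start, now));
    }
}
```

A lock like this only helps if the timestamps handed to add and get are 
themselves non-decreasing across threads; if one thread reads the clock, is 
delayed, and flushes after another thread has already moved further ahead, 
the check could still fire.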

 

I could imagine the following race condition scenario:

Both MovingFunction.add(...) and MovingFunction.get(...) are called at roughly 
the same time, and both execute 

[https://github.com/apache/beam/blob/90e9ee1e7206f3d4da6c317ddf411121889a13c3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L91]

so both threads get the same value assigned to the newBuckets variable.

[https://github.com/apache/beam/blob/90e9ee1e7206f3d4da6c317ddf411121889a13c3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L98]

In this case the loop will execute newBuckets times in *each thread*, which 
advances currentMsSinceEpoch by 2*newBuckets*sampleUpdateMs instead of 
newBuckets*sampleUpdateMs. The next time MovingFunction.add(...) or 
MovingFunction.get(...) is executed, currentMsSinceEpoch can be ahead of the 
supplied timestamp, and the following condition will fail: 

[https://github.com/apache/beam/blob/90e9ee1e7206f3d4da6c317ddf411121889a13c3/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97]
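
The suspected interleaving can be replayed on a single thread so that the 
outcome is deterministic. This is a minimal sketch, not the real code: 
FlushRaceSketch is a hypothetical, simplified model of the flush logic 
described above (the bucket contents are left out), and the start and now 
timestamps are made up for illustration.

```java
// Simplified model of the flush steps described above; NOT the real MovingFunction.
final class FlushRaceSketch {
    static final long SAMPLE_UPDATE_MS = 5000;
    static final int NUM_BUCKETS = 12;

    long currentMsSinceEpoch;

    FlushRaceSketch(long startMs) {
        this.currentMsSinceEpoch = startMs;
    }

    // First half of flush: the "move backwards" check plus the newBuckets computation.
    int computeNewBuckets(long nowMs) {
        if (nowMs < currentMsSinceEpoch) {
            throw new IllegalArgumentException("Attempting to move backwards");
        }
        return (int) Math.min((nowMs - currentMsSinceEpoch) / SAMPLE_UPDATE_MS, NUM_BUCKETS);
    }

    // Second half of flush: advance the clock once per new bucket.
    void advance(int newBuckets) {
        while (newBuckets > 0) {
            currentMsSinceEpoch += SAMPLE_UPDATE_MS;
            newBuckets--;
        }
    }

    // Both "threads" compute newBuckets from the same stale clock, then both run the loop.
    static FlushRaceSketch interleave(long startMs, long nowMs) {
        FlushRaceSketch f = new FlushRaceSketch(startMs);
        int seenByAdd = f.computeNewBuckets(nowMs); // thread calling add(...)
        int seenByGet = f.computeNewBuckets(nowMs); // thread calling get(...)
        f.advance(seenByAdd);
        f.advance(seenByGet);
        return f;
    }

    // How far the clock ends up ahead of "now"; a positive value means it overshot.
    static long overshootAfterInterleaving(long startMs, long nowMs) {
        return interleave(startMs, nowMs).currentMsSinceEpoch - nowMs;
    }

    // True if the next flush with the same timestamp trips the backwards check.
    static boolean nextFlushThrows(long startMs, long nowMs) {
        FlushRaceSketch f = interleave(startMs, nowMs);
        try {
            f.computeNewBuckets(nowMs);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long start = 1_000_000_000L; // made-up timestamp, aligned to SAMPLE_UPDATE_MS
        long now = start + 12_000;   // two whole buckets later
        System.out.println("overshoot ms: " + overshootAfterInterleaving(start, now));
        System.out.println("next flush throws: " + nextFlushThrows(start, now));
    }
}
```

With these made-up numbers each "thread" sees newBuckets = 2, the clock moves 
forward by 20000 ms rather than 10000 ms, and it ends up 8000 ms ahead of the 
timestamp that was passed in, so the next check fails.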



> Kinesis Reader: java.lang.IllegalArgumentException: Attempting to move 
> backwards
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-3726
>                 URL: https://issues.apache.org/jira/browse/BEAM-3726
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Jean-Baptiste Onofré
>            Priority: Major
>         Attachments: KinesisIO-state.txt
>
>
> When the job is restored from a savepoint, Kinesis Reader almost always 
> throws {{java.lang.IllegalArgumentException: Attempting to move backwards}}.
> After a few job restarts, caused again by the same exception, the job finally 
> starts up and continues to run with no further problems.
> The Beam job is reading from 32 shards with parallelism set to 32, using 
> Flink 1.3.2. I have also seen this exception when using Beam 2.2, in which 
> the Kinesis client was refactored to use MovingFunction. I think this is a 
> serious regression bug introduced in Beam 2.2. 
>  
> {code:java}
> java.lang.IllegalArgumentException: Attempting to move backwards
> at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.beam.sdk.util.MovingFunction.flush(MovingFunction.java:97)
> at org.apache.beam.sdk.util.MovingFunction.add(MovingFunction.java:114)
> at org.apache.beam.sdk.io.kinesis.KinesisReader.advance(KinesisReader.java:137)
> at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:67)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:264)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702){code}
>  
> Kinesis Reader transformation configuration:
> {code:java}
> pipeline.apply("KINESIS READER", KinesisIO.read()
> .withStreamName(streamName)
> .withInitialPositionInStream(InitialPositionInStream.LATEST)
> .withAWSClientsProvider(awsAccessKey, awsSecretKey, EU_WEST_1)){code}
>  
> When testing locally I managed to catch this exception. Just before executing 
> the line at this 
> [link|https://github.com/apache/beam/blob/6c93105c2cb7be709c6b3e2e6cdcd09df2b48308/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java#L97],
>  which threw the exception, I captured the state of the class so that you can 
> replicate the issue:
> {code:java}
> org.apache.beam.sdk.util.MovingFunction@71781a[sampleUpdateMs=5000,numSignificantBuckets=2,numSignificantSamples=10,function=org.apache.beam.sdk.transforms.Min$MinLongFn@7909d8d3,buckets={9223372036854775807,9223372036854775807,1519315344334,1519315343759,1519315343770,1519315344086,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807,9223372036854775807},numSamples={0,0,1,158,156,146,0,0,0,0,144,0},currentMsSinceEpoch=1519315585000,currentIndex=2]{code}
>  
> The add function of MovingFunction was called with nowMsSinceEpoch = 
> 1519315583591.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
