[ 
https://issues.apache.org/jira/browse/BEAM-2752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364077#comment-16364077
 ] 

Ismaël Mejía commented on BEAM-2752:
------------------------------------

[~pawelbartoszek] there have been a lot of changes to KinesisIO in the last few 
weeks. Could you try the master version (or a SNAPSHOT build) and tell us 
whether it is still happening?

> KinesisIO throws ConcurrentModificationException on checkpoint
> --------------------------------------------------------------
>
>                 Key: BEAM-2752
>                 URL: https://issues.apache.org/jira/browse/BEAM-2752
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0
>            Reporter: Pawel Bartoszek
>            Assignee: Chamikara Jayalath
>            Priority: Minor
>
> From time to time KinesisIO throws a ConcurrentModificationException when 
> taking a checkpoint.
> {code:java}
> Caused by: java.util.ConcurrentModificationException
>       at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:643)
>       at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>       at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:409)
>       at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:699)
>       at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:256)
>       at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:209)
>       at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.<init>(KinesisReaderCheckpoint.java:44)
>       at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.asCurrentStateOf(KinesisReaderCheckpoint.java:49)
>       at org.apache.beam.sdk.io.kinesis.KinesisReader.getCheckpointMark(KinesisReader.java:137)
>       at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.snapshotState(UnboundedSourceWrapper.java:379)
>       at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>       at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:100)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>       ... 11 more
> {code}
> *What is the issue*
> The org.apache.beam.sdk.io.kinesis.RoundRobin class uses ArrayDeque, which 
> is not thread safe. Using ConcurrentLinkedDeque instead should fix the 
> problem.
> *Beam 2.3 (master branch)*
> The Kinesis connector has been heavily refactored in master, which makes me 
> wonder how we should go about this fix. The 
> org.apache.beam.sdk.io.kinesis.RoundRobin class no longer exists in 
> master.
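
For anyone trying to reproduce this outside Beam, here is a minimal sketch of 
the failure mode described in the quoted issue. It is not the Beam code: the 
class and method names are hypothetical, and rotating the deque with 
addLast(removeFirst()) is only an assumption about how a round-robin reader 
might advance. To keep the result deterministic, the concurrent modification 
is simulated on a single thread by rotating the deque between creating the 
iterator and consuming it.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical stand-in for the checkpoint/reader interaction; not Beam code.
public class DequeSnapshotSketch {

    // Opens an iterator (as a checkpoint copy would), rotates the deque once
    // (assumed round-robin step), then drains the iterator.
    // Returns true if draining fails with ConcurrentModificationException.
    static boolean snapshotFailsAfterRotation(Deque<String> shards) {
        Iterator<String> snapshot = shards.iterator();
        shards.addLast(shards.removeFirst()); // interleaved "reader" step
        try {
            while (snapshot.hasNext()) {
                snapshot.next();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Deque<String> plain =
            new ArrayDeque<>(Arrays.asList("shard-0", "shard-1", "shard-2"));
        Deque<String> concurrent =
            new ConcurrentLinkedDeque<>(Arrays.asList("shard-0", "shard-1", "shard-2"));

        System.out.println("ArrayDeque fails: "
            + snapshotFailsAfterRotation(plain));
        System.out.println("ConcurrentLinkedDeque fails: "
            + snapshotFailsAfterRotation(concurrent));
    }
}
```

Note that ConcurrentLinkedDeque iterators are weakly consistent: they never 
throw ConcurrentModificationException, but a copy taken during a rotation is 
not guaranteed to be a point-in-time view of the deque. Whether that is 
acceptable for a checkpoint is a separate question from avoiding the exception.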



