[ https://issues.apache.org/jira/browse/BEAM-2752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on BEAM-2752 started by null.
----------------------------------
> KinesisIO throws ConcurrentModificationException on checkpoint
> --------------------------------------------------------------
>
>                 Key: BEAM-2752
>                 URL: https://issues.apache.org/jira/browse/BEAM-2752
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.0.0, 2.1.0, 2.2.0
>            Reporter: Pawel Bartoszek
>            Priority: P3
>
> From time to time KinesisIO throws a ConcurrentModificationException while
> taking a checkpoint.
> {code:java}
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:643)
>   at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>   at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:409)
>   at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:699)
>   at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:256)
>   at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:209)
>   at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.<init>(KinesisReaderCheckpoint.java:44)
>   at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.asCurrentStateOf(KinesisReaderCheckpoint.java:49)
>   at org.apache.beam.sdk.io.kinesis.KinesisReader.getCheckpointMark(KinesisReader.java:137)
>   at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.snapshotState(UnboundedSourceWrapper.java:379)
>   at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:100)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 11 more
> {code}
> *What is the issue*
> The org.apache.beam.sdk.io.kinesis.RoundRobin class uses ArrayDeque, which is
> not thread-safe. The stack trace shows the checkpoint copying the deque with
> ImmutableList.copyOf; the copy fails when the deque is modified while it is
> being iterated. Switching to ConcurrentLinkedDeque should fix the problem.
> *Beam 2.3 (master branch)*
> The Kinesis connector has been heavily refactored in master, which makes me
> wonder how we should go about this fix. The
> org.apache.beam.sdk.io.kinesis.RoundRobin class doesn't exist in master
> anymore.
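To illustrate the failure mode described in the issue, here is a minimal sketch in plain Java. It is not the Beam code: the class DequeSnapshotDemo, the methods rotate and snapshotFails, and the shard names are all hypothetical. To stay deterministic it uses a single thread and performs the modification between creating the iterator and reading from it, which stands in for a reader thread interleaving with the checkpoint copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical demo class; not part of Beam.
class DequeSnapshotDemo {

    // Mimics one round-robin step: move the head element to the tail.
    static void rotate(Deque<String> deque) {
        String head = deque.pollFirst();
        if (head != null) {
            deque.addLast(head);
        }
    }

    // Starts copying the deque (as a checkpoint would), rotates it before the
    // copy finishes, and reports whether the copy hit a
    // ConcurrentModificationException.
    static boolean snapshotFails(Deque<String> deque) {
        deque.addLast("shard-0"); // made-up shard names
        deque.addLast("shard-1");
        deque.addLast("shard-2");

        List<String> snapshot = new ArrayList<>();
        Iterator<String> it = deque.iterator();
        try {
            // Stand-in for the reader thread modifying the deque mid-copy.
            rotate(deque);
            while (it.hasNext()) {
                snapshot.add(it.next());
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ArrayDeque copy failed: "
                + snapshotFails(new ArrayDeque<>()));
        System.out.println("ConcurrentLinkedDeque copy failed: "
                + snapshotFails(new ConcurrentLinkedDeque<>()));
    }
}
```

ArrayDeque iterators are fail-fast on a best-effort basis, so the first copy is expected to fail on common JDKs. ConcurrentLinkedDeque iterators are weakly consistent and are documented never to throw ConcurrentModificationException. Note that weakly consistent iteration only removes the exception; the copy is not guaranteed to be a point-in-time view of the deque, so whether that is acceptable for a checkpoint would still need to be checked.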