[
https://issues.apache.org/jira/browse/BEAM-2752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anonymous updated BEAM-2752:
----------------------------
Status: Triage Needed (was: In Progress)
> KinesisIO throws ConcurrentModificationException on checkpoint
> --------------------------------------------------------------
>
> Key: BEAM-2752
> URL: https://issues.apache.org/jira/browse/BEAM-2752
> Project: Beam
> Issue Type: Bug
> Components: io-java-kinesis
> Affects Versions: 2.0.0, 2.1.0, 2.2.0
> Reporter: Pawel Bartoszek
> Priority: P3
>
> KinesisIO intermittently throws ConcurrentModificationException while taking
> a checkpoint.
> {code:java}
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:643)
> at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
> at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:409)
> at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:699)
> at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:256)
> at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:209)
> at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.<init>(KinesisReaderCheckpoint.java:44)
> at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.asCurrentStateOf(KinesisReaderCheckpoint.java:49)
> at org.apache.beam.sdk.io.kinesis.KinesisReader.getCheckpointMark(KinesisReader.java:137)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.snapshotState(UnboundedSourceWrapper.java:379)
> at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:100)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> ... 11 more
> {code}
> *What is the issue*
> The org.apache.beam.sdk.io.kinesis.RoundRobin class uses ArrayDeque, which
> is not thread-safe. Replacing it with ConcurrentLinkedDeque should fix the
> problem.
> *Beam 2.3 (master branch)*
> The Kinesis connector has been heavily refactored in master, which makes me
> wonder how we should go about this fix. The
> org.apache.beam.sdk.io.kinesis.RoundRobin class no longer exists in master.
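
The difference between the two deque types described under "What is the issue" can be illustrated with a minimal, single-threaded sketch. This is not the Beam code: the class name DequeIterationSketch, its methods, and the shard names are hypothetical. It also assumes that the reader rotates the deque by moving the first element to the back while a checkpoint is still iterating over it. The rotation is done on the same thread here only so that the outcome is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical illustration; none of these names come from Beam.
public class DequeIterationSketch {

  private static final List<String> SHARDS =
      Arrays.asList("shard-0", "shard-1", "shard-2");

  /** Returns true if iterating after a rotation throws ConcurrentModificationException. */
  static boolean throwsOnRotation(Deque<String> deque) {
    // Stands in for the checkpoint starting to copy the deque.
    Iterator<String> it = deque.iterator();
    // Stands in for the reader advancing round-robin (assumed behaviour).
    deque.addLast(deque.pollFirst());
    try {
      while (it.hasNext()) {
        it.next();
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  static boolean arrayDequeThrows() {
    return throwsOnRotation(new ArrayDeque<>(SHARDS));
  }

  static boolean concurrentLinkedDequeThrows() {
    return throwsOnRotation(new ConcurrentLinkedDeque<>(SHARDS));
  }

  public static void main(String[] args) {
    System.out.println("ArrayDeque throws: " + arrayDequeThrows());
    System.out.println("ConcurrentLinkedDeque throws: " + concurrentLinkedDequeThrows());
  }
}
```

In this sketch the ArrayDeque iterator fails, while the ConcurrentLinkedDeque iterator completes because its iterators are weakly consistent and never throw ConcurrentModificationException. Weak consistency also means the copy may not reflect a single point in time (an element moved during iteration can be seen twice), so whether that is acceptable for a checkpoint would still need to be confirmed against the refactored connector.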
--
This message was sent by Atlassian Jira
(v8.20.10#820010)