Hi Pawel,

I'll have a look!

Best,
Aljoscha

> On 16. Aug 2017, at 18:30, Lukasz Cwik <[email protected]> wrote:
> 
> Moved to [email protected]
> 
> On Wed, Aug 16, 2017 at 9:22 AM, Pawel Bartoszek <[email protected]> wrote:
> 
>> When Flink performs a checkpoint, I randomly get a
>> ConcurrentModificationException.
>> 
>> From my investigation it looks like the method
>> 
>> public boolean advance() throws IOException
>> 
>> from
>> 
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
>> 
>> is called in another thread while the checkpoint is being performed.
>> 
>> The exception occurs because the method
>> 
>> public UnboundedSource.CheckpointMark getCheckpointMark()
>> 
>> from KinesisReader.java
>> <https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java>
>> iterates over the iterator returned by
>> RoundRobin<ShardRecordsIterator>.iterator() while
>> 
>> public boolean advance() throws IOException
>> 
>> calls RoundRobin<ShardRecordsIterator>.moveForward() from another thread,
>> which causes java.util.ConcurrentModificationException to be thrown.
>> 
>> 
>> The RoundRobin<ShardRecordsIterator> class uses a java.util.Deque (an
>> ArrayDeque, per the stack trace below), which doesn't allow elements to be
>> added or removed while it is being iterated.
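
A minimal sketch of that failure mode, reproduced on a single thread so it is deterministic. It assumes that moveForward() rotates the deque with addLast(removeFirst()); that is my reading of RoundRobin, not something confirmed in this thread. The class name DequeFailFastDemo and the shard names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ConcurrentModificationException;
import java.util.Deque;
import java.util.Iterator;

class DequeFailFastDemo {

    /** Assumed shape of a round-robin rotation: the head element moves to the tail. */
    static <T> void moveForward(Deque<T> deque) {
        deque.addLast(deque.removeFirst());
    }

    /** Returns true if using an iterator after a rotation throws ConcurrentModificationException. */
    static boolean rotationBreaksIteration() {
        Deque<String> shards = new ArrayDeque<>();
        shards.add("shard-0");
        shards.add("shard-1");
        shards.add("shard-2");

        // Stands in for the iterator that getCheckpointMark() walks over.
        Iterator<String> it = shards.iterator();

        // Stands in for advance() rotating the deque in the meantime.
        moveForward(shards);

        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("iteration failed after rotation: " + rotationBreaksIteration());
    }
}
```

With two real threads the same interleaving only happens now and then, which would fit the exception showing up randomly.
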
>> 
>> Is some locking missing?
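
One possible shape for such locking, as a sketch only: rotation and the copy taken for a checkpoint share one monitor, and the checkpoint side iterates over a private copy. GuardedRoundRobin and snapshot() are hypothetical names, not existing Beam classes or methods, and this says nothing about whether the reader or the runner is the right place for the lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

class GuardedRoundRobin<T> {
    private final Deque<T> deque;

    GuardedRoundRobin(Collection<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("need at least one item");
        }
        this.deque = new ArrayDeque<>(items);
    }

    /** Current head of the rotation. */
    synchronized T getCurrent() {
        return deque.getFirst();
    }

    /** Moves the head to the tail; excluded from running during snapshot(). */
    synchronized void moveForward() {
        deque.addLast(deque.removeFirst());
    }

    /** Copies the elements under the same lock, so callers iterate a private list. */
    synchronized List<T> snapshot() {
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedRoundRobin<String> shards =
                new GuardedRoundRobin<>(Arrays.asList("shard-0", "shard-1", "shard-2"));

        // One thread keeps rotating, like a reader calling advance().
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                shards.moveForward();
            }
        });
        reader.start();

        // The main thread keeps copying, like repeated checkpoints.
        int copies = 0;
        while (reader.isAlive()) {
            if (shards.snapshot().size() != 3) {
                throw new IllegalStateException("inconsistent snapshot");
            }
            copies++;
        }
        reader.join();
        System.out.println("consistent snapshots taken: " + copies);
    }
}
```

This only helps if every access to the deque goes through the guarded methods. The alternative is for the caller to guarantee that advance() and getCheckpointMark() never run at the same time, in which case the reader itself needs no lock.
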
>> 
>> I am using Beam 2.0.0 and Flink 1.2.1, with 20 slots and 32 Kinesis shards.
>> 
>> I created a bug for it as well
>> https://issues.apache.org/jira/browse/BEAM-2752
>> 
>> Stacktrace:
>> 
>> java.lang.Exception: Error while triggering checkpoint 59 for Source: 
>> Read(KinesisSource) -> Flat Map -> ParMultiDo(KinesisExtractor) -> Flat Map 
>> -> ParMultiDo(StringToRecord) -> Flat Map -> ParMultiDo(Anonymous) -> Flat 
>> Map -> ParMultiDo(ToRRecord) -> Flat Map -> ParMultiDo(AddTimestamps) -> 
>> Flat Map -> xxxx.yyyy.GroupByOneMinuteWindow GROUP RDOTRECORDS BY ONE MINUTE 
>> WINDOWS/Window.Assign.out -> (ParMultiDo(Anonymous) -> Flat Map -> 
>> ParMultiDo(ToSomeKey) -> Flat Map -> ToKeyedWorkItem, 
>> ParMultiDo(ToCompositeKey) -> Flat Map -> ParMultiDo(Anonymous) -> Flat Map 
>> -> ToKeyedWorkItem, ParMultiDo(Anonymous) -> Flat Map -> 
>> ParMultiDo(ApplyShardingKey) -> Flat Map -> ToKeyedWorkItem) (1/20)
>>      at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1136)
>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not perform checkpoint 59 for operator 
>> Source: Read(KinesisSource) -> Flat Map -> ParMultiDo(KinesisExtractor) -> 
>> Flat Map -> ParMultiDo(StringToRecord) -> Flat Map -> ParMultiDo(Anonymous) 
>> -> Flat Map -> ParMultiDo(ToRRecord) -> Flat Map -> 
>> ParMultiDo(AddTimestamps) -> Flat Map -> xxxx.yyyy.GroupByOneMinuteWindow 
>> GROUP RDOTRECORDS BY ONE MINUTE WINDOWS/Window.Assign.out -> 
>> (ParMultiDo(Anonymous) -> Flat Map -> ParMultiDo(ToSomeKey) -> Flat Map -> 
>> ToKeyedWorkItem, ParMultiDo(ToCompositeKey) -> Flat Map -> 
>> ParMultiDo(Anonymous) -> Flat Map -> ToKeyedWorkItem, ParMultiDo(Anonymous) 
>> -> Flat Map -> ParMultiDo(ApplyShardingKey) -> Flat Map -> ToKeyedWorkItem) 
>> (1/20).
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:524)
>>      at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1125)
>>      ... 5 more
>> Caused by: java.lang.Exception: Could not complete snapshot 59 for operator 
>> Source: Read(KinesisSource) -> Flat Map -> ParMultiDo(KinesisExtractor) -> 
>> Flat Map -> ParMultiDo(StringToRecord) -> Flat Map -> ParMultiDo(Anonymous) 
>> -> Flat Map -> ParMultiDo(ToRRecord) -> Flat Map -> 
>> ParMultiDo(AddTimestamps) -> Flat Map -> xxxx.yyyy.GroupByOneMinuteWindow 
>> GROUP RDOTRECORDS BY ONE MINUTE WINDOWS/Window.Assign.out -> 
>> (ParMultiDo(Anonymous) -> Flat Map -> ParMultiDo(ToSomeKey) -> Flat Map -> 
>> ToKeyedWorkItem, ParMultiDo(ToCompositeKey) -> Flat Map -> 
>> ParMultiDo(Anonymous) -> Flat Map -> ToKeyedWorkItem, ParMultiDo(Anonymous) 
>> -> Flat Map -> ParMultiDo(ApplyShardingKey) -> Flat Map -> ToKeyedWorkItem) 
>> (1/20).
>>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1090)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:630)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:575)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:518)
>>      ... 6 more
>> Caused by: java.util.ConcurrentModificationException
>>      at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:643)
>>      at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>      at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:409)
>>      at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList$Builder.addAll(ImmutableList.java:699)
>>      at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:256)
>>      at org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.collect.ImmutableList.copyOf(ImmutableList.java:209)
>>      at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.<init>(KinesisReaderCheckpoint.java:44)
>>      at org.apache.beam.sdk.io.kinesis.KinesisReaderCheckpoint.asCurrentStateOf(KinesisReaderCheckpoint.java:49)
>>      at org.apache.beam.sdk.io.kinesis.KinesisReader.getCheckpointMark(KinesisReader.java:137)
>>      at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.snapshotState(UnboundedSourceWrapper.java:379)
>>      at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>>      at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>>      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:100)
>>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>>      ... 11 more
>> 
>> 
>> 
