[ https://issues.apache.org/jira/browse/FLINK-20544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354441#comment-17354441 ]

luoyuxia commented on FLINK-20544:
----------------------------------

I think this problem is caused by the methods `StreamingFileWriter.dispose` and `StreamingFileWriter.notifyCheckpointComplete` being called concurrently.

For example, when `StreamingFileWriter.dispose` is called, it runs the following code:
{code:java}
activeBuckets.values().forEach(Bucket::disposePartFile);
{code}

But if `StreamingFileWriter.notifyCheckpointComplete` is called at the same time, it may remove a bucket from `activeBuckets` while that iteration is in progress, as the following code shows:
{code:java}
while (activeBucketIt.hasNext()) {
   final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
   bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

   if (!bucket.isActive()) {
      // We've dealt with all the pending files and the writer for this bucket is not currently open.
      // Therefore this bucket is currently inactive and we can remove it from our state.
      activeBucketIt.remove();
      notifyBucketInactive(bucket);
   }
}
{code}
So a ConcurrentModificationException will be thrown.
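The fail-fast behavior behind this exception can be reproduced with a plain HashMap even in a single thread. This is only an illustrative sketch, not Flink code: the class name and map contents are made up as a stand-in for `activeBuckets`.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
    public static void main(String[] args) {
        // Hypothetical stand-in for activeBuckets: bucket id -> part file.
        Map<String, String> activeBuckets = new HashMap<>();
        activeBuckets.put("bucket-0", "part-0");
        activeBuckets.put("bucket-1", "part-1");

        boolean threw = false;
        try {
            // Simulates dispose() iterating while notifyCheckpointComplete()
            // structurally modifies the map: HashMap's fail-fast iterator
            // detects the modification on the next next() call and throws.
            for (String id : activeBuckets.keySet()) {
                activeBuckets.remove(id);
            }
        } catch (ConcurrentModificationException e) {
            threw = true;
        }
        System.out.println("ConcurrentModificationException thrown: " + threw);
    }
}
{code}

In the real bug the iteration and the removal happen on different threads, so the exception only appears when the timing lines up, e.g. during failover.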

We can use a thread-safe Map or add a lock to solve this problem.
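As a sketch of the thread-safe-Map option (again, the class name and map contents are hypothetical), ConcurrentHashMap's weakly consistent iterators tolerate removal during iteration and never throw ConcurrentModificationException:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeRemovalDemo {
    public static void main(String[] args) {
        // Same hypothetical bucket map, but backed by ConcurrentHashMap.
        Map<String, String> activeBuckets = new ConcurrentHashMap<>();
        activeBuckets.put("bucket-0", "part-0");
        activeBuckets.put("bucket-1", "part-1");

        // Removing entries while iterating is safe here: the iterator is
        // weakly consistent rather than fail-fast.
        for (String id : activeBuckets.keySet()) {
            activeBuckets.remove(id);
        }
        System.out.println("remaining buckets: " + activeBuckets.size());
    }
}
{code}

The locking alternative would be a synchronized block around both the iteration in `dispose` and the removal in `notifyCheckpointComplete`, at the cost of holding a lock during checkpoint completion.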

By the way, although this exception is thrown while disposing operators, it will be caught ([StreamTask::disposeAllOperators|https://github.com/apache/flink/blob/release-1.11.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L703]), so the lifecycle of operators won't be interrupted. You can just ignore the error message; your program will run normally.

> ConcurrentModificationException when writing to hive and fail over happened
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-20544
>                 URL: https://issues.apache.org/jira/browse/FLINK-20544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / 
> Ecosystem
>    Affects Versions: 1.11.1
>            Reporter: zhuxiaoshang
>            Priority: Minor
>              Labels: auto-deprioritized-major
>         Attachments: image-2020-12-09-14-57-22-862.png
>
>
> A job which reads from Kafka and writes to Hive. When failover happened, we got 
> the following exception.
> !image-2020-12-09-14-57-22-862.png!
> Maybe using a thread-safe Map is a better choice.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
