[
https://issues.apache.org/jira/browse/FLINK-20544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354441#comment-17354441
]
luoyuxia edited comment on FLINK-20544 at 5/31/21, 12:48 PM:
-------------------------------------------------------------
I think this problem is caused by the methods
*StreamingFileWriter.dispose* and
*StreamingFileWriter.notifyCheckpointComplete* being called concurrently. For
example, when *StreamingFileWriter.dispose* is called, it runs the
following code:
{code:java}
activeBuckets.values().forEach(Bucket::disposePartFile);{code}
But at the same time,
*StreamingFileWriter.notifyCheckpointComplete* may also be called, and it may remove
a bucket from *activeBuckets*, as the following code shows:
{code:java}
while (activeBucketIt.hasNext()) {
    final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
    bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);
    if (!bucket.isActive()) {
        // We've dealt with all the pending files and the writer for this bucket is not currently open.
        // Therefore this bucket is currently inactive and we can remove it from our state.
        activeBucketIt.remove();
        notifyBucketInactive(bucket);
    }
}
{code}
So a ConcurrentModificationException will be thrown.
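The race can be illustrated in isolation with a plain HashMap. This is a hypothetical standalone sketch, not Flink code: the *activeBuckets* map and bucket names below are stand-ins, and the concurrent removal is done inline (single-threaded) for determinism, mimicking *notifyCheckpointComplete* removing an entry while *dispose* iterates:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class ConcurrentModificationDemo {

    // Simulates dispose() iterating activeBuckets while
    // notifyCheckpointComplete() removes an entry. Returns true
    // if the fail-fast iterator throws ConcurrentModificationException.
    static boolean triggerRace() {
        Map<String, String> activeBuckets = new HashMap<>();
        activeBuckets.put("bucket-1", "part-file-1");
        activeBuckets.put("bucket-2", "part-file-2");
        try {
            for (String bucket : activeBuckets.keySet()) {
                // Structural modification during iteration: HashMap's
                // fail-fast iterator detects it on the next next() call.
                activeBuckets.remove("bucket-2");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentModificationException thrown: " + triggerRace());
    }
}
```

Note the real bug is worse than this sketch: with two actual threads the modification may also silently corrupt the map instead of failing fast.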
We can use a thread-safe Map or add a lock to solve this problem.
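A minimal sketch of the thread-safe-Map option (hypothetical, not the actual Flink patch): switching *activeBuckets* to a ConcurrentHashMap, whose iterators are weakly consistent and therefore never throw ConcurrentModificationException even when entries are removed mid-iteration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeBucketsDemo {

    // Same access pattern as the race above, but backed by a
    // ConcurrentHashMap: removal during iteration is safe.
    // Returns the number of buckets left after the removal.
    static int disposeWhileCompleting() {
        Map<String, String> activeBuckets = new ConcurrentHashMap<>();
        activeBuckets.put("bucket-1", "part-file-1");
        activeBuckets.put("bucket-2", "part-file-2");
        for (String bucket : activeBuckets.keySet()) {
            // Weakly consistent iterator: no ConcurrentModificationException.
            activeBuckets.remove("bucket-2");
        }
        return activeBuckets.size();
    }

    public static void main(String[] args) {
        System.out.println("remaining buckets: " + disposeWhileCompleting());
    }
}
```

The lock alternative (a `synchronized` block around both the iteration in *dispose* and the removal loop in *notifyCheckpointComplete*) would also work, at the cost of serializing the two code paths.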
By the way, although this exception is thrown while disposing operators, it will be
caught ([StreamTask::disposeAllOperators|https://github.com/apache/flink/blob/release-1.11.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L703]), so
the lifecycle of operators won't be interrupted. You can just ignore the
error message; your program will run normally.
> ConcurrentModificationException when writing to hive and fail over happened
> ---------------------------------------------------------------------------
>
> Key: FLINK-20544
> URL: https://issues.apache.org/jira/browse/FLINK-20544
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL /
> Ecosystem
> Affects Versions: 1.11.1
> Reporter: zhuxiaoshang
> Priority: Minor
> Labels: auto-deprioritized-major
> Attachments: image-2020-12-09-14-57-22-862.png
>
>
> A job which reads from Kafka and writes to Hive. When failover happened, we got
> the following exception.
> !image-2020-12-09-14-57-22-862.png!
> Maybe using a thread-safe Map is a better choice.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)