[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550597#comment-16550597 ]
Kostas Kloudas commented on FLINK-4534:
---------------------------------------
Hi [[email protected]], [~gjy] and [~mingleizhang]!
Actually, the lock is not needed at all. Internally, all of these methods are
already called while holding the `checkpointLock`, so the code does not need
any additional synchronization.
In fact, this is how it is done in the upcoming `StreamingFileSink`.
I would recommend removing all synchronization from the code and also cleaning
up some older JIRAs that mention problems with synchronization.
The redundant synchronization should have been removed earlier to avoid
confusion, but unfortunately it was not.
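To illustrate why a single caller-held lock makes per-method synchronization redundant, here is a minimal Java sketch. All names in it (`LockedCallbacksDemo`, `UnsynchronizedSink`, `countRecords`) are hypothetical and are not Flink's API; the sketch only assumes what is stated above, namely that the caller holds one shared `checkpointLock` around every call into the sink. Under that assumption, one thread can modify a plain `HashMap` while another iterates over it, with no `synchronized` blocks inside the sink itself.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical demo: the caller, not the sink, provides all locking. */
public class LockedCallbacksDemo {

    /** Hypothetical sink: a plain HashMap and no synchronized blocks at all. */
    static final class UnsynchronizedSink {
        private final Map<String, List<String>> bucketStates = new HashMap<>();

        // Called per record; may insert a new bucket into the map.
        void invoke(String bucket, String record) {
            bucketStates.computeIfAbsent(bucket, k -> new ArrayList<>()).add(record);
        }

        // Called from another thread (e.g. a timer); iterates over all buckets.
        int countRecords() {
            int total = 0;
            for (List<String> records : bucketStates.values()) {
                total += records.size();
            }
            return total;
        }
    }

    /** Runs two threads that only touch the sink while holding the shared lock. */
    static int run() {
        final Object checkpointLock = new Object();
        final UnsynchronizedSink sink = new UnsynchronizedSink();

        Thread processing = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                synchronized (checkpointLock) {
                    sink.invoke("bucket-" + (i % 10), "record-" + i);
                }
            }
        });
        Thread timer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                synchronized (checkpointLock) {
                    // Safe: invoke() cannot run concurrently with this iteration.
                    sink.countRecords();
                }
            }
        });

        processing.start();
        timer.start();
        try {
            processing.join();
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }

        synchronized (checkpointLock) {
            return sink.countRecords();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1000
    }
}
{code}

If either thread bypassed the `synchronized (checkpointLock)` block, the map could be modified during iteration and the iterator could throw a `ConcurrentModificationException`; as long as every entry point goes through the same lock, extra locking inside the sink adds nothing.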
> Lack of synchronization in BucketingSink#restoreState()
> -------------------------------------------------------
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: zhangminglei
> Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry :
> state.bucketStates.entrySet()) {
> closeCurrentPartFile(entry.getValue());
> {code}
> With respect to bucketState.pendingFilesPerCheckpoint, there is a similar
> issue starting at line 752:
> {code}
> Set<Long> pastCheckpointIds =
> bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)