[
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431583#comment-16431583
]
ASF GitHub Bot commented on FLINK-4534:
---------------------------------------
Github user zhangminglei commented on the issue:
https://github.com/apache/flink/pull/4482
This PR has conflicts; I will fix them soon.
> Lack of synchronization in BucketingSink#restoreState()
> -------------------------------------------------------
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
> Priority: Major
>
> Iteration over state.bucketStates is protected by synchronization in other
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> There is a similar issue with bucketState.pendingFilesPerCheckpoint,
> starting at line 752:
> {code}
> Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
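For illustration only, the kind of guard the issue asks for could look roughly like the sketch below. It is not the code from the PR. It assumes that the other methods lock on the bucketStates map itself, and every name in it (BucketStatesSketch, the simplified BucketState, addBucket, closeAll, pendingCheckpointIds) is a hypothetical stand-in rather than part of BucketingSink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the sink; not Flink's BucketingSink.
public class BucketStatesSketch {

    // Hypothetical stand-in for BucketState<T>.
    public static final class BucketState {
        public final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
        public boolean partFileOpen = true;
    }

    // Assumption: this map is the single lock object guarding all bucket state.
    private final Map<String, BucketState> bucketStates = new HashMap<>();

    public void addBucket(String path, BucketState bucketState) {
        synchronized (bucketStates) {
            bucketStates.put(path, bucketState);
        }
    }

    // Analogue of the loop in close(): iterate only while holding the lock.
    public int closeAll() {
        int closed = 0;
        synchronized (bucketStates) {
            for (Map.Entry<String, BucketState> entry : bucketStates.entrySet()) {
                BucketState bucketState = entry.getValue();
                if (bucketState.partFileOpen) {
                    bucketState.partFileOpen = false;
                    closed++;
                }
            }
        }
        return closed;
    }

    // Analogue of the loop in restoreState(): copy the checkpoint ids under
    // the lock instead of iterating over a live keySet() view without it.
    public List<Long> pendingCheckpointIds() {
        List<Long> ids = new ArrayList<>();
        synchronized (bucketStates) {
            for (BucketState bucketState : bucketStates.values()) {
                ids.addAll(bucketState.pendingFilesPerCheckpoint.keySet());
            }
        }
        Collections.sort(ids);
        return ids;
    }
}
```

Copying the keys inside the synchronized block lets the caller process them after the lock is released, at the cost of working on a snapshot that may already be stale.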
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)