[ https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16115329#comment-16115329 ]
ASF GitHub Bot commented on FLINK-4534:
---------------------------------------
GitHub user zhangminglei opened a pull request:
https://github.com/apache/flink/pull/4482
[FLINK-4534] Fix synchronization issue in BucketingSink
## What is the purpose of the change
Add the missing synchronization in the BucketingSink#close method.
## Verifying this change
This change is trivial and is not covered by any tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zhangminglei/flink flink-4534
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4482.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4482
----
commit 7fa19abf93dcc6891d45dc73bb5a6caf16cf34fd
Author: zhangminglei <[email protected]>
Date: 2017-08-05T04:04:43Z
[FLINK-4534] Fix synchronization issue in BucketingSink
----
> Lack of synchronization in BucketingSink#restoreState()
> -------------------------------------------------------
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Reporter: Ted Yu
> Assignee: mingleizhang
>
> Iteration over state.bucketStates is protected by synchronization in other
> methods, except for the following in restoreState():
> {code}
> for (BucketState<T> bucketState : state.bucketStates.values()) {
> {code}
> and the following in close():
> {code}
> for (Map.Entry<String, BucketState<T>> entry :
> state.bucketStates.entrySet()) {
> closeCurrentPartFile(entry.getValue());
> {code}
> With respect to bucketState.pendingFilesPerCheckpoint, there is a similar issue
> starting at line 752:
> {code}
> Set<Long> pastCheckpointIds =
> bucketState.pendingFilesPerCheckpoint.keySet();
> LOG.debug("Moving pending files to final location on restore.");
> for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
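
The pattern the issue asks for can be sketched as below. This is a minimal illustration, not the actual BucketingSink code: the class `SynchronizedIterationSketch`, its simplified `BucketState`, and the helper methods are hypothetical, and it assumes the map being iterated is itself used as the lock object, as the issue's wording ("protected by synchronization in other methods") suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the sink; only illustrates guarded iteration. */
class SynchronizedIterationSketch {

    /** Hypothetical, simplified bucket state (not Flink's BucketState<T>). */
    static final class BucketState {
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
        boolean closed;
    }

    private final Map<String, BucketState> bucketStates = new HashMap<>();

    /** Writers take the same lock that the iterating methods take. */
    void addPending(String bucket, long checkpointId, String file) {
        synchronized (bucketStates) {
            BucketState state = bucketStates.computeIfAbsent(bucket, k -> new BucketState());
            synchronized (state.pendingFilesPerCheckpoint) {
                state.pendingFilesPerCheckpoint
                        .computeIfAbsent(checkpointId, k -> new ArrayList<>())
                        .add(file);
            }
        }
    }

    /** Iterates over the buckets under the lock; returns how many were closed. */
    int close() {
        int closedCount = 0;
        synchronized (bucketStates) {
            for (Map.Entry<String, BucketState> entry : bucketStates.entrySet()) {
                entry.getValue().closed = true; // placeholder for closing the part file
                closedCount++;
            }
        }
        return closedCount;
    }

    /** Copies the checkpoint ids under the lock so callers iterate over a snapshot. */
    List<Long> pastCheckpointIds(String bucket) {
        synchronized (bucketStates) {
            BucketState state = bucketStates.get(bucket);
            if (state == null) {
                return new ArrayList<>();
            }
            synchronized (state.pendingFilesPerCheckpoint) {
                return new ArrayList<>(state.pendingFilesPerCheckpoint.keySet());
            }
        }
    }
}
```

Every reader and writer must agree on the same lock object for this to help. Guarding only some of the iterating methods, which is the situation the issue describes, leaves the unguarded loops exposed to concurrent modification.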
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)