[ https://issues.apache.org/jira/browse/FLINK-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681390#comment-16681390 ]

vinoyang commented on FLINK-10840:
----------------------------------

[~till.rohrmann] and [~Zentol] I have not actually hit this bug in practice, 
but from reading the code, the line marked with a comment in the snippet below 
looks like it empties the pending-files list that was just stored for the 
current checkpoint, right? My reasoning is that the map entry and 
bucketState.pendingFiles refer to the same List object.
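
A quick way to check the aliasing question outside of Flink is a standalone 
sketch like the one below. Bucket is a hypothetical stand-in for BucketState 
(it is not Flink code) and keeps only the two relevant fields; the file name is 
made up. In plain Java, assigning a new ArrayList to the field only rebinds the 
field, so the List object already stored in the map is untouched. Calling 
clear() on the shared object is what would empty the map entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFilesAliasing {

    // Hypothetical stand-in for BucketState: only the two relevant fields.
    static class Bucket {
        List<String> pendingFiles = new ArrayList<>();
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    // Mirrors the snippet: store the list in the map, then reassign the field.
    static int sizeAfterReassign() {
        Bucket b = new Bucket();
        b.pendingFiles.add("part-0-0.pending");
        b.pendingFilesPerCheckpoint.put(1L, b.pendingFiles);
        b.pendingFiles = new ArrayList<>(); // rebinds the field only
        return b.pendingFilesPerCheckpoint.get(1L).size();
    }

    // Contrast: store the list in the map, then clear the same object.
    static int sizeAfterClear() {
        Bucket b = new Bucket();
        b.pendingFiles.add("part-0-0.pending");
        b.pendingFilesPerCheckpoint.put(1L, b.pendingFiles);
        b.pendingFiles.clear(); // mutates the object the map also holds
        return b.pendingFilesPerCheckpoint.get(1L).size();
    }

    public static void main(String[] args) {
        System.out.println("after reassign: " + sizeAfterReassign()); // 1
        System.out.println("after clear():  " + sizeAfterClear());    // 0
    }
}
```

If the reassignment case reports a size of 1, the checkpoint's entry still 
holds its pending files and only a clear() on the shared list would be a 
problem.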

> BucketingSink incorrectly clears the pendingFiles List
> ------------------------------------------------------
>
>                 Key: FLINK-10840
>                 URL: https://issues.apache.org/jira/browse/FLINK-10840
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> BucketingSink#snapshotState : (see the *comment* in this method)
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>    restoredBucketStates.clear();
>    synchronized (state.bucketStates) {
>       int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>       for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>          BucketState<T> bucketState = bucketStateEntry.getValue();
>          if (bucketState.isWriterOpen) {
>             bucketState.currentFileValidLength = bucketState.writer.flush();
>          }
>          synchronized (bucketState.pendingFilesPerCheckpoint) {
>             bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>          }
>          // Suspected bug: this appears to empty the collection stored above prematurely
>          bucketState.pendingFiles = new ArrayList<>();
>       }
>       restoredBucketStates.add(state);
>       if (LOG.isDebugEnabled()) {
>          LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>       }
>    }
> }
> {code}