[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668281#comment-15668281
 ] 

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88110293
  
    --- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
    @@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState<T> 
bucketState) throws Exception {
     
        /**
         * Gets the truncate() call using reflection.
    -    *
         * <p>
    -    * Note: This code comes from Flume
    +    * <b>NOTE:</b> This code comes from Flume.
         */
        private Method reflectTruncate(FileSystem fs) {
    -           Method m = null;
    -           if(fs != null) {
    -                   Class<?> fsClass = fs.getClass();
    -                   try {
    -                           m = fsClass.getMethod("truncate", Path.class, 
long.class);
    -                   } catch (NoSuchMethodException ex) {
    -                           LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
    -                                   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
    -                           return null;
    -                   }
    +           if (this.refTruncate == null) {
    +                   Method m = null;
    +                   if (fs != null) {
    +                           Class<?> fsClass = fs.getClass();
    +                           try {
    +                                   m = fsClass.getMethod("truncate", 
Path.class, long.class);
    +                           } catch (NoSuchMethodException ex) {
    +                                   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
    +                                           " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
    +                                           validLengthSuffix, 
validLengthPrefix);
    +                                   return null;
    +                           }
     
    +                           // verify that truncate actually works
    +                           FSDataOutputStream outputStream;
    +                           Path testPath = new 
Path(UUID.randomUUID().toString());
    +                           try {
    +                                   outputStream = fs.create(testPath);
    +                                   outputStream.writeUTF("hello");
    +                                   outputStream.close();
    +                           } catch (IOException e) {
    +                                   LOG.error("Could not create file for 
checking if truncate works.", e);
    +                                   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
    +                           }
     
    -                   // verify that truncate actually works
    -                   FSDataOutputStream outputStream;
    -                   Path testPath = new Path(UUID.randomUUID().toString());
    -                   try {
    -                           outputStream = fs.create(testPath);
    -                           outputStream.writeUTF("hello");
    -                           outputStream.close();
    -                   } catch (IOException e) {
    -                           LOG.error("Could not create file for checking 
if truncate works.", e);
    -                           throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
    +                           try {
    +                                   m.invoke(fs, testPath, 2);
    +                           } catch (IllegalAccessException | 
InvocationTargetException e) {
    +                                   LOG.debug("Truncate is not supported.", 
e);
    +                                   m = null;
    +                           }
    +
    +                           try {
    +                                   fs.delete(testPath, false);
    +                           } catch (IOException e) {
    +                                   LOG.error("Could not delete truncate 
test file.", e);
    +                                   throw new RuntimeException("Could not 
delete truncate test file.", e);
    +                           }
                        }
    +                   this.refTruncate = m;
    +           }
    +           return this.refTruncate;
    +   }
     
    +   private Path getPendingPathFor(Path path) {
    +           return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
    +   }
     
    -                   try {
    -                           m.invoke(fs, testPath, 2);
    -                   } catch (IllegalAccessException | 
InvocationTargetException e) {
    -                           LOG.debug("Truncate is not supported.", e);
    -                           m = null;
    -                   }
    +   private Path getInProgressPathFor(Path path) {
    +           return new Path(path.getParent(), inProgressPrefix + 
path.getName()).suffix(inProgressSuffix);
    +   }
     
    -                   try {
    -                           fs.delete(testPath, false);
    -                   } catch (IOException e) {
    -                           LOG.error("Could not delete truncate test 
file.", e);
    -                           throw new RuntimeException("Could not delete 
truncate test file.", e);
    -                   }
    -           }
    -           return m;
    +   private Path getValidLengthPathFor(Path path) {
    +           return new Path(path.getParent(), validLengthPrefix + 
path.getName()).suffix(validLengthSuffix);
        }
     
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                synchronized (state.bucketStates) {
    -                   Iterator<Map.Entry<String, BucketState<T>>> it = 
state.bucketStates.entrySet().iterator();
    -                   while (it.hasNext()) {
    -                           BucketState<T> bucketState = 
it.next().getValue();
    +                   Iterator<Map.Entry<String, BucketState<T>>> stateIt = 
state.bucketStates.entrySet().iterator();
    +                   while (stateIt.hasNext()) {
    +                           BucketState<T> bucketState = 
stateIt.next().getValue();
                                synchronized 
(bucketState.pendingFilesPerCheckpoint) {
    -                                   Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
    -                                   Set<Long> checkpointsToRemove = new 
HashSet<>();
    -                                   for (Long pastCheckpointId : 
pastCheckpointIds) {
    +
    +                                   Iterator<Map.Entry<Long, List<String>>> 
pendingIt =
    +                                           
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
    +
    +                                   while (pendingIt.hasNext()) {
    +
    +                                           Map.Entry<Long, List<String>> 
entry = pendingIt.next();
    +                                           Long pastCheckpointId = 
entry.getKey();
    +                                           List<String> pendingPaths = 
entry.getValue();
    +
                                                if (pastCheckpointId <= 
checkpointId) {
                                                        LOG.debug("Moving 
pending files to final location for checkpoint {}", pastCheckpointId);
    -                                                   // All the pending 
files are buckets that have been completed but are waiting to be renamed
    -                                                   // to their final name
    -                                                   for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
    +
    +                                                   for (String filename : 
pendingPaths) {
                                                                Path finalPath 
= new Path(filename);
    -                                                           Path 
pendingPath = new Path(finalPath.getParent(),
    -                                                                   
pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
    +                                                           Path 
pendingPath = getPendingPathFor(finalPath);
     
                                                                
fs.rename(pendingPath, finalPath);
                                                                LOG.debug(
                                                                        "Moving 
pending file {} to final location having completed checkpoint {}.",
                                                                        
pendingPath,
                                                                        
pastCheckpointId);
                                                        }
    -                                                   
checkpointsToRemove.add(pastCheckpointId);
    +                                                   pendingIt.remove();
                                                }
                                        }
    -                                   if (!bucketState.isWriterOpen && 
bucketState.pendingFiles.isEmpty()) {
    +
    +                                   if (!bucketState.isWriterOpen &&
    +                                           
bucketState.pendingFiles.isEmpty() &&
    +                                           
bucketState.pendingFilesPerCheckpoint.isEmpty()) {
    +
                                                // We've dealt with all the 
pending files and the writer for this bucket is not currently open.
                                                // Therefore this bucket is 
currently inactive and we can remove it from our state.
    -                                           it.remove();
    -                                   } else {
    -                                           for (Long toRemove : 
checkpointsToRemove) {
    -                                                   
bucketState.pendingFilesPerCheckpoint.remove(toRemove);
    -                                           }
    +                                           stateIt.remove();
                                        }
                                }
                        }
                }
        }
     
        @Override
    -   public State<T> snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
    +   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
    +           Preconditions.checkNotNull(restoredBucketStates, "The operator 
has not been properly initialized.");
    +
    +           restoredBucketStates.clear();
    +
                synchronized (state.bucketStates) {
    -                   for (BucketState<T> bucketState : 
state.bucketStates.values()) {
    +                   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
    +
    +                   for (Map.Entry<String, BucketState<T>> bucketStateEntry 
: state.bucketStates.entrySet()) {
    +                           BucketState<T> bucketState = 
bucketStateEntry.getValue();
    +
                                if (bucketState.isWriterOpen) {
    -                                   long pos = bucketState.writer.flush();
    -                                   bucketState.currentFileValidLength = 
pos;
    +                                   bucketState.currentFileValidLength = 
bucketState.writer.flush();
                                }
    +
                                synchronized 
(bucketState.pendingFilesPerCheckpoint) {
    -                                   
bucketState.pendingFilesPerCheckpoint.put(checkpointId, 
bucketState.pendingFiles);
    +                                   
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
bucketState.pendingFiles);
                                }
                                bucketState.pendingFiles = new ArrayList<>();
                        }
    +                   restoredBucketStates.add(state);
    +
    +                   if (LOG.isDebugEnabled()) {
    +                           LOG.debug("BucketingSink idx {} checkpointed 
{}.", subtaskIdx, state);
    +                   }
                }
    -           return state;
        }
     
    -   @Override
    -   public void restoreState(State<T> state) {
    -           this.state = state;
    -
    -           try {
    -                   initFileSystem();
    -           } catch (IOException e) {
    -                   LOG.error("Error while creating FileSystem in 
checkpoint restore.", e);
    -                   throw new RuntimeException("Error while creating 
FileSystem in checkpoint restore.", e);
    -           }
    +   private void handleRestoredBucketState(BucketState<T> bucketState) {
    +           // we can clean all the pending files since they were renamed to
    --- End diff --
    
    I do not get the question? You mean when the thing on the comment can 
occur? The comment just says that we remove it from our current state. 
    
    Now if you mean when two tasks are responsible for the same files, then 
consider the  following: 
    
    Files written by a task contain the index of the task in their filename and 
the previous restore() was cleaning the files based on this index. When 
refactoring the BucketingSink we discussed with @aljoscha and we decided to not 
make the assumption that upon restoring/rescaling new task X will take the 
state of the previous task X, but states may be re-shuffled arbitrarily. This 
was also made in order to be future-proof.
    
    Under this new assumption, a new task 0 may take the state of the old task 
2. If the new task 2 deletes all the pending files that have its index just 
because they are no longer in its new state, then task 0 will not be able to 
recover them.


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to