Github user yanghua commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6375#discussion_r203996832
  
    --- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
    @@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long 
checkpointId) throws Exception {
                                        }
                                }
                        }
    +                   isBucketReady(partitionPaths);
                }
        }
     
    +   @Override
    +   public boolean isBucketReady(Set<Path> bucketPathes) {
    +           for (Path path : bucketPathes) {
    +                   try {
    +                           RemoteIterator<LocatedFileStatus> files = 
fs.listFiles(path, false);
    +                           while (files.hasNext()) {
    +                                   LocatedFileStatus fileStatus = 
files.next();
    +                                   if 
(fileStatus.getPath().getName().endsWith(pendingSuffix) ||
    +                                           
fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
    +                                           return false;
    +                                   }
    +                           }
    +                           return true;
    --- End diff --
    
    I mean this return statement, can not verify all the bucket path is ready, 
right? because the loop is not finished.


---

Reply via email to