Github user yanghua commented on a diff in the pull request:
https://github.com/apache/flink/pull/6375#discussion_r203996832
--- Diff:
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
---
@@ -722,9 +727,31 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}
}
}
+ isBucketReady(partitionPaths);
}
}
+ @Override
+ public boolean isBucketReady(Set<Path> bucketPathes) {
+ for (Path path : bucketPathes) {
+ try {
+ RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, false);
+ while (files.hasNext()) {
+ LocatedFileStatus fileStatus = files.next();
+ if (fileStatus.getPath().getName().endsWith(pendingSuffix) ||
+ fileStatus.getPath().getName().endsWith(inProgressSuffix)) {
+ return false;
+ }
+ }
+ return true;
--- End diff --
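I mean this return statement: it cannot verify that all the bucket paths are ready, right?
It sits inside the outer for loop, so the method returns true after checking only the first
path, before the loop has finished.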
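
To illustrate the point, here is a minimal, self-contained sketch of the intended control flow, not the actual BucketingSink code. It replaces the Hadoop `FileSystem` listing with a plain map from bucket path to file names, and the class name, method name, and suffix values are assumptions made for the example. The only difference from the diff is that `return true` moves after the outer loop, so every bucket is inspected before the method reports success.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BucketReadyCheck {
    // Assumed suffix values, for illustration only.
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";

    // Hypothetical stand-in for isBucketReady: the map plays the role of
    // fs.listFiles(path, false) for each bucket path.
    static boolean allBucketsReady(Map<String, List<String>> filesByBucket) {
        for (List<String> fileNames : filesByBucket.values()) {
            for (String name : fileNames) {
                if (name.endsWith(PENDING_SUFFIX) || name.endsWith(IN_PROGRESS_SUFFIX)) {
                    // One unfinished file anywhere means "not ready".
                    return false;
                }
            }
            // No return here: keep going to the next bucket.
        }
        // Reached only after every bucket has been checked.
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buckets = new LinkedHashMap<>();
        buckets.put("/out/bucket-1", Arrays.asList("part-0-0"));
        buckets.put("/out/bucket-2", Arrays.asList("part-0-1.pending"));
        // With the early return from the diff, the first bucket alone would
        // decide the result; here the pending file in the second bucket is seen.
        System.out.println(allBucketsReady(buckets));
    }
}
```

With the `return true` placed as in the diff, the same input would be reported as ready because only `/out/bucket-1` is examined.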
---