danny0405 commented on a change in pull request #3067:
URL: https://github.com/apache/hudi/pull/3067#discussion_r651408416



##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
##########
@@ -58,4 +76,61 @@ private static WriteProfile getWriteProfile(
   public static void clean(String path) {
     PROFILES.remove(path);
   }
+
+  /**
+   * Returns all the incremental write file path statuses with the given commits metadata.
+   *
+   * @param basePath     Table base path
+   * @param hadoopConf   The hadoop conf
+   * @param metadataList The commits metadata
+   * @return the file statuses array
+   */
+  public static FileStatus[] getWritePathsOfInstants(
+      Path basePath,
+      Configuration hadoopConf,
+      List<HoodieCommitMetadata> metadataList) {
+    FileSystem fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    return metadataList.stream().map(metadata -> getWritePathsOfInstant(basePath, metadata, fs))
+        .flatMap(Collection::stream).toArray(FileStatus[]::new);
+  }
+
+  private static List<FileStatus> getWritePathsOfInstant(Path basePath, HoodieCommitMetadata metadata, FileSystem fs) {
+    return metadata.getFileIdAndFullPaths(basePath.toString()).values().stream()
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that do not exist; some files may have been
+        // removed by the cleaner.
+        .filter(path -> {
+          try {
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking exists of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
+          } catch (IOException e) {
+            LOG.error("Get write status of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        })
+        // filter out crushed files

Review comment:
       The write should not affect the read. This code was added a long time ago: a committed file (written by a merge handle) could later be modified by subsequent modification instants. At that time the write handle was not closed until the checkpoint-success event was received (this behavior has since been changed), so a merge handle could leave an empty file behind if `close` was never invoked.
   
   We can still keep the filtering to make the read path robust.
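   To illustrate the kind of filtering being kept, here is a minimal, self-contained sketch of the same two checks (path still exists, file is non-empty) using `java.nio.file` instead of Hadoop's `FileSystem`. The class and method names are hypothetical and only stand in for the logic in the diff, not for any Hudi API:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

public class WritePathFilter {

  // Keeps only paths that still exist and are non-empty, mirroring the two
  // filters in the diff: the existence check (files may have been removed by
  // the cleaner) and the empty-file check (a handle that was never closed can
  // leave an empty file behind).
  static List<Path> filterWritePaths(List<Path> paths) {
    return paths.stream()
        .filter(Files::exists)
        .filter(p -> {
          try {
            return Files.size(p) > 0;
          } catch (IOException e) {
            throw new UncheckedIOException("Failed to stat path: " + p, e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("write-paths");
    Path good = Files.write(dir.resolve("data.parquet"), new byte[] {1, 2, 3});
    Path empty = Files.createFile(dir.resolve("empty.parquet"));
    Path missing = dir.resolve("cleaned.parquet"); // never created

    // Only the non-empty, existing file survives the filter.
    List<Path> result = filterWritePaths(List.of(good, empty, missing));
    System.out.println(result.size());
  }
}
```

   The same shape applies to the Hadoop version in the diff: an existence filter followed by a status/length filter, with IO errors wrapped in an unchecked exception so they can propagate out of the stream pipeline.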




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to