BruceKellan commented on code in PR #5953:
URL: https://github.com/apache/hudi/pull/5953#discussion_r912915029


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java:
##########
@@ -302,6 +312,51 @@ private Stream<HoodieInstant> maySkipCompaction(Stream<HoodieInstant> instants)
         : instants;
   }
 
+  private Stream<FileSlice> filterFileSliceWithValidFiles(FileSystem fs, Stream<FileSlice> fileSlices) {
+    // we need to filter out the base file and log file that does not exist
+    return fileSlices.map(fileSlice -> {
+      List<HoodieLogFile> logFiles = fileSlice.getLogFiles()
+          .filter(logFile -> {
+            try {
+              return fs.exists(logFile.getPath());
+            } catch (IOException e) {
+              LOG.error("Checking exists of log file path: {} error", logFile.getPath().toString());
+              throw new HoodieException(e);
+            }
+          }).collect(Collectors.toList());
+      return generateFileSlice(fileSlice.getPartitionPath(),
+          fileSlice.getBaseInstantTime(),
+          fileSlice.getFileId(),
+          fileSlice.getBaseFile().orElse(null),
+          logFiles);
+    }).filter(fileSlice -> {
+      // we should keep the file slice if any base/log file exists
+      if (fileSlice.getLatestLogFile().isPresent()) {
+        return true;
+      }
+      Option<String> basePath = fileSlice.getBaseFile().map(BaseFile::getPath);
+      try {
+        return basePath.isPresent() && fs.exists(new org.apache.hadoop.fs.Path(basePath.get()));
+      } catch (IOException e) {
+        LOG.error("Checking exists of base path: {} error", basePath);
+        throw new HoodieException(e);
+      }
+    });
+  }
+
+  private FileSlice generateFileSlice(String partitionPath,
+                                      String baseInstant,

Review Comment:
   I have read this patch. You mean we don't need any existence checking: since we are using a snapshot read, the snapshot files we maintain from the split_monitor's scan step should already be complete.
   
   Even with the existence check, the files could still end up incomplete because of the cleaner, so we don't need this step now, right?
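The cleaner argument is a time-of-check-to-time-of-use gap: a file that passes `exists()` while splits are being planned can still be deleted before a reader opens it. Below is a minimal, Hudi-independent sketch of that gap using only `java.nio.file`, assuming a cleaner may delete files between planning and reading. `filterExisting`, `readAll`, `planThenCleanThenRead` and the file names are hypothetical illustrations, not Hudi APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch only (not Hudi code): an existence check at planning time does not survive a later delete. */
public class ExistenceCheckRace {

    // Hypothetical planning step: keep only the paths that exist at this instant.
    static List<Path> filterExisting(List<Path> candidates) {
        return candidates.stream().filter(Files::exists).collect(Collectors.toList());
    }

    // Hypothetical read step: true only if every planned file can still be read.
    static boolean readAll(List<Path> planned) {
        for (Path p : planned) {
            try {
                Files.readAllBytes(p);
            } catch (IOException e) { // e.g. NoSuchFileException if the file vanished after planning
                return false;
            }
        }
        return true;
    }

    // Plans two files, simulates a cleaner deleting one, then reads; returns whether the read succeeded.
    static boolean planThenCleanThenRead() {
        try {
            Path dir = Files.createTempDirectory("slices");
            Path base = Files.writeString(dir.resolve("base.parquet"), "base"); // illustrative names
            Path log = Files.writeString(dir.resolve("log.1"), "log");

            List<Path> planned = filterExisting(List.of(base, log)); // both pass the existence check
            Files.delete(log); // simulated cleaner run between planning and reading
            boolean ok = readAll(planned);

            // Remove the remaining temp files.
            Files.delete(base);
            Files.delete(dir);
            return ok;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read succeeded: " + planThenCleanThenRead()); // prints "read succeeded: false"
    }
}
```

Both files pass the check, yet the read still fails, which is why the existence filter alone cannot guarantee complete file slices while a cleaner is running.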



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.