codope commented on code in PR #5557:
URL: https://github.com/apache/hudi/pull/5557#discussion_r870516851


##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java:
##########
@@ -157,6 +171,50 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
     }
   }
 
+  private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
+    HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
+    FileStatus[] subDirs = fs.listStatus(new Path(inputPath));

Review Comment:
   This is going to add to the validation time, but I guess there is no simpler way.
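The listing itself is hard to avoid, but the work done after it can stay linear. A minimal sketch, assuming (as the log message in the hunk suggests but does not confirm) that input sub-directories are named with numeric batch ids and that the DeltaStreamer checkpoint holds the id of the last consumed batch. It picks the newest id in a single `max` pass instead of sorting, skips non-numeric names instead of throwing `NumberFormatException`, and treats an empty input directory as caught up instead of failing on `get(size() -1)`. `CatchUpCheck` is a hypothetical helper working on plain name strings rather than Hadoop `FileStatus` objects.

```java
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of the PR). Assumes input sub-directories are
 * named with numeric batch ids and that the checkpoint string is the numeric id
 * of the last batch the DeltaStreamer consumed.
 */
final class CatchUpCheck {

  private CatchUpCheck() {}

  /** Newest numeric sub-directory name, found in one pass; non-numeric names are skipped. */
  static OptionalLong latestBatchId(List<String> subDirNames) {
    return subDirNames.stream()
        .filter(name -> !name.isEmpty() && name.chars().allMatch(c -> c >= '0' && c <= '9'))
        .mapToLong(Long::parseLong)
        .max();
  }

  /** True when there is nothing to consume or the checkpoint has reached the newest batch. */
  static boolean isCaughtUp(Optional<String> latestCheckpoint, List<String> subDirNames) {
    OptionalLong latest = latestBatchId(subDirNames);
    if (!latest.isPresent()) {
      return true; // empty input directory: nothing to wait for
    }
    // Assumes a numeric checkpoint; a non-numeric one would throw NumberFormatException.
    return latestCheckpoint
        .map(cp -> Long.parseLong(cp) >= latest.getAsLong())
        .orElse(false);
  }
}
```

Comparing ids numerically matters here: a lexical comparison would rank "3" above "10".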



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java:
##########
@@ -157,6 +171,50 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
     }
   }
 
+  private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
+    HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
+    FileStatus[] subDirs = fs.listStatus(new Path(inputPath));
+    List<FileStatus> subDirList = Arrays.asList(subDirs);
+    subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName())));
+    String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName();
+    log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " +
+        (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none"));
+    long maxWaitTime = 5 * 60 * 1000;

Review Comment:
   Will this wait be long enough for long-running jobs as well? Maybe consider making it configurable.
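One way to act on this: a minimal sketch that reads the limit from `java.util.Properties` under a hypothetical key (not an existing Hudi config), defaults to the 5-minute value hard-coded as `maxWaitTime` in the hunk, and polls against a deadline. `CatchUpWaiter` and the key name are illustrative only.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.function.BooleanSupplier;

/** Sketch of a configurable catch-up wait; names are hypothetical, not Hudi APIs. */
final class CatchUpWaiter {

  /** Hypothetical property key; not an existing Hudi configuration. */
  static final String MAX_WAIT_MS_KEY = "hypothetical.validate.max.wait.ms";
  /** Same 5-minute default that the PR hard-codes as maxWaitTime. */
  static final long DEFAULT_MAX_WAIT_MS = 5 * 60 * 1000L;

  private CatchUpWaiter() {}

  static Duration maxWait(Properties props) {
    return Duration.ofMillis(Long.parseLong(
        props.getProperty(MAX_WAIT_MS_KEY, String.valueOf(DEFAULT_MAX_WAIT_MS))));
  }

  /**
   * Polls caughtUp until it returns true or maxWait elapses. The condition is
   * always checked at least once. Returns false on timeout or interruption.
   */
  static boolean await(BooleanSupplier caughtUp, Duration maxWait, Duration pollInterval) {
    long deadline = System.nanoTime() + maxWait.toNanos();
    while (true) {
      if (caughtUp.getAsBoolean()) {
        return true;
      }
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // Sleep for the poll interval, but never past the deadline.
      long sleepMs = Math.max(1L, Math.min(pollInterval.toMillis(), remainingNanos / 1_000_000L));
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        return false;
      }
    }
  }
}
```

Returning `false` on timeout rather than throwing leaves it to the calling node to decide whether a slow catch-up should fail validation.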



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java:
##########
@@ -157,6 +171,50 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
     }
   }
 
+  private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException {
+    HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build();
+    HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    Option<String> latestCheckpoint = getLatestCheckpoint(commitTimeline);
+    FileStatus[] subDirs = fs.listStatus(new Path(inputPath));

Review Comment:
   Maybe we can use the files index if the metadata table is enabled, and fall back to listStatus if not.
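A minimal sketch of the index-first, listing-fallback idea. `FileIndex` is a hypothetical interface standing in for the metadata table's files index (the PR does not show Hudi's metadata reader API, so none is reproduced here), and `java.nio.file.Files.list` plays the role of `fs.listStatus`. Whether the metadata table actually tracks the DeltaStreamer input path is an assumption to confirm.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for a metadata-table files index; not Hudi's actual API. */
interface FileIndex {
  /** Child names of dir, or Optional.empty() if the index cannot answer. */
  Optional<List<String>> listChildNames(Path dir);
}

final class SubDirLister {

  private SubDirLister() {}

  /** Prefers the index when it is enabled and can answer; otherwise lists the directory. */
  static List<String> listChildNames(Path dir, boolean indexEnabled, FileIndex index)
      throws IOException {
    if (indexEnabled) {
      Optional<List<String>> fromIndex = index.listChildNames(dir);
      if (fromIndex.isPresent()) {
        return fromIndex.get();
      }
    }
    // Fallback: a direct directory listing, playing the role of fs.listStatus(...).
    try (Stream<Path> children = Files.list(dir)) {
      return children
          .map(p -> p.getFileName().toString())
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

Falling back when the index returns nothing, not only when it is disabled, keeps validation working even if the index lags behind the files on storage.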



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
