lw309637554 commented on a change in pull request #2379:
URL: https://github.com/apache/hudi/pull/2379#discussion_r551986565



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -682,6 +693,58 @@ public void testInlineClustering() throws Exception {
     });
   }
 
+  private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String basePath,
+                                                                     String clusteringInstantTime, boolean runSchedule) {
+    HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
+    config.basePath = basePath;
+    config.clusteringInstantTime = clusteringInstantTime;
+    config.runSchedule = runSchedule;
+    config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+    return config;
+  }
+
+  @Test
+  public void testHoodieAsyncClusteringJob() throws Exception {
+    String tableBasePath = dfsBasePath + "/asyncClustering";
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Initial ingestion: run the delta streamer continuously with upserts
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.continuousMode = true;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%d", 
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfg.configs.add(String.format("%s=false", 
HoodieCompactionConfig.AUTO_CLEAN_PROP));
+    cfg.configs.add(String.format("%s=true", 
HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      // To avoid conflicting with the delta streamer's commits, place the clustering instant 3600s in the future
+      String clusterInstantTime = HoodieActiveTimeline.COMMIT_FORMATTER
+          .format(new Date(System.currentTimeMillis() + 3600 * 1000));
+      LOG.info("Cluster instant time " + clusterInstantTime);
+      HoodieClusteringJob.Config scheduleClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
+          clusterInstantTime, true);
+      HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, scheduleClusteringConfig);
+      int scheduleClusteringResult = scheduleClusteringJob.cluster(scheduleClusteringConfig.retry);
+      if (scheduleClusteringResult == 0) {
+        LOG.info("Schedule clustering success, now cluster");
+        HoodieClusteringJob.Config clusterClusteringConfig = buildHoodieClusteringUtilConfig(tableBasePath,
+            clusterInstantTime, false);
+        HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
+        clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+        LOG.info("Cluster success");
+      } else {
+        LOG.warn("Schedule clustering failed");
+      }
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+      int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
+      int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
+      LOG.info("pendingReplaceSize=" + pendingReplaceSize + ", completeReplaceSize=" + completeReplaceSize);
+      return completeReplaceSize > 0;

Review comment:
       Because if completeReplaceSize always stays <= 0, the runner will eventually throw a timeout exception. I have now added an assert for completeReplaceSize == 1. Since this unit test mainly exercises async clustering scheduling and execution, asserting on completeReplaceSize should be sufficient; record-level checks are already covered by the clustering unit tests.
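
       For illustration, a minimal sketch of what that added assertion could look like (hypothetical code, not the exact change in the PR; it assumes JUnit's assertEquals is statically imported in the test class and reuses the metaClient built in the diff above):

           // Count completed replace (i.e. clustering) instants on the active timeline
           int completeReplaceSize = metaClient.getActiveTimeline()
               .getCompletedReplaceTimeline().getInstants().toArray().length;
           // Exactly one clustering round is expected to complete during the test
           assertEquals(1, completeReplaceSize);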



