lw309637554 commented on a change in pull request #2379:
URL: https://github.com/apache/hudi/pull/2379#discussion_r551986565
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -682,6 +693,58 @@ public void testInlineClustering() throws Exception {
});
}
+ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String
basePath,
+ String
clusteringInstantTime, boolean runSchedule) {
+ HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
+ config.basePath = basePath;
+ config.clusteringInstantTime = clusteringInstantTime;
+ config.runSchedule = runSchedule;
+ config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+ return config;
+ }
+
+ @Test
+ public void testHoodieAsyncClusteringJob() throws Exception {
+ String tableBasePath = dfsBasePath + "/asyncClustering";
+ // Keep it higher than batch-size to test continuous mode
+ int totalRecords = 3000;
+
+ // Initial bulk insert
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
+ cfg.continuousMode = true;
+ cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.add(String.format("%s=%d",
SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+ cfg.configs.add(String.format("%s=false",
HoodieCompactionConfig.AUTO_CLEAN_PROP));
+ cfg.configs.add(String.format("%s=true",
HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ deltaStreamerTestRunner(ds, cfg, (r) -> {
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ // for not confiict with delta streamer commit, just add 3600s
+ String clusterInstantTime = HoodieActiveTimeline.COMMIT_FORMATTER
+ .format(new Date(System.currentTimeMillis() + 3600 * 1000));
+ LOG.info("Cluster instant time " + clusterInstantTime);
+ HoodieClusteringJob.Config scheduleClusteringConfig =
buildHoodieClusteringUtilConfig(tableBasePath,
+ clusterInstantTime, true);
+ HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc,
scheduleClusteringConfig);
+ int scheduleClusteringResult =
scheduleClusteringJob.cluster(scheduleClusteringConfig.retry);
+ if (scheduleClusteringResult == 0) {
+ LOG.info("Schedule clustering success, now cluster");
+ HoodieClusteringJob.Config clusterClusteringConfig =
buildHoodieClusteringUtilConfig(tableBasePath,
+ clusterInstantTime, false);
+ HoodieClusteringJob clusterClusteringJob = new
HoodieClusteringJob(jsc, clusterClusteringConfig);
+ clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+ LOG.info("Cluster success");
+ } else {
+ LOG.warn("Schedule clustering failed");
+ }
+ HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+ int pendingReplaceSize =
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
+ int completeReplaceSize =
metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
+ System.out.println("PendingReplaceSize=" + pendingReplaceSize +
",completeReplaceSize = " + completeReplaceSize);
+ return completeReplaceSize > 0;
Review comment:
Because if completeReplaceSize stays <= 0, the runner will throw a
timeout exception. I have now added an assert for completeReplaceSize == 1. As
this unit test mainly tests the async clustering schedule-and-execute flow,
asserting on completeReplaceSize is sufficient; record-level checks are
covered by the clustering unit tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]