the-other-tim-brown commented on code in PR #9667:
URL: https://github.com/apache/hudi/pull/9667#discussion_r1330380362
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1209,39 +1266,49 @@ public void testAsyncClusteringJobWithRetry(boolean retryLastFailedClusteringJob
cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false",
"0", "false", "0"));
cfg.configs.addAll(getAllMultiWriterConfigs());
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- ds.sync();
+ HoodieDeltaStreamer ds2 = null;
+ try {
+ ds.sync();
- // assert ingest successful
- TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+ // assert ingest successful
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
- // schedule a clustering job to build a clustering plan
- HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath,
null, false, "schedule");
- schedule.cluster(0);
+ // schedule a clustering job to build a clustering plan
+ HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath,
null, false, "schedule");
+ schedule.cluster(0);
- // do another ingestion
- HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
- ds2.sync();
+ // do another ingestion
+ ds2 = new HoodieDeltaStreamer(cfg, jsc);
+ ds2.sync();
- // convert clustering request into inflight, Simulate the last clustering
failed scenario
- HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
- List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
- HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
- HoodieInstant hoodieInflightInstant =
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
+ // convert clustering request into inflight, Simulate the last
clustering failed scenario
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+ List<HoodieInstant> hoodieClusteringInstants =
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
+ HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+ HoodieInstant hoodieInflightInstant =
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
Option.empty());
- // trigger a scheduleAndExecute clustering job
- // when retryFailedClustering true => will rollback and re-execute failed
clustering plan with same instant timestamp.
- // when retryFailedClustering false => will make and execute a new
clustering plan with new instant timestamp.
- HoodieClusteringJob scheduleAndExecute =
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute",
retryLastFailedClusteringJob, recordType);
- scheduleAndExecute.cluster(0);
+ // trigger a scheduleAndExecute clustering job
+ // when retryFailedClustering true => will rollback and re-execute
failed clustering plan with same instant timestamp.
+ // when retryFailedClustering false => will make and execute a new
clustering plan with new instant timestamp.
+ HoodieClusteringJob scheduleAndExecute =
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute",
retryLastFailedClusteringJob, recordType);
+ scheduleAndExecute.cluster(0);
- String completeClusteringTimeStamp =
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+ String completeClusteringTimeStamp =
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
- if (retryLastFailedClusteringJob) {
- assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
- } else {
-
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
+ if (retryLastFailedClusteringJob) {
+ assertEquals(clusteringRequest.getTimestamp(),
completeClusteringTimeStamp);
+ } else {
+
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
+ }
+ UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+ } finally {
+ if (ds != null) {
Review Comment:
It is not, I've updated the PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]