codope commented on a change in pull request #3869:
URL: https://github.com/apache/hudi/pull/3869#discussion_r739101870



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1402,6 +1410,43 @@ public void testPendingClusteringRollback(boolean 
populateMetaFields) throws Exc
     assertEquals(0, 
ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean 
populateMetaFields) throws Exception {
+    // setup clustering config with update strategy to allow updates during 
ingestion
+    HoodieClusteringConfig clusteringConfig = 
HoodieClusteringConfig.newBuilder()
+        .withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
+        
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
+        .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+
+    // start clustering, but don't commit; keep it inflight
+    List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, 
populateMetaFields, false);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
+        
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(1, pendingClusteringPlans.size());
+    HoodieInstant pendingClusteringInstant = 
pendingClusteringPlans.get(0).getLeft();
+    assertEquals(pendingClusteringInstant.getState(), INFLIGHT);
+
+    // make an update to a filegroup within the partition that is pending 
clustering
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
+    addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
+    cfgBuilder.withClusteringConfig(clusteringConfig);
+    HoodieWriteConfig config = cfgBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    String commitTime = HoodieActiveTimeline.createNewInstantTime();
+    allRecords.addAll(dataGen.generateUpdates(commitTime, 200));
+    writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
+
+    // verify inflight clustering was rolled back
+    metaClient.reloadActiveTimeline();
+    pendingClusteringPlans = 
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    assertEquals(1, pendingClusteringPlans.size());
+    pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
+    System.out.println(">>> pendingClusteringInstant: " + 
pendingClusteringInstant);
+    assertEquals(pendingClusteringInstant.getState(), REQUESTED);

Review comment:
       Thanks for clarifying. I only considered inflight. I'll make this change 
in the next commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to