satishkotha commented on a change in pull request #3869:
URL: https://github.com/apache/hudi/pull/3869#discussion_r738026291
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -117,7 +119,24 @@ private void initKeyGenIfNeeded(boolean
populateMetaFields) {
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry ->
entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context,
fileGroupsInPendingClustering);
- return
(JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
+ Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>
recordsAndPendingClusteringFileGroups =
+ (Pair<JavaRDD<HoodieRecord<T>>,
Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
+ Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering =
recordsAndPendingClusteringFileGroups.getRight();
+ if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+ return recordsAndPendingClusteringFileGroups.getLeft();
+ }
+ // there are filegroups pending clustering and receving updates, so
rollback the inflight clustering instants
+ Set<HoodieInstant> pendingClusteringInstantsToRollback =
getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+ .filter(e ->
fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toSet());
+ pendingClusteringInstantsToRollback.forEach(instant -> {
+ String commitTime = HoodieActiveTimeline.createNewInstantTime();
+ table.scheduleRollback(context, commitTime, instant, false);
+ table.rollback(context, commitTime, instant, false);
+
table.getActiveTimeline().revertReplaceCommitInflightToRequested(instant);
Review comment:
Is this line needed? I think table.rollback would delete the inflight instant, right?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1402,6 +1410,43 @@ public void testPendingClusteringRollback(boolean
populateMetaFields) throws Exc
assertEquals(0,
ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean
populateMetaFields) throws Exception {
+ // setup clustering config with update strategy to allow updates during
ingestion
+ HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder()
+ .withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
+
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
+ .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+
+ // start clustering, but don't commit keep it inflight
+ List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig,
populateMetaFields, false);
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+ List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
+
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+ assertEquals(1, pendingClusteringPlans.size());
+ HoodieInstant pendingClusteringInstant =
pendingClusteringPlans.get(0).getLeft();
+ assertEquals(pendingClusteringInstant.getState(), INFLIGHT);
+
+ // make an update to a filegroup within the partition that is pending
clustering
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
+ addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
+ cfgBuilder.withClusteringConfig(clusteringConfig);
+ HoodieWriteConfig config = cfgBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+ String commitTime = HoodieActiveTimeline.createNewInstantTime();
+ allRecords.addAll(dataGen.generateUpdates(commitTime, 200));
+ writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
+
+ // verify inflight clustering was rolled back
+ metaClient.reloadActiveTimeline();
+ pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+ assertEquals(1, pendingClusteringPlans.size());
+ pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
+ System.out.println(">>> pendingClusteringInstant: " +
pendingClusteringInstant);
Review comment:
Nit: please remove the println before merging.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
##########
@@ -117,7 +119,24 @@ private void initKeyGenIfNeeded(boolean
populateMetaFields) {
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry ->
entry.getKey()).collect(Collectors.toSet());
UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context,
fileGroupsInPendingClustering);
- return
(JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
+ Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>
recordsAndPendingClusteringFileGroups =
+ (Pair<JavaRDD<HoodieRecord<T>>,
Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
+ Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering =
recordsAndPendingClusteringFileGroups.getRight();
+ if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+ return recordsAndPendingClusteringFileGroups.getLeft();
+ }
+ // there are filegroups pending clustering and receving updates, so
rollback the inflight clustering instants
+ Set<HoodieInstant> pendingClusteringInstantsToRollback =
getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+ .filter(e ->
fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toSet());
+ pendingClusteringInstantsToRollback.forEach(instant -> {
+ String commitTime = HoodieActiveTimeline.createNewInstantTime();
+ table.scheduleRollback(context, commitTime, instant, false);
+ table.rollback(context, commitTime, instant, false);
Review comment:
I think this needs proper multi-writer integration; otherwise, there
are race conditions. For example, what happens if the clustering completes
after the instants are fetched in line 129?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1402,6 +1410,43 @@ public void testPendingClusteringRollback(boolean
populateMetaFields) throws Exc
assertEquals(0,
ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean
populateMetaFields) throws Exception {
+ // setup clustering config with update strategy to allow updates during
ingestion
+ HoodieClusteringConfig clusteringConfig =
HoodieClusteringConfig.newBuilder()
+ .withClusteringMaxNumGroups(10).withClusteringTargetPartitions(0)
+
.withClusteringUpdatesStrategy("org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy")
+ .withInlineClustering(true).withInlineClusteringNumCommits(1).build();
+
+ // start clustering, but don't commit keep it inflight
+ List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig,
populateMetaFields, false);
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+ List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
+
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+ assertEquals(1, pendingClusteringPlans.size());
+ HoodieInstant pendingClusteringInstant =
pendingClusteringPlans.get(0).getLeft();
+ assertEquals(pendingClusteringInstant.getState(), INFLIGHT);
+
+ // make an update to a filegroup within the partition that is pending
clustering
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
+ addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
+ cfgBuilder.withClusteringConfig(clusteringConfig);
+ HoodieWriteConfig config = cfgBuilder.build();
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+ String commitTime = HoodieActiveTimeline.createNewInstantTime();
+ allRecords.addAll(dataGen.generateUpdates(commitTime, 200));
+ writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
+
+ // verify inflight clustering was rolled back
+ metaClient.reloadActiveTimeline();
+ pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+ assertEquals(1, pendingClusteringPlans.size());
+ pendingClusteringInstant = pendingClusteringPlans.get(0).getLeft();
+ System.out.println(">>> pendingClusteringInstant: " +
pendingClusteringInstant);
+ assertEquals(pendingClusteringInstant.getState(), REQUESTED);
Review comment:
IIRC, REQUESTED is still considered a pending clustering operation, so
subsequent write operations would fail. Can you please ensure clustering is
fully rolled back? Both the inflight and requested instants need to be deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]