nsivabalan commented on code in PR #18829:
URL: https://github.com/apache/hudi/pull/18829#discussion_r3480024747


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java:
##########
@@ -34,11 +34,15 @@ public abstract class UpdateStrategy<T, I> implements 
Serializable {
   protected final transient HoodieEngineContext engineContext;
   protected HoodieTable table;
   protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+  protected Set<HoodieFileGroupId> fileGroupsToBeReplaced;
 
-  public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, 
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+  public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,

Review Comment:
   Addressed in df3e2563162d: restored `UpdateStrategy`'s 3-arg constructor as 
a delegation to the 4-arg with `Collections.emptySet()`, and made the 
reflective loader in `BaseSparkCommitActionExecutor` (and the new bulk-insert 
path) try the 4-arg constructor first and fall back to the 3-arg on 
`NoSuchMethodException` with a WARN noting that fallback skips INSERT_OVERWRITE 
overlap detection.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java:
##########
@@ -35,13 +34,15 @@
 public class SparkAllowUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
 
   public SparkAllowUpdateStrategy(
-      HoodieEngineContext engineContext, HoodieTable table, 
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
-    super(engineContext, table, fileGroupsInPendingClustering);
+      HoodieEngineContext engineContext, HoodieTable table,
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+      Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+    super(engineContext, table, fileGroupsInPendingClustering, 
fileGroupsToBeReplaced);
   }
 
   @Override
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> 
handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
-    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = 
getGroupIdsWithUpdate(taggedRecordsRDD);
+    Set<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = 
getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = 
fileGroupIdsWithRecordUpdate.stream()
         .filter(fileGroupsInPendingClustering::contains)

Review Comment:
   TODO already added in f086df5 (now part of df3e2563162d after rebase) on 
both `SparkAllowUpdateStrategy.handleUpdate` and 
`SparkConsistentBucketDuplicateUpdateStrategy.handleUpdate` referencing the 
`fileGroupsToBeReplaced` follow-up.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String 
partitionPath) {
     return 
table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 
+  @Override
+  protected Set<HoodieFileGroupId> 
getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    String staticOverwritePartition = 
config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    List<String> partitionPaths;
+
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // Static insert overwrite: use the configured partitions
+      partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+    } else {
+      // Dynamic insert overwrite: determine partitions from input records

Review Comment:
   Not a divergence risk in practice for INSERT_OVERWRITE: the same key 
generator drives both `HoodieRecord::getPartitionPath` (input-side, used by 
`getFileGroupsBeingReplaced`) and `status.getStat().getPartitionPath()` 
(output-side, used by `getPartitionToReplacedFileIds`), and INSERT_OVERWRITE 
has no record-level repartitioning between the two. The writeStatuses are 
produced from these same input records' partition paths via the bulk-insert 
partitioner, so any custom key generator change would affect both sides 
identically. Resolving.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestInsertOverwriteWithClustering.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import 
org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
+import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION 
operations
+ * when there are pending clustering operations on the file groups being 
replaced.
+ */
+public class TestInsertOverwriteWithClustering extends HoodieClientTestBase {
+
+  private HoodieTestDataGenerator dataGen;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initMetaClient(HoodieTableType.COPY_ON_WRITE);
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(boolean 
rollbackPendingClustering) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withRollbackParallelism(2)
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
+            
.withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
+            .withClusteringMaxNumGroups(10)
+            .withRollbackPendingClustering(rollbackPendingClustering)
+            .build());
+  }
+
+  private HoodieWriteConfig.Builder 
getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering, String 
partitionFilter) {

Review Comment:
   Addressed in f086df5 (now part of df3e2563162d) — extracted 
`baseClusteringConfigBuilder` shared between `getConfigBuilder` and 
`getConfigBuilderWithPartitionFilter`, and 
`testInsertOverwriteNonOverlappingPartitionWithPendingClustering` was switched 
to use it.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java:
##########
@@ -39,18 +38,23 @@
 @Slf4j
 public class SparkRejectUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
 
-  public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, 
HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
-    super(engineContext, table, fileGroupsInPendingClustering);
+  public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, 
HoodieTable table,
+                                   Set<HoodieFileGroupId> 
fileGroupsInPendingClustering,
+                                   Set<HoodieFileGroupId> 
fileGroupsToBeReplaced) {
+    super(engineContext, table, fileGroupsInPendingClustering, 
fileGroupsToBeReplaced);
   }
 
   @Override
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> 
handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
-    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = 
getGroupIdsWithUpdate(taggedRecordsRDD);
-    fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
-      if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) 
{
+    Set<HoodieFileGroupId> allAffectedFileGroups = 
getGroupIdsWithUpdate(taggedRecordsRDD);
+    // Combine file groups with updates and file groups to be replaced

Review Comment:
   Addressed in f086df5 — the two restating comments are now a single 
intent-level comment: `// also treat replaced file groups as potential conflict 
targets`.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String 
partitionPath) {
     return 
table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 
+  @Override
+  protected Set<HoodieFileGroupId> 
getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    String staticOverwritePartition = 
config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    List<String> partitionPaths;
+
+    if (StringUtils.nonEmpty(staticOverwritePartition)) {
+      // Static insert overwrite: use the configured partitions
+      partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+    } else {
+      // Dynamic insert overwrite: determine partitions from input records
+      partitionPaths = 
inputRecords.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+    }
+
+    // Get all file groups in the partitions to be overwritten
+    return partitionPaths.stream()
+        .flatMap(partitionPath -> 
table.getSliceView().getLatestFileSlices(partitionPath)

Review Comment:
   Done in f086df5 — reusing `getAllExistingFileIds(partitionPath)` from 
`SparkInsertOverwriteCommitActionExecutor.getFileGroupsBeingReplaced` instead 
of re-iterating file slices inline.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -163,6 +167,18 @@ protected HoodieData<HoodieRecord<T>> 
clusteringHandleUpdate(HoodieData<HoodieRe
     return recordsAndPendingClusteringFileGroups.getLeft();
   }
 
+  /**
+   * Get the file groups that will be replaced by the current operation.
+   * This is relevant for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and 
DELETE_PARTITION operations.
+   *
+   * @param inputRecords the input records for the operation
+   * @return set of file group IDs that will be replaced
+   */
+  protected Set<HoodieFileGroupId> 
getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {

Review Comment:
   Right — addressed in df3e2563162d. Added 
`rejectIfOverlappingPendingClustering(preparedRecords)` in 
`BaseDatasetBulkInsertCommitActionExecutor.execute()`, called after 
`HoodieDatasetBulkInsertHelper.prepareForBulkInsert` populates 
`_hoodie_partition_path` so dynamic partition resolution can read it. 
Bulk-insert subclasses implement `getFileGroupsBeingReplaced(Dataset<Row>)`: 
`DatasetBulkInsertOverwriteCommitActionExecutor` handles static 
(`STATIC_OVERWRITE_PARTITION_PATHS`) and dynamic (distinct 
`_hoodie_partition_path`); 
`DatasetBulkInsertOverwriteTableCommitActionExecutor` enumerates every 
partition via `FSUtils.getAllPartitionPaths`. The same configured 
`hoodie.clustering.updates.strategy` is loaded and invoked, so the default 
`SparkRejectUpdateStrategy` rejects and `SparkAllowUpdateStrategy` defers to 
conflict resolution, matching the existing Spark-side behavior.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java:
##########
@@ -53,4 +56,19 @@ protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieWriteMet
     return 
HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, 
partitionPaths.size()).mapToPair(
         partitionPath -> Pair.of(partitionPath, 
getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
+
+  @Override
+  protected Set<HoodieFileGroupId> 
getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    // INSERT_OVERWRITE_TABLE replaces every file group across every 
partition, not just the
+    // partitions present in the input records. Enumerate all partitions to 
match the semantics
+    // of getPartitionToReplacedFileIds above.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, 
table.getMetaClient(), config.getMetadataConfig());
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return partitionPaths.stream()
+        .flatMap(partitionPath -> 
table.getSliceView().getLatestFileSlices(partitionPath)
+            .map(fileSlice -> new HoodieFileGroupId(partitionPath, 
fileSlice.getFileId())))

Review Comment:
   Addressed in df3e2563162d — 
`SparkInsertOverwriteTableCommitActionExecutor.getFileGroupsBeingReplaced` now 
uses `context.parallelize(...).flatMap(...)` instead of a sequential 
driver-side `Stream`, matching the parallelization in the sibling 
`getPartitionToReplacedFileIds`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to