hudi-agent commented on code in PR #18829:
URL: https://github.com/apache/hudi/pull/18829#discussion_r3431492173


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -163,6 +167,18 @@ protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRe
     return recordsAndPendingClusteringFileGroups.getLeft();
   }
 
+  /**
+   * Get the file groups that will be replaced by the current operation.
+   * This is relevant for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations.
+   *
+   * @param inputRecords the input records for the operation
+   * @return set of file group IDs that will be replaced
+   */
+  protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {

Review Comment:
   🤖 The new check only fires for code paths that flow through 
`BaseSparkCommitActionExecutor.clusteringHandleUpdate`. The Spark SQL 
bulk-insert overwrite path (`DatasetBulkInsertOverwriteCommitActionExecutor` / 
`DatasetBulkInsertOverwriteTableCommitActionExecutor` in 
`hudi-spark-datasource/hudi-spark-common`) has its own `execute()` that goes 
through `HoodieDatasetBulkInsertHelper.bulkInsert` and never touches 
`clusteringHandleUpdate`, so `INSERT OVERWRITE [TABLE]` SQL with bulk-insert 
enabled (which is the default in many configs via 
`BULKINSERT_OVERWRITE_OPERATION_TYPE`) would still silently replace file groups 
in pending clustering. @yihua / @nsivabalan — is this intentionally out of 
scope (and worth calling out alongside `SparkAllowUpdateStrategy` in the PR's 
"NOT covered" list), or should an equivalent guard be added to 
`BaseDatasetBulkInsertCommitActionExecutor.execute`?
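
   For illustration, the guard being discussed reduces to an intersection check between the file groups an operation would replace and the file groups named in pending clustering plans. The following is only a minimal plain-Java sketch of that idea: `FileGroupId`, `conflicts`, and `validateNoConflict` are hypothetical stand-ins rather than Hudi APIs, and `IllegalStateException` is a placeholder for whatever exception the PR actually throws.

```java
import java.util.HashSet;
import java.util.Set;

final class PendingClusteringGuard {

  /** Hypothetical stand-in for HoodieFileGroupId: a (partition path, file id) pair. */
  record FileGroupId(String partitionPath, String fileId) {}

  /** Returns the file groups that are both slated for replacement and part of a pending clustering plan. */
  static Set<FileGroupId> conflicts(Set<FileGroupId> beingReplaced, Set<FileGroupId> pendingClustering) {
    Set<FileGroupId> overlap = new HashSet<>(beingReplaced);
    overlap.retainAll(pendingClustering);
    return overlap;
  }

  /** Fails fast when the overlap is non-empty; the exception type is a placeholder. */
  static void validateNoConflict(Set<FileGroupId> beingReplaced, Set<FileGroupId> pendingClustering) {
    Set<FileGroupId> overlap = conflicts(beingReplaced, pendingClustering);
    if (!overlap.isEmpty()) {
      throw new IllegalStateException("File groups in pending clustering would be replaced: " + overlap);
    }
  }

  public static void main(String[] args) {
    Set<FileGroupId> replaced = new HashSet<>();
    replaced.add(new FileGroupId("2024/01/01", "fg-1"));
    Set<FileGroupId> pending = new HashSet<>();
    pending.add(new FileGroupId("2024/01/01", "fg-1"));
    try {
      validateNoConflict(replaced, pending);
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

   An equivalent guard on the bulk-insert overwrite path would need the same two inputs; how that path would obtain them is not shown in this diff.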
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestInsertOverwriteWithClustering.java:
##########
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
+import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations
+ * when there are pending clustering operations on the file groups being replaced.
+ */
+public class TestInsertOverwriteWithClustering extends HoodieClientTestBase {
+
+  private HoodieTestDataGenerator dataGen;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initPath();
+    initSparkContexts();
+    initTestDataGenerator();
+    initMetaClient(HoodieTableType.COPY_ON_WRITE);
+    dataGen = new HoodieTestDataGenerator();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  private HoodieClusteringConfig.Builder baseClusteringConfigBuilder(boolean rollbackPendingClustering) {
+    return HoodieClusteringConfig.newBuilder()
+        .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
+        .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
+        .withClusteringMaxNumGroups(10)
+        .withRollbackPendingClustering(rollbackPendingClustering);
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(boolean rollbackPendingClustering) {
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2)
+        .withBulkInsertParallelism(2)
+        .withFinalizeWriteParallelism(2)
+        .withDeleteParallelism(2)
+        .withRollbackParallelism(2)
+        .withClusteringConfig(baseClusteringConfigBuilder(rollbackPendingClustering).build());
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering, String partitionFilter) {

Review Comment:
   🤖 nit: `getConfigBuilderWithPartitionFilter` repeats every parallelism 
setting from `getConfigBuilder` verbatim. Have you considered delegating 
instead: `return 
getConfigBuilder(rollbackPendingClustering).withClusteringConfig(baseClusteringConfigBuilder(rollbackPendingClustering).withClusteringPartitionSelected(partitionFilter).build())`?
 That way there's a single place to update if the default parallelism ever 
changes.
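
   The delegation calls `withClusteringConfig` a second time on a builder that already has a clustering config, so it relies on the second call overriding or merging over the first. A minimal sketch of that pattern with stand-in types follows; `WriteConfigBuilder`, the `example.*` keys, and the merge-on-second-call behaviour are all assumptions for illustration and should be checked against `HoodieWriteConfig.Builder` before applying.

```java
import java.util.Properties;

final class ConfigBuilderDelegation {

  /** Hypothetical stand-in for HoodieWriteConfig.Builder: collects settings into Properties. */
  static final class WriteConfigBuilder {
    private final Properties props = new Properties();

    WriteConfigBuilder with(String key, String value) {
      props.setProperty(key, value);
      return this;
    }

    /** Assumed behaviour: merges the given clustering properties over what is already set. */
    WriteConfigBuilder withClusteringConfig(Properties clusteringProps) {
      props.putAll(clusteringProps);
      return this;
    }

    Properties build() {
      Properties copy = new Properties();
      copy.putAll(props);
      return copy;
    }
  }

  /** Hypothetical clustering settings; the key name is made up for this sketch. */
  static Properties clusteringProps(boolean rollbackPendingClustering) {
    Properties p = new Properties();
    p.setProperty("example.clustering.rollback.pending", String.valueOf(rollbackPendingClustering));
    return p;
  }

  /** The single place that owns the shared settings. */
  static WriteConfigBuilder getConfigBuilder(boolean rollbackPendingClustering) {
    return new WriteConfigBuilder()
        .with("example.parallelism", "2")
        .withClusteringConfig(clusteringProps(rollbackPendingClustering));
  }

  /** Delegates to getConfigBuilder, then layers only the partition filter on top. */
  static WriteConfigBuilder getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering,
                                                                String partitionFilter) {
    Properties clustering = clusteringProps(rollbackPendingClustering);
    clustering.setProperty("example.clustering.partition.selected", partitionFilter);
    return getConfigBuilder(rollbackPendingClustering).withClusteringConfig(clustering);
  }

  public static void main(String[] args) {
    System.out.println(getConfigBuilderWithPartitionFilter(true, "2024/01/01").build());
  }
}
```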
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java:
##########
@@ -53,4 +56,19 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMet
     return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
         partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
   }
+
+  @Override
+  protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    // INSERT_OVERWRITE_TABLE replaces every file group across every partition, not just the
+    // partitions present in the input records. Enumerate all partitions to match the semantics
+    // of getPartitionToReplacedFileIds above.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient(), config.getMetadataConfig());
+    if (partitionPaths == null || partitionPaths.isEmpty()) {
+      return Collections.emptySet();
+    }
+    return partitionPaths.stream()
+        .flatMap(partitionPath -> table.getSliceView().getLatestFileSlices(partitionPath)
+            .map(fileSlice -> new HoodieFileGroupId(partitionPath, fileSlice.getFileId())))

Review Comment:
   🤖 Two related concerns here: (1) this uses a sequential driver-side `Stream` 
to walk every partition, while the sibling `getPartitionToReplacedFileIds` 
parallelizes via `context.parallelize(partitionPaths, partitionPaths.size())` — 
for tables with many partitions where the file system view isn't fully cached 
(e.g., MDT-backed view that lazy-loads), the sequential walk could be a 
meaningful slowdown; (2) unlike 
`SparkInsertOverwriteCommitActionExecutor.getFileGroupsBeingReplaced`, this 
skips the `getAllExistingFileIds(partitionPath)` helper and inlines the 
slice-view call directly. Could you align with the parallelized helper-based 
pattern in `getPartitionToReplacedFileIds` for both consistency and scalability?
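
   As a rough shape for the helper-based, parallel variant, here is a plain-Java sketch. It uses `parallelStream()` purely as a local stand-in for the engine-side `context.parallelize(...)` call, which is not the same mechanism; the `Map`-backed view, the `FileGroupId` record, and the method `fileGroupsBeingReplaced` are hypothetical and only mirror the roles of the Hudi types in this diff.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class ReplacedFileGroups {

  /** Hypothetical stand-in for HoodieFileGroupId. */
  record FileGroupId(String partitionPath, String fileId) {}

  /** Hypothetical stand-in for the file-system view: partition path -> latest file ids. */
  private final Map<String, List<String>> latestFileIdsByPartition;

  ReplacedFileGroups(Map<String, List<String>> latestFileIdsByPartition) {
    this.latestFileIdsByPartition = latestFileIdsByPartition;
  }

  /** Plays the role of getAllExistingFileIds(partitionPath): one helper owns the per-partition lookup. */
  List<String> getAllExistingFileIds(String partitionPath) {
    return latestFileIdsByPartition.getOrDefault(partitionPath, List.of()).stream()
        .distinct()
        .collect(Collectors.toList());
  }

  /** Walks partitions in parallel and reuses the helper instead of inlining the view call. */
  Set<FileGroupId> fileGroupsBeingReplaced(List<String> partitionPaths) {
    return partitionPaths.parallelStream()
        .flatMap(partitionPath -> getAllExistingFileIds(partitionPath).stream()
            .map(fileId -> new FileGroupId(partitionPath, fileId)))
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    ReplacedFileGroups view = new ReplacedFileGroups(
        Map.of("2024/01/01", List.of("fg-1", "fg-2"), "2024/01/02", List.of("fg-3")));
    System.out.println(view.fileGroupsBeingReplaced(List.of("2024/01/01", "2024/01/02")));
  }
}
```

   In the real executor the per-partition lookup would run on executors, so whatever it captures would have to be serializable; the sketch sidesteps that concern.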
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
     return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
   }
 
+  @Override
+  protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+    String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+    List<String> partitionPaths;

Review Comment:
   🤖 nit: `staticOverwritePartition` is a singular name but the value can be a 
comma-separated list of multiple paths — could you rename it to 
`staticOverwritePartitionPaths` (or just `staticPartitionPaths`) to match what 
the variable actually holds?
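
   To make the plural nature of the value concrete, here is a minimal sketch of turning such a comma-separated string into a list of paths. The class `StaticPartitionPaths` and its `parse` method are hypothetical, and the trimming and empty-entry filtering are assumptions; the rest of the method in the PR is not visible in this hunk and may handle the value differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class StaticPartitionPaths {

  /** Splits a comma-separated list of partition paths, dropping blanks and surrounding whitespace. */
  static List<String> parse(String staticOverwritePartitionPaths) {
    if (staticOverwritePartitionPaths == null || staticOverwritePartitionPaths.trim().isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.stream(staticOverwritePartitionPaths.split(","))
        .map(String::trim)
        .filter(path -> !path.isEmpty())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(parse("2024/01/01, 2024/01/02"));
  }
}
```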
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
