nsivabalan commented on code in PR #18829:
URL: https://github.com/apache/hudi/pull/18829#discussion_r3480024747
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java:
##########
@@ -34,11 +34,15 @@ public abstract class UpdateStrategy<T, I> implements Serializable {
protected final transient HoodieEngineContext engineContext;
protected HoodieTable table;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+ protected Set<HoodieFileGroupId> fileGroupsToBeReplaced;
- public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+ public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
Review Comment:
Addressed in df3e2563162d: restored `UpdateStrategy`'s 3-arg constructor as
a delegation to the 4-arg with `Collections.emptySet()`, and made the
reflective loader in `BaseSparkCommitActionExecutor` (and the new bulk-insert
path) try the 4-arg constructor first and fall back to the 3-arg on
`NoSuchMethodException` with a WARN noting that fallback skips INSERT_OVERWRITE
overlap detection.
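Here is a minimal, self-contained sketch of the compatibility pattern described above: a legacy constructor that delegates with an empty set, and a loader that tries the extended constructor first and falls back on `NoSuchMethodException`. This is not the Hudi code. `SketchUpdateStrategy`, `NewStyleStrategy`, `LegacyStrategy` and `StrategyLoader` are hypothetical names. The sketch also makes these simplifications: the `HoodieEngineContext`/`HoodieTable` parameters are dropped, file group IDs are plain strings, and the WARN goes to `System.err` instead of a logger.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for UpdateStrategy (engine context and table omitted).
abstract class SketchUpdateStrategy {
  protected final Set<String> fileGroupsInPendingClustering;
  protected final Set<String> fileGroupsToBeReplaced;

  // Legacy signature kept for compatibility: delegates with an empty replaced set.
  protected SketchUpdateStrategy(Set<String> fileGroupsInPendingClustering) {
    this(fileGroupsInPendingClustering, Collections.emptySet());
  }

  protected SketchUpdateStrategy(Set<String> fileGroupsInPendingClustering,
                                 Set<String> fileGroupsToBeReplaced) {
    this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
    this.fileGroupsToBeReplaced = fileGroupsToBeReplaced;
  }
}

// A strategy that exposes the extended constructor.
class NewStyleStrategy extends SketchUpdateStrategy {
  public NewStyleStrategy(Set<String> pending, Set<String> replaced) {
    super(pending, replaced);
  }
}

// A custom strategy compiled against the old, shorter constructor only.
class LegacyStrategy extends SketchUpdateStrategy {
  public LegacyStrategy(Set<String> pending) {
    super(pending);
  }
}

final class StrategyLoader {
  private StrategyLoader() {}

  // Try the extended constructor; if the class lacks it, fall back to the legacy one.
  static SketchUpdateStrategy load(Class<?> clazz, Set<String> pending, Set<String> replaced) {
    try {
      try {
        Constructor<?> extended = clazz.getConstructor(Set.class, Set.class);
        return (SketchUpdateStrategy) extended.newInstance(pending, replaced);
      } catch (NoSuchMethodException e) {
        System.err.println("WARN: " + clazz.getName() + " has no extended constructor; "
            + "falling back to the legacy one, so INSERT_OVERWRITE overlap detection is skipped");
        Constructor<?> legacy = clazz.getConstructor(Set.class);
        return (SketchUpdateStrategy) legacy.newInstance(pending);
      }
    } catch (ReflectiveOperationException e) {
      // Covers a missing legacy constructor as well as instantiation failures.
      throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```

The fallback only triggers when the extended constructor is absent. Existing custom strategies therefore keep loading unchanged and simply see an empty `fileGroupsToBeReplaced`.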
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java:
##########
@@ -35,13 +34,15 @@
public class SparkAllowUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
public SparkAllowUpdateStrategy(
- HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
- super(engineContext, table, fileGroupsInPendingClustering);
+ HoodieEngineContext engineContext, HoodieTable table,
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+ Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+ super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
}
@Override
public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
- List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
+ Set<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering =
fileGroupIdsWithRecordUpdate.stream()
.filter(fileGroupsInPendingClustering::contains)
Review Comment:
TODO already added in f086df5 (now part of df3e2563162d after rebase) on
both `SparkAllowUpdateStrategy.handleUpdate` and
`SparkConsistentBucketDuplicateUpdateStrategy.handleUpdate` referencing the
`fileGroupsToBeReplaced` follow-up.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
+ @Override
+ protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+ String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+ List<String> partitionPaths;
+
+ if (StringUtils.nonEmpty(staticOverwritePartition)) {
+ // Static insert overwrite: use the configured partitions
+ partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+ } else {
+ // Dynamic insert overwrite: determine partitions from input records
Review Comment:
Not a divergence risk in practice for INSERT_OVERWRITE: the same key
generator drives both `HoodieRecord::getPartitionPath` (input-side, used by
`getFileGroupsBeingReplaced`) and `status.getStat().getPartitionPath()`
(output-side, used by `getPartitionToReplacedFileIds`), and INSERT_OVERWRITE
has no record-level repartitioning between the two. The writeStatuses are
produced from these same input records' partition paths via the bulk-insert
partitioner, so any custom key generator change would affect both sides
identically. Resolving.
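The argument can be illustrated with a toy model. It assumes records are plain maps and the key generator is a single function. `PartitionPathInvariant` and its helpers are hypothetical and only mimic the two sides:

- `inputPartitions` stands in for the input side used by `getFileGroupsBeingReplaced`.
- `write` plus `outputPartitions` stand in for the output side used by `getPartitionToReplacedFileIds`.

Both sides derive the partition from the same function, and nothing in between re-derives it, so the two partition sets match.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Toy model, not Hudi code: one key-generator function drives both sides.
final class PartitionPathInvariant {
  private PartitionPathInvariant() {}

  // Stand-in for a write stat: the partition written to plus a record count.
  static final class WriteStat {
    final String partitionPath;
    final long recordCount;

    WriteStat(String partitionPath, long recordCount) {
      this.partitionPath = partitionPath;
      this.recordCount = recordCount;
    }
  }

  static Map<String, String> record(String region, String date) {
    Map<String, String> r = new HashMap<>();
    r.put("region", region);
    r.put("date", date);
    return r;
  }

  // Input side: partitions taken straight from the incoming records.
  static Set<String> inputPartitions(List<Map<String, String>> records,
                                     Function<Map<String, String>, String> keyGen) {
    return records.stream().map(keyGen).collect(Collectors.toSet());
  }

  // Output side: the writer buckets records by the same key generator, one stat per partition.
  static List<WriteStat> write(List<Map<String, String>> records,
                               Function<Map<String, String>, String> keyGen) {
    Map<String, Long> counts = records.stream()
        .collect(Collectors.groupingBy(keyGen, Collectors.counting()));
    List<WriteStat> stats = new ArrayList<>();
    counts.forEach((partition, count) -> stats.add(new WriteStat(partition, count)));
    return stats;
  }

  static Set<String> outputPartitions(List<WriteStat> stats) {
    return stats.stream().map(s -> s.partitionPath).collect(Collectors.toSet());
  }
}
```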
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestInsertOverwriteWithClustering.java:
##########
@@ -0,0 +1,835 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
+import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations
+ * when there are pending clustering operations on the file groups being replaced.
+ */
+public class TestInsertOverwriteWithClustering extends HoodieClientTestBase {
+
+ private HoodieTestDataGenerator dataGen;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts();
+ initTestDataGenerator();
+ initMetaClient(HoodieTableType.COPY_ON_WRITE);
+ dataGen = new HoodieTestDataGenerator();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilder(boolean rollbackPendingClustering) {
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+ .withBulkInsertParallelism(2)
+ .withFinalizeWriteParallelism(2)
+ .withDeleteParallelism(2)
+ .withRollbackParallelism(2)
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
+ .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
+ .withClusteringMaxNumGroups(10)
+ .withRollbackPendingClustering(rollbackPendingClustering)
+ .build());
+ }
+
+ private HoodieWriteConfig.Builder getConfigBuilderWithPartitionFilter(boolean rollbackPendingClustering, String partitionFilter) {
Review Comment:
Addressed in f086df5 (now part of df3e2563162d) — extracted
`baseClusteringConfigBuilder` shared between `getConfigBuilder` and
`getConfigBuilderWithPartitionFilter`, and
`testInsertOverwriteNonOverlappingPartitionWithPendingClustering` was switched
to use it.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java:
##########
@@ -39,18 +38,23 @@
@Slf4j
public class SparkRejectUpdateStrategy<T> extends BaseSparkUpdateStrategy<T> {
- public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
- super(engineContext, table, fileGroupsInPendingClustering);
+ public SparkRejectUpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering,
+ Set<HoodieFileGroupId> fileGroupsToBeReplaced) {
+ super(engineContext, table, fileGroupsInPendingClustering, fileGroupsToBeReplaced);
}
@Override
public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
- List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
- fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
- if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
+ Set<HoodieFileGroupId> allAffectedFileGroups = getGroupIdsWithUpdate(taggedRecordsRDD);
+ // Combine file groups with updates and file groups to be replaced
Review Comment:
Addressed in f086df5 — the two restating comments are now a single
intent-level comment: `// also treat replaced file groups as potential conflict
targets`.
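The reject decision this comment refers to reduces to "union, then intersect with pending clustering". The following is a minimal sketch of that logic, not `SparkRejectUpdateStrategy` itself. It makes two assumptions: file group IDs are plain `"partition/fileId"` strings, and `ClusteringConflictException` is a hypothetical stand-in for whatever exception Hudi actually throws.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch of the reject decision with hypothetical names; not the Hudi class.
final class RejectUpdateSketch {
  private RejectUpdateSketch() {}

  static final class ClusteringConflictException extends RuntimeException {
    ClusteringConflictException(String message) {
      super(message);
    }
  }

  static void checkConflicts(Set<String> fileGroupsWithUpdates,
                             Set<String> fileGroupsToBeReplaced,
                             Set<String> fileGroupsInPendingClustering) {
    // also treat replaced file groups as potential conflict targets
    Set<String> allAffected = new HashSet<>(fileGroupsWithUpdates);
    allAffected.addAll(fileGroupsToBeReplaced);

    // TreeSet only keeps the error message deterministic.
    Set<String> conflicts = new TreeSet<>();
    for (String fileGroup : allAffected) {
      if (fileGroupsInPendingClustering.contains(fileGroup)) {
        conflicts.add(fileGroup);
      }
    }
    if (!conflicts.isEmpty()) {
      throw new ClusteringConflictException(
          "File groups under pending clustering cannot be updated or replaced: " + conflicts);
    }
  }
}
```

If only `fileGroupsWithUpdates` were checked, an INSERT_OVERWRITE carrying no record-level update to a clustering-pending file group would pass. That appears to be the gap the extra set closes.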
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java:
##########
@@ -101,6 +103,26 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
+ @Override
+ protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+ String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);
+ List<String> partitionPaths;
+
+ if (StringUtils.nonEmpty(staticOverwritePartition)) {
+ // Static insert overwrite: use the configured partitions
+ partitionPaths = Arrays.asList(staticOverwritePartition.split(","));
+ } else {
+ // Dynamic insert overwrite: determine partitions from input records
+ partitionPaths = inputRecords.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+ }
+
+ // Get all file groups in the partitions to be overwritten
+ return partitionPaths.stream()
+ .flatMap(partitionPath -> table.getSliceView().getLatestFileSlices(partitionPath)
Review Comment:
Done in f086df5 — reusing `getAllExistingFileIds(partitionPath)` from
`SparkInsertOverwriteCommitActionExecutor.getFileGroupsBeingReplaced` instead
of re-iterating file slices inline.
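The refactored shape routes file-group enumeration through one helper. Presumably this keeps the "what will be replaced" check aligned with the replace metadata that the sibling `getPartitionToReplacedFileIds` builds from the same helper. The sketch below is self-contained and makes these assumptions:

- `ReplacedFileGroupsSketch` is a hypothetical class.
- Its `FileGroupId` stands in for `HoodieFileGroupId`.
- The latest-slice view is modelled as a map of partition to file IDs, which may repeat.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch; the slice view is a map of partition -> latest-slice file IDs.
final class ReplacedFileGroupsSketch {
  // Stand-in for HoodieFileGroupId: value equality on (partitionPath, fileId).
  static final class FileGroupId {
    final String partitionPath;
    final String fileId;

    FileGroupId(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof FileGroupId)) return false;
      FileGroupId other = (FileGroupId) o;
      return partitionPath.equals(other.partitionPath) && fileId.equals(other.fileId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(partitionPath, fileId);
    }
  }

  private final Map<String, List<String>> latestSliceFileIds;

  ReplacedFileGroupsSketch(Map<String, List<String>> latestSliceFileIds) {
    this.latestSliceFileIds = latestSliceFileIds;
  }

  // Plays the role of getAllExistingFileIds(partitionPath): distinct file IDs in one partition.
  List<String> getAllExistingFileIds(String partitionPath) {
    return latestSliceFileIds.getOrDefault(partitionPath, Collections.emptyList()).stream()
        .distinct()
        .collect(Collectors.toList());
  }

  // Plays the role of getFileGroupsBeingReplaced once the partitions are resolved.
  Set<FileGroupId> getFileGroupsBeingReplaced(List<String> partitionPaths) {
    return partitionPaths.stream()
        .flatMap(partition -> getAllExistingFileIds(partition).stream()
            .map(fileId -> new FileGroupId(partition, fileId)))
        .collect(Collectors.toSet());
  }
}
```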
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -163,6 +167,18 @@ protected HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRe
return recordsAndPendingClusteringFileGroups.getLeft();
}
+ /**
+ * Get the file groups that will be replaced by the current operation.
+ * This is relevant for INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, and DELETE_PARTITION operations.
+ *
+ * @param inputRecords the input records for the operation
+ * @return set of file group IDs that will be replaced
+ */
+ protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
Review Comment:
Right — addressed in df3e2563162d. Added
`rejectIfOverlappingPendingClustering(preparedRecords)` in
`BaseDatasetBulkInsertCommitActionExecutor.execute()`, called after
`HoodieDatasetBulkInsertHelper.prepareForBulkInsert` populates
`_hoodie_partition_path` so dynamic partition resolution can read it.
Bulk-insert subclasses implement `getFileGroupsBeingReplaced(Dataset<Row>)`:
`DatasetBulkInsertOverwriteCommitActionExecutor` handles static
(`STATIC_OVERWRITE_PARTITION_PATHS`) and dynamic (distinct
`_hoodie_partition_path`);
`DatasetBulkInsertOverwriteTableCommitActionExecutor` enumerates every
partition via `FSUtils.getAllPartitionPaths`. The same configured
`hoodie.clustering.updates.strategy` is loaded and invoked, so the default
`SparkRejectUpdateStrategy` rejects and `SparkAllowUpdateStrategy` defers to
conflict resolution, matching the existing Spark-side behavior.
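This is a toy model of that ordering and of the three resolution modes, assuming rows are plain maps. `BulkInsertOverwriteSketch` is hypothetical, and its parameters stand in for the real inputs:

- `prepare` stands in for `HoodieDatasetBulkInsertHelper.prepareForBulkInsert`.
- `staticPartitions` stands in for the `STATIC_OVERWRITE_PARTITION_PATHS` value.
- `allTablePartitions` stands in for what `FSUtils.getAllPartitionPaths` would return.

Dynamic resolution reads `_hoodie_partition_path`, so the sketch fails loudly if it runs before that column is populated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of partition resolution for bulk-insert overwrite; not Spark/Hudi code.
final class BulkInsertOverwriteSketch {
  static final String PARTITION_PATH_FIELD = "_hoodie_partition_path";

  private BulkInsertOverwriteSketch() {}

  // Stand-in for the preparation step that stamps the partition-path meta column.
  static List<Map<String, String>> prepare(List<Map<String, String>> rows,
                                           Function<Map<String, String>, String> keyGen) {
    List<Map<String, String>> prepared = new ArrayList<>();
    for (Map<String, String> row : rows) {
      Map<String, String> copy = new HashMap<>(row);
      copy.put(PARTITION_PATH_FIELD, keyGen.apply(row));
      prepared.add(copy);
    }
    return prepared;
  }

  static Set<String> partitionsBeingReplaced(List<Map<String, String>> preparedRows,
                                             String staticPartitions,
                                             boolean overwriteTable,
                                             List<String> allTablePartitions) {
    if (overwriteTable) {
      // INSERT_OVERWRITE_TABLE: every existing partition, regardless of the input.
      return new LinkedHashSet<>(allTablePartitions);
    }
    if (staticPartitions != null && !staticPartitions.isEmpty()) {
      // Static overwrite: comma-separated partitions from config.
      return new LinkedHashSet<>(Arrays.asList(staticPartitions.split(",")));
    }
    // Dynamic overwrite: distinct partition paths of the prepared rows.
    Set<String> partitions = new LinkedHashSet<>();
    for (Map<String, String> row : preparedRows) {
      String partition = row.get(PARTITION_PATH_FIELD);
      if (partition == null) {
        throw new IllegalStateException(PARTITION_PATH_FIELD + " is not populated; prepare rows first");
      }
      partitions.add(partition);
    }
    return partitions;
  }
}
```

The resulting partitions would then be expanded to file group IDs and handed to the configured update strategy, as with the record-based executors.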
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java:
##########
@@ -53,4 +56,19 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMet
return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
}
+
+ @Override
+ protected Set<HoodieFileGroupId> getFileGroupsBeingReplaced(HoodieData<HoodieRecord<T>> inputRecords) {
+ // INSERT_OVERWRITE_TABLE replaces every file group across every partition, not just the
+ // partitions present in the input records. Enumerate all partitions to match the semantics
+ // of getPartitionToReplacedFileIds above.
+ List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, table.getMetaClient(), config.getMetadataConfig());
+ if (partitionPaths == null || partitionPaths.isEmpty()) {
+ return Collections.emptySet();
+ }
+ return partitionPaths.stream()
+ .flatMap(partitionPath -> table.getSliceView().getLatestFileSlices(partitionPath)
+ .map(fileSlice -> new HoodieFileGroupId(partitionPath, fileSlice.getFileId())))
Review Comment:
Addressed in df3e2563162d —
`SparkInsertOverwriteTableCommitActionExecutor.getFileGroupsBeingReplaced` now
uses `context.parallelize(...).flatMap(...)` instead of a sequential
driver-side `Stream`, matching the parallelization in the sibling
`getPartitionToReplacedFileIds`.