This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 01d905ff46b [HUDI-7961] Optimizing upsert partitioner for prepped
write operations (#11581)
01d905ff46b is described below
commit 01d905ff46bceb735ac6d3fa2960ec34908dfcd2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jul 10 18:18:07 2024 -0700
[HUDI-7961] Optimizing upsert partitioner for prepped write operations
(#11581)
---
.../table/action/commit/BaseSparkCommitActionExecutor.java | 2 +-
.../commit/SparkInsertOverwriteCommitActionExecutor.java | 2 +-
.../table/action/commit/SparkInsertOverwritePartitioner.java | 5 +++--
.../apache/hudi/table/action/commit/UpsertPartitioner.java | 10 ++++++++--
.../deltacommit/BaseSparkDeltaCommitActionExecutor.java | 2 +-
.../deltacommit/SparkUpsertDeltaCommitPartitioner.java | 5 +++--
.../hudi/table/action/commit/TestUpsertPartitioner.java | 12 ++++++------
.../org/apache/hudi/common/model/WriteOperationType.java | 4 ++++
8 files changed, 27 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 32e4824b8b8..36902a8c3f2 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -370,7 +370,7 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the
upsert partitioner.");
}
- return new UpsertPartitioner<>(profile, context, table, config);
+ return new UpsertPartitioner<>(profile, context, table, config,
operationType);
}
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index ac84475bfa4..63342989c79 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -71,7 +71,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
protected Partitioner getPartitioner(WorkloadProfile profile) {
return table.getStorageLayout().layoutPartitionerClass()
.map(c -> getLayoutPartitioner(profile, c))
- .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context,
table, config));
+ .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context,
table, config, operationType));
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
index cdf2bcd0345..d2cef9250e6 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -38,8 +39,8 @@ public class SparkInsertOverwritePartitioner extends
UpsertPartitioner {
private static final Logger LOG =
LoggerFactory.getLogger(SparkInsertOverwritePartitioner.class);
public SparkInsertOverwritePartitioner(WorkloadProfile profile,
HoodieEngineContext context, HoodieTable table,
- HoodieWriteConfig config) {
- super(profile, context, table, config);
+ HoodieWriteConfig config,
WriteOperationType operationType) {
+ super(profile, context, table, config, operationType);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index ea125614170..6536506adbd 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -86,16 +87,21 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
private HashMap<Integer, BucketInfo> bucketInfoMap;
protected final HoodieWriteConfig config;
+ private final WriteOperationType operationType;
public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext
context, HoodieTable table,
- HoodieWriteConfig config) {
+ HoodieWriteConfig config, WriteOperationType
operationType) {
super(profile, table);
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBucketInfos = new HashMap<>();
bucketInfoMap = new HashMap<>();
this.config = config;
+ this.operationType = operationType;
assignUpdates(profile);
- assignInserts(profile, context);
+ long totalInserts =
profile.getInputPartitionPathStatMap().values().stream().mapToLong(stat ->
stat.getNumInserts()).sum();
+ if (!WriteOperationType.isPreppedWriteOperation(operationType) ||
totalInserts > 0) { // skip if its prepped write operation. or if totalInserts
= 0.
+ assignInserts(profile, context);
+ }
LOG.info("Total Buckets: {}, bucketInfoMap size: {},
partitionPathToInsertBucketInfos size: {}, updateLocationToBucket size: {}",
totalBuckets, bucketInfoMap.size(),
partitionPathToInsertBucketInfos.size(), updateLocationToBucket.size());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
index be69be05c84..7ab6fee5b4f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
@@ -65,7 +65,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T>
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the
upsert partitioner.");
}
- mergeOnReadUpsertPartitioner = new
SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext)
context, table, config);
+ mergeOnReadUpsertPartitioner = new
SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext)
context, table, config, operationType);
return mergeOnReadUpsertPartitioner;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index 035d23e9269..ac2cf320356 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,8 +47,8 @@ import java.util.stream.Collectors;
public class SparkUpsertDeltaCommitPartitioner<T> extends UpsertPartitioner<T>
{
public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile,
HoodieSparkEngineContext context, HoodieTable table,
- HoodieWriteConfig config) {
- super(profile, context, table, config);
+ HoodieWriteConfig config,
WriteOperationType operationType) {
+ super(profile, context, table, config, operationType);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 550818082a3..5c35685b7bd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -103,7 +103,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
records.addAll(insertRecords);
records.addAll(updateRecords);
WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(records)));
- UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config);
+ UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config, WriteOperationType.UPSERT);
assertEquals(0, partitioner.getPartition(
new Tuple2<>(updateRecords.get(0).getKey(),
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
"Update record should have gone to the 1 update partition");
@@ -228,7 +228,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001",
totalInsertNum);
WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
- UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config);
+ UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config, WriteOperationType.UPSERT);
List<InsertBucketCumulativeWeightPair> insertBuckets =
partitioner.getInsertBuckets(testPartitionPath);
float bucket0Weight = 0.2f;
@@ -345,7 +345,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
HoodieSparkTable table = HoodieSparkTable.create(config, context,
metaClient);
- SparkUpsertDeltaCommitPartitioner partitioner = new
SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+ SparkUpsertDeltaCommitPartitioner partitioner = new
SparkUpsertDeltaCommitPartitioner(profile, context, table, config,
WriteOperationType.UPSERT);
assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -385,7 +385,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
HoodieSparkTable table = HoodieSparkTable.create(config, context,
metaClient);
// create UpsertPartitioner
- UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config);
+ UpsertPartitioner partitioner = new UpsertPartitioner(profile, context,
table, config, WriteOperationType.UPSERT);
// for now we have file slice1 and file slice3 and file slice1 is
contained in pending clustering plan
// So that only file slice3 can be used for ingestion.
@@ -423,7 +423,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
WorkloadProfile profile = new
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
HoodieSparkTable table = HoodieSparkTable.create(config, context,
metaClient);
- SparkUpsertDeltaCommitPartitioner partitioner = new
SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+ SparkUpsertDeltaCommitPartitioner partitioner = new
SparkUpsertDeltaCommitPartitioner(profile, context, table, config,
WriteOperationType.UPSERT);
assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -463,7 +463,7 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
HoodieSparkTable<?> table = HoodieSparkTable.create(config, context,
reloadedMetaClient);
- SparkUpsertDeltaCommitPartitioner<?> partitioner = new
SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);
+ SparkUpsertDeltaCommitPartitioner<?> partitioner = new
SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config,
WriteOperationType.UPSERT);
assertEquals(3, partitioner.numPartitions());
assertEquals(
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 268703710c5..e69b4036a1a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -164,4 +164,8 @@ public enum WriteOperationType {
public static boolean isDelete(WriteOperationType operation) {
return operation == DELETE || operation == DELETE_PREPPED;
}
+
+ public static boolean isPreppedWriteOperation(WriteOperationType
operationType) {
+ return operationType == BULK_INSERT_PREPPED || operationType ==
INSERT_PREPPED || operationType == UPSERT_PREPPED || operationType ==
DELETE_PREPPED;
+ }
}