This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 01d905ff46b [HUDI-7961] Optimizing upsert partitioner for prepped 
write operations (#11581)
01d905ff46b is described below

commit 01d905ff46bceb735ac6d3fa2960ec34908dfcd2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jul 10 18:18:07 2024 -0700

    [HUDI-7961] Optimizing upsert partitioner for prepped write operations 
(#11581)
---
 .../table/action/commit/BaseSparkCommitActionExecutor.java   |  2 +-
 .../commit/SparkInsertOverwriteCommitActionExecutor.java     |  2 +-
 .../table/action/commit/SparkInsertOverwritePartitioner.java |  5 +++--
 .../apache/hudi/table/action/commit/UpsertPartitioner.java   | 10 ++++++++--
 .../deltacommit/BaseSparkDeltaCommitActionExecutor.java      |  2 +-
 .../deltacommit/SparkUpsertDeltaCommitPartitioner.java       |  5 +++--
 .../hudi/table/action/commit/TestUpsertPartitioner.java      | 12 ++++++------
 .../org/apache/hudi/common/model/WriteOperationType.java     |  4 ++++
 8 files changed, 27 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 32e4824b8b8..36902a8c3f2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -370,7 +370,7 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the 
upsert partitioner.");
     }
-    return new UpsertPartitioner<>(profile, context, table, config);
+    return new UpsertPartitioner<>(profile, context, table, config, 
operationType);
   }
 
   public Partitioner getInsertPartitioner(WorkloadProfile profile) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index ac84475bfa4..63342989c79 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -71,7 +71,7 @@ public class SparkInsertOverwriteCommitActionExecutor<T>
   protected Partitioner getPartitioner(WorkloadProfile profile) {
     return table.getStorageLayout().layoutPartitionerClass()
         .map(c -> getLayoutPartitioner(profile, c))
-        .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context, 
table, config));
+        .orElseGet(() -> new SparkInsertOverwritePartitioner(profile, context, 
table, config, operationType));
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
index cdf2bcd0345..d2cef9250e6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -38,8 +39,8 @@ public class SparkInsertOverwritePartitioner extends 
UpsertPartitioner {
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkInsertOverwritePartitioner.class);
 
   public SparkInsertOverwritePartitioner(WorkloadProfile profile, 
HoodieEngineContext context, HoodieTable table,
-                                         HoodieWriteConfig config) {
-    super(profile, context, table, config);
+                                         HoodieWriteConfig config, 
WriteOperationType operationType) {
+    super(profile, context, table, config, operationType);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index ea125614170..6536506adbd 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -86,16 +87,21 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
   private HashMap<Integer, BucketInfo> bucketInfoMap;
 
   protected final HoodieWriteConfig config;
+  private final WriteOperationType operationType;
 
   public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext 
context, HoodieTable table,
-      HoodieWriteConfig config) {
+                           HoodieWriteConfig config, WriteOperationType 
operationType) {
     super(profile, table);
     updateLocationToBucket = new HashMap<>();
     partitionPathToInsertBucketInfos = new HashMap<>();
     bucketInfoMap = new HashMap<>();
     this.config = config;
+    this.operationType = operationType;
     assignUpdates(profile);
-    assignInserts(profile, context);
+    long totalInserts = 
profile.getInputPartitionPathStatMap().values().stream().mapToLong(stat -> 
stat.getNumInserts()).sum();
+    if (!WriteOperationType.isPreppedWriteOperation(operationType) || 
totalInserts > 0) { // skip only if it's a prepped write operation AND 
totalInserts == 0.
+      assignInserts(profile, context);
+    }
 
     LOG.info("Total Buckets: {}, bucketInfoMap size: {}, 
partitionPathToInsertBucketInfos size: {}, updateLocationToBucket size: {}",
         totalBuckets, bucketInfoMap.size(), 
partitionPathToInsertBucketInfos.size(), updateLocationToBucket.size());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
index be69be05c84..7ab6fee5b4f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
@@ -65,7 +65,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T>
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the 
upsert partitioner.");
     }
-    mergeOnReadUpsertPartitioner = new 
SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext) 
context, table, config);
+    mergeOnReadUpsertPartitioner = new 
SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext) 
context, table, config, operationType);
     return mergeOnReadUpsertPartitioner;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
index 035d23e9269..ac2cf320356 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -46,8 +47,8 @@ import java.util.stream.Collectors;
 public class SparkUpsertDeltaCommitPartitioner<T> extends UpsertPartitioner<T> 
{
 
   public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, 
HoodieSparkEngineContext context, HoodieTable table,
-                                           HoodieWriteConfig config) {
-    super(profile, context, table, config);
+                                           HoodieWriteConfig config, 
WriteOperationType operationType) {
+    super(profile, context, table, config, operationType);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 550818082a3..5c35685b7bd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -103,7 +103,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     records.addAll(insertRecords);
     records.addAll(updateRecords);
     WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(records)));
-    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config);
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config, WriteOperationType.UPSERT);
     assertEquals(0, partitioner.getPartition(
         new Tuple2<>(updateRecords.get(0).getKey(), 
Option.ofNullable(updateRecords.get(0).getCurrentLocation()))),
         "Update record should have gone to the 1 update partition");
@@ -228,7 +228,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", 
totalInsertNum);
 
     WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
-    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config);
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config, WriteOperationType.UPSERT);
     List<InsertBucketCumulativeWeightPair> insertBuckets = 
partitioner.getInsertBuckets(testPartitionPath);
 
     float bucket0Weight = 0.2f;
@@ -345,7 +345,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
 
     HoodieSparkTable table = HoodieSparkTable.create(config, context, 
metaClient);
-    SparkUpsertDeltaCommitPartitioner partitioner = new 
SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+    SparkUpsertDeltaCommitPartitioner partitioner = new 
SparkUpsertDeltaCommitPartitioner(profile, context, table, config, 
WriteOperationType.UPSERT);
 
     assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
     assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -385,7 +385,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
 
     HoodieSparkTable table = HoodieSparkTable.create(config, context, 
metaClient);
     // create UpsertPartitioner
-    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config);
+    UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, 
table, config, WriteOperationType.UPSERT);
 
     // for now we have file slice1 and file slice3 and file slice1 is 
contained in pending clustering plan
     // So that only file slice3 can be used for ingestion.
@@ -423,7 +423,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     WorkloadProfile profile = new 
WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));
 
     HoodieSparkTable table = HoodieSparkTable.create(config, context, 
metaClient);
-    SparkUpsertDeltaCommitPartitioner partitioner = new 
SparkUpsertDeltaCommitPartitioner(profile, context, table, config);
+    SparkUpsertDeltaCommitPartitioner partitioner = new 
SparkUpsertDeltaCommitPartitioner(profile, context, table, config, 
WriteOperationType.UPSERT);
 
     assertEquals(1, partitioner.numPartitions(), "Should have 1 partitions");
     assertEquals(BucketType.UPDATE, partitioner.getBucketInfo(0).bucketType,
@@ -463,7 +463,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
 
     HoodieSparkTable<?> table = HoodieSparkTable.create(config, context, 
reloadedMetaClient);
 
-    SparkUpsertDeltaCommitPartitioner<?> partitioner = new 
SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config);
+    SparkUpsertDeltaCommitPartitioner<?> partitioner = new 
SparkUpsertDeltaCommitPartitioner<>(profile, context, table, config, 
WriteOperationType.UPSERT);
 
     assertEquals(3, partitioner.numPartitions());
     assertEquals(
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
index 268703710c5..e69b4036a1a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java
@@ -164,4 +164,8 @@ public enum WriteOperationType {
   public static boolean isDelete(WriteOperationType operation) {
     return operation == DELETE || operation == DELETE_PREPPED;
   }
+
+  public static boolean isPreppedWriteOperation(WriteOperationType 
operationType) {
+    return operationType == BULK_INSERT_PREPPED || operationType == 
INSERT_PREPPED || operationType == UPSERT_PREPPED || operationType == 
DELETE_PREPPED;
+  }
 }

Reply via email to