This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0938f55  [HUDI-3458] Fix BulkInsertPartitioner generic type (#4854)
0938f55 is described below

commit 0938f55a2b033745b5e2dafad8ce31e60bb394ac
Author: Raymond Xu <[email protected]>
AuthorDate: Sun Feb 20 10:51:58 2022 -0800

    [HUDI-3458] Fix BulkInsertPartitioner generic type (#4854)
---
 .../apache/hudi/table/action/commit/BaseBulkInsertHelper.java |  4 ++--
 .../client/clustering/run/strategy/JavaExecutionStrategy.java |  2 +-
 .../apache/hudi/table/action/commit/JavaBulkInsertHelper.java |  4 ++--
 .../commit/JavaBulkInsertPreppedCommitActionExecutor.java     |  9 ++++-----
 .../run/strategy/MultipleSparkJobExecutionStrategy.java       | 11 ++++++-----
 .../action/commit/SparkBulkInsertCommitActionExecutor.java    |  7 ++++---
 .../hudi/table/action/commit/SparkBulkInsertHelper.java       |  4 ++--
 .../commit/SparkBulkInsertPreppedCommitActionExecutor.java    | 10 +++++-----
 .../deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java |  6 +++---
 .../SparkBulkInsertPreppedDeltaCommitActionExecutor.java      |  4 ++--
 10 files changed, 31 insertions(+), 30 deletions(-)
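
(Context: BulkInsertPartitioner is generic over the engine-specific input collection type I, that is JavaRDD<HoodieRecord<T>> in the Spark client and List<HoodieRecord<T>> in the Java client, not over the record payload type T. The signatures below are corrected accordingly. A rough sketch of the interface shape, paraphrased rather than quoted; see org.apache.hudi.table.BulkInsertPartitioner for the authoritative definition, whose method and parameter names may differ:

    // Paraphrased sketch, not part of this diff.
    public interface BulkInsertPartitioner<I> {
      // Repartition (and optionally sort) the input collection before it is
      // written; I is the engine's record-collection type, e.g.
      // JavaRDD<HoodieRecord<T>> on Spark, List<HoodieRecord<T>> on the
      // Java client.
      I repartitionRecords(I records, int outputPartitions);

      // True if records within each output partition are already sorted,
      // which lets the writer skip re-sorting.
      boolean arePartitionRecordsSorted();
    }
)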

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index 29a1350..dffd926 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
                                                     HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                                     BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
-                                                    Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner);
+                                                    Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
 
   /**
    * Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
   public abstract O bulkInsert(I inputRecords, String instantTime,
                                HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                               Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
                                boolean addMetadataFields,
                                int parallelism,
                                WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7e67b08..8cbe0d7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -121,7 +121,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
    * @param schema         Schema of the data including metadata fields.
    * @return empty for now.
    */
-  protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return Option.of(new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 87b7fe7..de7afdf 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                                            final HoodieWriteConfig config,
                                                            final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
                                                            final boolean performDedupe,
-                                                           final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                           final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     // It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
                                       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                       HoodieWriteConfig config,
                                       boolean performDedupe,
-                                      Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                                      Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
                                       boolean useWriterSchema,
                                       int parallelism,
                                       WriteHandleFactory writeHandleFactory) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
index 37b56b6..ed72fbe 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -26,9 +26,8 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import java.util.List;
@@ -37,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
     extends BaseJavaCommitActionExecutor<T> {
 
   private final List<HoodieRecord<T>> preppedInputRecord;
-  private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
 
   public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
                                                    HoodieWriteConfig config, HoodieTable table,
                                                    String instantTime, List<HoodieRecord<T>> preppedInputRecord,
-                                                   Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                   Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecord = preppedInputRecord;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +59,4 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
       throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
     }
   }
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 282cc28..694fdd5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -60,6 +56,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -134,7 +135,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * @param schema         Schema of the data including metadata fields.
    * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
    */
-  protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
index 2b00d47..f4f1d3a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Map;
@@ -36,17 +37,17 @@ public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                              String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                             Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+                                             Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                         String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                        Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
+                                        Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
                                         Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index d17b9b4..d0c5dde 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -67,7 +67,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
                                                               final HoodieWriteConfig config,
                                                               final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
                                                               final boolean performDedupe,
-                                                              final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                              final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     HoodieWriteMetadata result = new HoodieWriteMetadata();
 
     //transition bulk_insert state to inflight
@@ -88,7 +88,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
                                          HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
                                          HoodieWriteConfig config,
                                          boolean performDedupe,
-                                         Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
+                                         Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
                                          boolean useWriterSchema,
                                          int parallelism,
                                          WriteHandleFactory writeHandleFactory) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
index e6b6809..28d8cb0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
@@ -26,22 +26,22 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+
 import org.apache.spark.api.java.JavaRDD;
 
 public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends BaseSparkCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
 
   public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
                                                     HoodieWriteConfig config, HoodieTable table,
                                                     String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                    Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+                                                    Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +60,4 @@ public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPa
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
index 17baf1d..6f23e41 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
@@ -39,17 +39,17 @@ public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayl
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<T>> bulkInsertPartitioner)  {
+                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner)  {
     this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
   }
 
   public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
                                                   String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-                                                  Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
+                                                  Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
                                                   Option<Map<String, String>> extraMetadata) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
     this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
index a4d7493..be5b903 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
@@ -37,12 +37,12 @@ public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRec
     extends BaseSparkDeltaCommitActionExecutor<T> {
 
   private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
-  private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+  private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
 
   public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
                                                          HoodieWriteConfig config, HoodieTable table,
                                                          String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-                                                         Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+                                                         Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
     super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
     this.preppedInputRecordRdd = preppedInputRecordRdd;
     this.bulkInsertPartitioner = bulkInsertPartitioner;

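(With the corrected generics, a caller-supplied Spark partitioner is typed against the RDD rather than the payload. An illustrative sketch only: MyRddPartitioner is a hypothetical implementation of BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>, the constructor matches SparkBulkInsertCommitActionExecutor in the diff above, and execute() is assumed to be the standard action-executor entry point:

    // Hypothetical custom partitioner over the Spark input collection type.
    Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> partitioner =
        Option.of(new MyRddPartitioner<>());
    // Constructor arguments as in the diff above; execute() runs the commit.
    new SparkBulkInsertCommitActionExecutor<>(
        context, config, table, instantTime, inputRecordsRDD, partitioner)
        .execute();
)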