This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0938f55 [HUDI-3458] Fix BulkInsertPartitioner generic type (#4854)
0938f55 is described below
commit 0938f55a2b033745b5e2dafad8ce31e60bb394ac
Author: Raymond Xu <[email protected]>
AuthorDate: Sun Feb 20 10:51:58 2022 -0800
[HUDI-3458] Fix BulkInsertPartitioner generic type (#4854)
---
.../apache/hudi/table/action/commit/BaseBulkInsertHelper.java | 4 ++--
.../client/clustering/run/strategy/JavaExecutionStrategy.java | 2 +-
.../apache/hudi/table/action/commit/JavaBulkInsertHelper.java | 4 ++--
.../commit/JavaBulkInsertPreppedCommitActionExecutor.java | 9 ++++-----
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 11 ++++++-----
.../action/commit/SparkBulkInsertCommitActionExecutor.java | 7 ++++---
.../hudi/table/action/commit/SparkBulkInsertHelper.java | 4 ++--
.../commit/SparkBulkInsertPreppedCommitActionExecutor.java | 10 +++++-----
.../deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java | 6 +++---
.../SparkBulkInsertPreppedDeltaCommitActionExecutor.java | 4 ++--
10 files changed, 31 insertions(+), 30 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index 29a1350..dffd926 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends
HoodieRecordPayload, I, K,
public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String
instantTime,
HoodieTable<T, I, K, O>
table, HoodieWriteConfig config,
BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
-
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner);
+
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
/**
* Only write input records. Does not change timeline/index. Return
information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends
HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner<T>>
userDefinedBulkInsertPartitioner,
+ Option<BulkInsertPartitioner<I>>
userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7e67b08..8cbe0d7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -121,7 +121,7 @@ public abstract class JavaExecutionStrategy<T extends
HoodieRecordPayload<T>>
* @param schema Schema of the data including metadata fields.
* @return empty for now.
*/
- protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String,
String> strategyParams, Schema schema) {
+ protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>>
getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 87b7fe7..de7afdf 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends
HoodieRecordPayload, R> extends Base
final
HoodieWriteConfig config,
final
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>, R> executor,
final boolean
performDedupe,
- final
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+ final
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends
HoodieRecordPayload, R> extends Base
HoodieTable<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner<T>>
userDefinedBulkInsertPartitioner,
+
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
index 37b56b6..ed72fbe 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -26,9 +26,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import java.util.List;
@@ -37,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T
extends HoodieRecordPay
extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> preppedInputRecord;
- private final Option<BulkInsertPartitioner<T>>
userDefinedBulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner;
public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext
context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
List<HoodieRecord<T>> preppedInputRecord,
-
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+
Option<BulkInsertPartitioner<List<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecord = preppedInputRecord;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +59,4 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T
extends HoodieRecordPay
throw new HoodieInsertException("Failed to bulk insert for commit time "
+ instantTime, e);
}
}
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 282cc28..694fdd5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,10 +18,6 @@
package org.apache.hudi.client.clustering.run.strategy;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -60,6 +56,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -134,7 +135,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T
extends HoodieRecordPa
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are
provided, otherwise empty.
*/
- protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String,
String> strategyParams, Schema schema) {
+ protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
getPartitioner(Map<String, String> strategyParams, Schema schema) {
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
index 2b00d47..f4f1d3a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;
@@ -36,17 +37,17 @@ import java.util.Map;
public class SparkBulkInsertCommitActionExecutor<T extends
HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
bulkInsertPartitioner;
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<T>>
bulkInsertPartitioner) {
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecordsRDD,
bulkInsertPartitioner, Option.empty());
}
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<T>>
bulkInsertPartitioner,
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
Option<Map<String, String>>
extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT,
extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index d17b9b4..d0c5dde 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -67,7 +67,7 @@ public class SparkBulkInsertHelper<T extends
HoodieRecordPayload, R> extends Bas
final
HoodieWriteConfig config,
final
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>,
JavaRDD<WriteStatus>, R> executor,
final boolean
performDedupe,
- final
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+ final
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
//transition bulk_insert state to inflight
@@ -88,7 +88,7 @@ public class SparkBulkInsertHelper<T extends
HoodieRecordPayload, R> extends Bas
HoodieTable<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner<T>>
userDefinedBulkInsertPartitioner,
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory
writeHandleFactory) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
index e6b6809..28d8cb0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
@@ -26,22 +26,22 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.BulkInsertPartitioner;
-
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+
import org.apache.spark.api.java.JavaRDD;
public class SparkBulkInsertPreppedCommitActionExecutor<T extends
HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
- private final Option<BulkInsertPartitioner<T>>
userDefinedBulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner;
public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext
context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner) {
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
@@ -60,4 +60,4 @@ public class SparkBulkInsertPreppedCommitActionExecutor<T
extends HoodieRecordPa
}
}
-}
\ No newline at end of file
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
index 17baf1d..6f23e41 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
@@ -39,17 +39,17 @@ public class SparkBulkInsertDeltaCommitActionExecutor<T
extends HoodieRecordPayl
extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
bulkInsertPartitioner;
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext
context, HoodieWriteConfig config, HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner)
{
this(context, config, table, instantTime, inputRecordsRDD,
bulkInsertPartitioner, Option.empty());
}
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext
context, HoodieWriteConfig config, HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> inputRecordsRDD,
-
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner,
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
Option<Map<String, String>>
extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT,
extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
index a4d7493..be5b903 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
@@ -37,12 +37,12 @@ public class
SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRec
extends BaseSparkDeltaCommitActionExecutor<T> {
private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
- private final Option<BulkInsertPartitioner<T>> bulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>>
bulkInsertPartitioner;
public
SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext
context,
HoodieWriteConfig
config, HoodieTable table,
String instantTime,
JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
-
Option<BulkInsertPartitioner<T>> bulkInsertPartitioner) {
+
Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;