This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a08bdc3f97 [HUDI-5363] Removing default value for shuffle parallelism
configs (#7723)
3a08bdc3f97 is described below
commit 3a08bdc3f971b3534e8fb6f34772340cfdf055a9
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Jan 25 19:29:42 2023 -0800
[HUDI-5363] Removing default value for shuffle parallelism configs (#7723)
Currently, we always override the parallelism (translating into the # of
partitions in Spark, for example) of the incoming datasets no matter whether the
user requested that or not:
1. If user specified shuffle parallelism explicitly, we'd use it to
override the original one
2. If user did NOT specify shuffle parallelism, we'd use default value of
200
Second case is problematic: we're blindly overriding parallelism of the
data deduced by Spark (determined based on the source of the data) replacing it
with _static_ value (having nothing to do w/ the data itself).
Instead, we should only be overriding the parallelism in cases when the
corresponding configuration has been explicitly provided by the user.
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 10 +++---
.../table/action/commit/BaseBulkInsertHelper.java | 7 +++-
.../hudi/table/action/commit/BaseDeleteHelper.java | 7 +++-
.../hudi/table/action/commit/BaseWriteHelper.java | 14 ++++++--
.../table/action/commit/HoodieDeleteHelper.java | 35 ++++++++++--------
.../table/action/commit/HoodieWriteHelper.java | 3 +-
.../table/action/commit/ParallelismHelper.java | 42 ++++++++++++++++++++++
.../table/action/commit/FlinkDeleteHelper.java | 1 +
.../hudi/table/action/commit/FlinkWriteHelper.java | 3 +-
.../table/action/commit/JavaBulkInsertHelper.java | 12 ++++---
.../hudi/table/action/commit/JavaDeleteHelper.java | 1 +
.../hudi/table/action/commit/JavaWriteHelper.java | 1 +
.../table/action/commit/SparkBulkInsertHelper.java | 24 ++++++++-----
.../hudi/HoodieDatasetBulkInsertHelper.scala | 16 ++++++---
.../scala/org/apache/hudi/util/JFunction.scala | 7 +++-
.../org/apache/spark/sql/HoodieUnsafeUtils.scala | 11 ++++++
16 files changed, 148 insertions(+), 46 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index fcee1b4b0d6..2f36aa725e3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -228,12 +228,12 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.insert.shuffle.parallelism")
- .defaultValue("200")
+ .defaultValue("0")
.withDocumentation("Parallelism for inserting records into the table.
Inserts can shuffle data before writing to tune file sizes and optimize the
storage layout.");
public static final ConfigProperty<String> BULKINSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.bulkinsert.shuffle.parallelism")
- .defaultValue("200")
+ .defaultValue("0")
.withDocumentation("For large initial imports using bulk_insert
operation, controls the parallelism to use for sort modes or custom
partitioning done"
+ "before writing records to the table.");
@@ -252,13 +252,13 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> UPSERT_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.upsert.shuffle.parallelism")
- .defaultValue("200")
+ .defaultValue("0")
.withDocumentation("Parallelism to use for upsert operation on the
table. Upserts can shuffle data to perform index lookups, file sizing, bin
packing records optimally"
+ "into file groups.");
public static final ConfigProperty<String> DELETE_PARALLELISM_VALUE =
ConfigProperty
.key("hoodie.delete.shuffle.parallelism")
- .defaultValue("200")
+ .defaultValue("0")
.withDocumentation("Parallelism used for “delete” operation. Delete
operations also performs shuffles, similar to upsert operation.");
public static final ConfigProperty<String> ROLLBACK_PARALLELISM_VALUE =
ConfigProperty
@@ -1156,7 +1156,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getDeleteShuffleParallelism() {
- return Math.max(getInt(DELETE_PARALLELISM_VALUE), 1);
+ return getInt(DELETE_PARALLELISM_VALUE);
}
public int getRollbackParallelism() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index b5599385679..451feab4e23 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.commit;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.WriteHandleFactory;
@@ -25,7 +26,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-public abstract class BaseBulkInsertHelper<T, I, K, O, R> {
+public abstract class BaseBulkInsertHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
+
+ protected BaseBulkInsertHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
+ super(partitionNumberExtractor);
+ }
/**
* Mark instant as inflight, write input records, update index and return
result.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
index ceeb2aeb70d..527320a6b6a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseDeleteHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -28,7 +29,11 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
*
* @param <T>
*/
-public abstract class BaseDeleteHelper<T, I, K, O, R> {
+public abstract class BaseDeleteHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
+
+ protected BaseDeleteHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
+ super(partitionNumberExtractor);
+ }
/**
* Deduplicate Hoodie records, using the given deduplication function.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index adef1c44591..6e1c1f8d9ff 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,20 +34,27 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
-public abstract class BaseWriteHelper<T, I, K, O, R> {
+public abstract class BaseWriteHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
+
+ protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
+ super(partitionNumberExtractor);
+ }
public HoodieWriteMetadata<O> write(String instantTime,
I inputRecords,
HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
boolean shouldCombine,
- int shuffleParallelism,
+ int configuredShuffleParallelism,
BaseCommitActionExecutor<T, I, K, O, R>
executor,
WriteOperationType operationType) {
try {
+ int targetParallelism =
+ deduceShuffleParallelism(inputRecords, configuredShuffleParallelism);
+
// De-dupe/merge if needed
I dedupedRecords =
- combineOnCondition(shouldCombine, inputRecords, shuffleParallelism,
table);
+ combineOnCondition(shouldCombine, inputRecords, targetParallelism,
table);
Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 0d212555ab2..917c1858b45 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -48,7 +48,9 @@ import java.util.HashMap;
@SuppressWarnings("checkstyle:LineLength")
public class HoodieDeleteHelper<T, R> extends
BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>, R> {
+
private HoodieDeleteHelper() {
+ super(HoodieData::getNumPartitions);
}
private static class DeleteHelperHolder {
@@ -77,24 +79,18 @@ public class HoodieDeleteHelper<T, R> extends
HoodieTable<T,
HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>
table,
BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>, R> deleteExecutor) {
try {
- HoodieData<HoodieKey> dedupedKeys = keys;
- final int parallelism = config.getDeleteShuffleParallelism();
- if (config.shouldCombineBeforeDelete()) {
- // De-dupe/merge if needed
- dedupedKeys = deduplicateKeys(keys, table, parallelism);
- } else if (!keys.isEmpty()) {
- dedupedKeys = keys.repartition(parallelism);
- }
+ int targetParallelism =
+ deduceShuffleParallelism((HoodieData) keys,
config.getDeleteShuffleParallelism());
- HoodieData dedupedRecords;
- HoodieRecordType recordType = config.getRecordMerger().getRecordType();
- if (recordType == HoodieRecordType.AVRO) {
- // For BWC, will remove when HoodieRecordPayload removed
- dedupedRecords =
- dedupedKeys.map(key -> new HoodieAvroRecord(key, new
EmptyHoodieRecordPayload()));
+ HoodieData<HoodieKey> dedupedKeys;
+ if (config.shouldCombineBeforeDelete()) {
+ dedupedKeys = deduplicateKeys(keys, table, targetParallelism);
} else {
- dedupedRecords = dedupedKeys.map(key -> new HoodieEmptyRecord<>(key,
recordType));
+ dedupedKeys = keys.repartition(targetParallelism);
}
+
+ HoodieData dedupedRecords = createPhonyRecords(config, dedupedKeys);
+
Instant beginTag = Instant.now();
// perform index loop up to get existing location of records
HoodieData<HoodieRecord<T>> taggedRecords =
table.getIndex().tagLocation(dedupedRecords, context, table);
@@ -122,4 +118,13 @@ public class HoodieDeleteHelper<T, R> extends
}
}
+ private static HoodieData createPhonyRecords(HoodieWriteConfig config,
HoodieData<HoodieKey> keys) {
+ HoodieRecordType recordType = config.getRecordMerger().getRecordType();
+ if (recordType == HoodieRecordType.AVRO) {
+ return keys.map(key -> new HoodieAvroRecord(key, new
EmptyHoodieRecordPayload()));
+ } else {
+ return keys.map(key -> new HoodieEmptyRecord<>(key, recordType));
+ }
+ }
+
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 6557f83b241..b8e761acb02 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -35,7 +35,9 @@ import java.io.IOException;
public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T,
HoodieData<HoodieRecord<T>>,
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+
private HoodieWriteHelper() {
+ super(HoodieData::getNumPartitions);
}
private static class WriteHelperHolder {
@@ -79,5 +81,4 @@ public class HoodieWriteHelper<T, R> extends
BaseWriteHelper<T, HoodieData<Hoodi
return reducedRecord.newInstance(reducedKey);
}, reduceParallelism).map(Pair::getRight);
}
-
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java
new file mode 100644
index 00000000000..20ec4b61fc0
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/ParallelismHelper.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+
+public abstract class ParallelismHelper<I> {
+
+ private final SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor;
+
+ protected ParallelismHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
+ this.partitionNumberExtractor = partitionNumberExtractor;
+ }
+
+ protected int deduceShuffleParallelism(I input, int configuredParallelism) {
+ // NOTE: In case parallelism was configured by the user we will always
+ // honor that setting.
+ // Otherwise, we'd keep parallelism of the incoming dataset
+ if (configuredParallelism > 0) {
+ return configuredParallelism;
+ }
+
+ return partitionNumberExtractor.apply(input);
+ }
+
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index f6b172e125b..a8e2937ed3b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -50,6 +50,7 @@ public class FlinkDeleteHelper<R> extends
BaseDeleteHelper<EmptyHoodieRecordPayload,
List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>,
List<WriteStatus>, R> {
private FlinkDeleteHelper() {
+ super(ignored -> -1);
}
private static class DeleteHelperHolder {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
index 88554576849..6cdb17715de 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
@@ -55,6 +55,7 @@ public class FlinkWriteHelper<T, R> extends
BaseWriteHelper<T, List<HoodieRecord
List<HoodieKey>, List<WriteStatus>, R> {
private FlinkWriteHelper() {
+ super(ignored -> -1);
}
private static class WriteHelperHolder {
@@ -67,7 +68,7 @@ public class FlinkWriteHelper<T, R> extends
BaseWriteHelper<T, List<HoodieRecord
@Override
public HoodieWriteMetadata<List<WriteStatus>> write(String instantTime,
List<HoodieRecord<T>> inputRecords, HoodieEngineContext context,
- HoodieTable<T,
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean
shouldCombine, int shuffleParallelism,
+ HoodieTable<T,
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, boolean
shouldCombine, int configuredShuffleParallelism,
BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>,
List<WriteStatus>, R> executor, WriteOperationType operationType) {
try {
Instant lookupBegin = Instant.now();
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 0c76ea168a7..c10b7d1752d 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -48,6 +48,7 @@ public class JavaBulkInsertHelper<T, R> extends
BaseBulkInsertHelper<T, List<Hoo
List<HoodieKey>, List<WriteStatus>, R> {
private JavaBulkInsertHelper() {
+ super(ignored -> -1);
}
private static class BulkInsertHelperHolder {
@@ -94,18 +95,21 @@ public class JavaBulkInsertHelper<T, R> extends
BaseBulkInsertHelper<T, List<Hoo
boolean performDedupe,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
- int parallelism,
+ int configuredParallelism,
WriteHandleFactory writeHandleFactory) {
// De-dupe/merge if needed
List<HoodieRecord<T>> dedupedRecords = inputRecords;
+ int targetParallelism = deduceShuffleParallelism(inputRecords,
configuredParallelism);
+
if (performDedupe) {
- dedupedRecords = (List<HoodieRecord<T>>)
JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(),
inputRecords,
- parallelism, table);
+ dedupedRecords = (List<HoodieRecord<T>>) JavaWriteHelper.newInstance()
+ .combineOnCondition(config.shouldCombineBeforeInsert(),
inputRecords, targetParallelism, table);
}
- final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>)
partitioner.repartitionRecords(dedupedRecords, parallelism);
+ final List<HoodieRecord<T>> repartitionedRecords =
+ (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords,
targetParallelism);
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider)
ReflectionUtils.loadClass(
config.getFileIdPrefixProviderClassName(),
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
index 57d796c9252..4cee88f6bf4 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java
@@ -47,6 +47,7 @@ public class JavaDeleteHelper<R> extends
BaseDeleteHelper<EmptyHoodieRecordPayload,
List<HoodieRecord<EmptyHoodieRecordPayload>>, List<HoodieKey>,
List<WriteStatus>, R> {
private JavaDeleteHelper() {
+ super(ignored -> -1);
}
private static class DeleteHelperHolder {
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index dc109f81030..f3046bf22e2 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -41,6 +41,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T,
List<HoodieRecord<T
List<HoodieKey>, List<WriteStatus>, R> {
private JavaWriteHelper() {
+ super(ignored -> -1);
}
private static class WriteHelperHolder {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index 51407eb026d..fc4b8bf1006 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -48,6 +48,7 @@ public class SparkBulkInsertHelper<T, R> extends
BaseBulkInsertHelper<T, HoodieD
HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
private SparkBulkInsertHelper() {
+ super(HoodieData::getNumPartitions);
}
private static class BulkInsertHelperHolder {
@@ -68,17 +69,19 @@ public class SparkBulkInsertHelper<T, R> extends
BaseBulkInsertHelper<T, HoodieD
final
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
- //transition bulk_insert state to inflight
+ // Transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(new
HoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
BulkInsertPartitioner partitioner =
userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(table,
config));
- // write new files
- HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords,
instantTime, table, config, performDedupe, partitioner, false,
- config.getBulkInsertShuffleParallelism(), new
CreateHandleFactory(false));
- //update index
+ // Write new files
+ HoodieData<WriteStatus> writeStatuses =
+ bulkInsert(inputRecords, instantTime, table, config, performDedupe,
partitioner, false,
+ config.getBulkInsertShuffleParallelism(), new
CreateHandleFactory(false));
+
+ // Update index
((BaseSparkCommitActionExecutor)
executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
@@ -105,19 +108,22 @@ public class SparkBulkInsertHelper<T, R> extends
BaseBulkInsertHelper<T, HoodieD
boolean performDedupe,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
- int parallelism,
+ int configuredParallelism,
WriteHandleFactory
writeHandleFactory) {
// De-dupe/merge if needed
HoodieData<HoodieRecord<T>> dedupedRecords = inputRecords;
+ int targetParallelism = deduceShuffleParallelism(inputRecords,
configuredParallelism);
+
if (performDedupe) {
- dedupedRecords = (HoodieData<HoodieRecord<T>>)
HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(),
inputRecords,
- parallelism, table);
+ dedupedRecords = (HoodieData<HoodieRecord<T>>)
HoodieWriteHelper.newInstance()
+ .combineOnCondition(config.shouldCombineBeforeInsert(),
inputRecords, targetParallelism, table);
}
// only JavaRDD is supported for Spark partitioner, but it is not enforced
by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
- final HoodieData<HoodieRecord<T>> repartitionedRecords =
HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>)
partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords),
parallelism));
+ final HoodieData<HoodieRecord<T>> repartitionedRecords =
+ HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>)
partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords),
targetParallelism));
JavaRDD<WriteStatus> writeStatusRDD =
HoodieJavaRDD.getJavaRDD(repartitionedRecords)
.mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index ffd2d6071d1..238fed526a1 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -22,26 +22,29 @@ import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.engine.TaskContextSupplier
-import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
+import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
+import
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper,
ParallelismHelper}
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
-import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper
+import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath,
getNestedInternalRowValue}
+import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.types.{DataType, StringType, StructField,
StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.{asScalaBufferConverter,
seqAsJavaListConverter}
-object HoodieDatasetBulkInsertHelper extends Logging {
+object HoodieDatasetBulkInsertHelper
+ extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df
=> getOutputPartitioning(df).numPartitions)) with Logging {
/**
* Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following
steps:
@@ -115,7 +118,10 @@ object HoodieDatasetBulkInsertHelper extends Logging {
updatedDF
}
- partitioner.repartitionRecords(trimmedDF,
config.getBulkInsertShuffleParallelism)
+ val targetParallelism =
+ deduceShuffleParallelism(trimmedDF,
config.getBulkInsertShuffleParallelism)
+
+ partitioner.repartitionRecords(trimmedDF, targetParallelism)
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
index 0c379c27dfc..3517d641448 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -17,7 +17,7 @@
package org.apache.hudi.util
-import org.apache.hudi.common.function.{SerializableFunction,
SerializablePairFunction}
+import org.apache.hudi.common.function.{SerializableFunction,
SerializableFunctionUnchecked, SerializablePairFunction}
import org.apache.hudi.common.util.collection
import scala.language.implicitConversions
@@ -53,6 +53,11 @@ object JFunction {
override def apply(t: T): R = f.apply(t)
}
+ implicit def toJavaSerializableFunctionUnchecked[T, R](f: Function[T, R]):
SerializableFunctionUnchecked[T, R] =
+ new SerializableFunctionUnchecked[T, R] {
+ override def apply(t: T): R = f.apply(t)
+ }
+
implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T,
collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
new SerializablePairFunction[T, K, V] {
override def call(t: T): collection.Pair[K, V] = f.apply(t)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index c981cd8113c..dfd416b6f52 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
@@ -30,6 +31,16 @@ import org.apache.spark.util.MutablePair
*/
object HoodieUnsafeUtils {
+ /**
+ * Fetches expected output [[Partitioning]] of the provided [[DataFrame]]
+ *
+ * NOTE: Invoking [[QueryExecution#executedPlan]] wouldn't actually execute
the query (ie start pumping the data)
+ * but instead will just execute Spark resolution, optimization and
actual execution planning stages
+ * returning instance of [[SparkPlan]] ready for execution
+ */
+ def getOutputPartitioning(df: DataFrame): Partitioning =
+ df.queryExecution.executedPlan.outputPartitioning
+
/**
* Creates [[DataFrame]] from provided [[plan]]
*