This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch bulk_insert_as_row_nullable
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1adaa6150a71edd93f55af7dfa33914fedd9c0b2
Author: YueZhang <[email protected]>
AuthorDate: Fri Jun 6 14:33:49 2025 +0800

    unify bulk_insert as row nullable schema with table schema
---
 ...onsistentBucketClusteringExecutionStrategy.java |  3 +--
 .../SparkSingleFileSortExecutionStrategy.java      |  2 +-
 .../SparkSortAndSizeExecutionStrategy.java         |  2 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 30 +++++++++-------------
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  2 +-
 ...setBulkInsertOverwriteCommitActionExecutor.java |  2 +-
 6 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 938a5aeb603..d61fc857245 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
@@ -81,7 +80,7 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 6182997bc7f..563305a7d8a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -83,7 +83,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index e7d18ca877a..5896f815424 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -75,7 +75,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 1365bc9a636..e066a5f9e04 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -150,14 +150,8 @@ object HoodieDatasetBulkInsertHelper
                  table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  arePartitionRecordsSorted: Boolean,
-                 shouldPreserveHoodieMetadata: Boolean,
-                 operation: WriteOperationType): HoodieData[WriteStatus] = {
-    val schema = operation match {
-      case WriteOperationType.CLUSTER =>
-        alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
-      case _ =>
-        dataset.schema
-    }
+                 shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
+    val schema = alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
     HoodieJavaRDD.of(
       injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
         val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
@@ -225,17 +219,17 @@
       .filter(f => !f.schema.isNullable)
       .map(f => f.name)
     if (notNullFieldNames.isEmpty) {
-      return sourceSchema
+      sourceSchema
+    } else {
+      val copiedFields = sourceSchema.fields.map(field => {
+        if (notNullFieldNames.contains(field.name)) {
+          field.copy(nullable = false)
+        } else {
+          field.copy()
+        }
+      }).toSeq
+      StructType(copiedFields)
     }
-
-    val copiedFields = sourceSchema.fields.map(field => {
-      if (notNullFieldNames.contains(field.name)) {
-        field.copy(nullable = false)
-      } else {
-        field.copy()
-      }
-    }).toSeq
-    StructType(copiedFields)
   }
 
   private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean, targetParallelism: Int): RDD[InternalRow] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 3b68659abf7..16041fdae5a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -77,7 +77,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
     table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, getCommitActionType(), instantTime), Option.empty());
 
     return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false));
   }
 
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 9780e90cf92..325b1334027 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -52,7 +52,7 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
     table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED, getCommitActionType(), instantTime), Option.empty());
 
     return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false));
   }
 
   @Override
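For context, below is a minimal, self-contained sketch (not part of this commit) of the
nullability alignment that bulkInsert now applies on every call. The alignNotNullFields
body mirrors the Scala in the diff above; the object name and the sample Avro/Spark
schemas are invented for illustration only:

import org.apache.avro.Schema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import scala.collection.JavaConverters._

object AlignNotNullFieldsSketch {

  // Any field the table's Avro schema declares as not nullable is marked
  // non-nullable in the incoming dataset's StructType as well; all other
  // fields keep their original nullability.
  def alignNotNullFields(sourceSchema: StructType, tableSchema: Schema): StructType = {
    val notNullFieldNames = tableSchema.getFields.asScala
      .filter(f => !f.schema.isNullable)
      .map(f => f.name)
    if (notNullFieldNames.isEmpty) {
      sourceSchema
    } else {
      StructType(sourceSchema.fields.map { field =>
        if (notNullFieldNames.contains(field.name)) field.copy(nullable = false)
        else field.copy()
      })
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical table schema: "id" is required, "name" is optional.
    val tableSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "rec", "fields": [
        |  {"name": "id", "type": "string"},
        |  {"name": "name", "type": ["null", "string"], "default": null}
        |]}""".stripMargin)
    // Incoming row schema marks everything nullable, as Spark datasets often do.
    val sourceSchema = StructType(Seq(
      StructField("id", StringType, nullable = true),
      StructField("name", StringType, nullable = true)))
    alignNotNullFields(sourceSchema, tableSchema).fields
      .foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
    // Prints:
    //   id: nullable=false
    //   name: nullable=true
  }
}

Before this commit the alignment ran only when the operation was
WriteOperationType.CLUSTER; making it unconditional gives every bulk-insert-as-row
path a row schema whose nullability matches the table schema, which is what allows
the operation parameter to be dropped from all call sites.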
