This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch bulk_insert_as_row_nullable
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1adaa6150a71edd93f55af7dfa33914fedd9c0b2
Author: YueZhang <[email protected]>
AuthorDate: Fri Jun 6 14:33:49 2025 +0800

    unify the bulk_insert-as-row nullable schema with the table schema
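    
    Context: bulkInsert previously aligned the incoming Dataset's nullability
    with the table schema only for WriteOperationType.CLUSTER; this change
    runs alignNotNullFields unconditionally and drops the operation parameter.
    As a minimal, self-contained sketch of that alignment (not the Hudi
    source: the object name, sample schemas, and field names below are
    invented for illustration), the logic behaves roughly like:
    
        import org.apache.avro.Schema
        import org.apache.spark.sql.types.{StringType, StructField, StructType}
        
        import scala.collection.JavaConverters._
        
        object AlignNotNullFieldsSketch {
          // Mirror the table (Avro) schema's non-null fields onto the Spark
          // schema by flipping nullable to false; other fields are copied as-is.
          def alignNotNullFields(sourceSchema: StructType, targetSchema: Schema): StructType = {
            val notNullFieldNames = targetSchema.getFields.asScala
              .filter(f => !f.schema.isNullable) // not a union with "null"
              .map(_.name)
              .toSet
            if (notNullFieldNames.isEmpty) {
              sourceSchema
            } else {
              StructType(sourceSchema.fields.map { field =>
                if (notNullFieldNames.contains(field.name)) field.copy(nullable = false)
                else field.copy()
              })
            }
          }
        
          def main(args: Array[String]): Unit = {
            // Hypothetical table schema: "id" is required, "note" is optional.
            val tableSchema = new Schema.Parser().parse(
              """{"type":"record","name":"rec","fields":[
                |  {"name":"id","type":"string"},
                |  {"name":"note","type":["null","string"],"default":null}
                |]}""".stripMargin)
            val datasetSchema = StructType(Seq(
              StructField("id", StringType, nullable = true),
              StructField("note", StringType, nullable = true)))
            // Prints a tree where "id" is now non-nullable and "note" stays nullable.
            alignNotNullFields(datasetSchema, tableSchema).printTreeString()
          }
        }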
---
 ...onsistentBucketClusteringExecutionStrategy.java |  3 +--
 .../SparkSingleFileSortExecutionStrategy.java      |  2 +-
 .../SparkSortAndSizeExecutionStrategy.java         |  2 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       | 30 +++++++++-------------
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  2 +-
 ...setBulkInsertOverwriteCommitActionExecutor.java |  2 +-
 6 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 938a5aeb603..d61fc857245 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
@@ -81,7 +80,7 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 6182997bc7f..563305a7d8a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -83,7 +83,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index e7d18ca877a..5896f815424 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -75,7 +75,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
 
     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
-        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata, WriteOperationType.CLUSTER);
+        partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 1365bc9a636..e066a5f9e04 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -150,14 +150,8 @@ object HoodieDatasetBulkInsertHelper
                  table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  arePartitionRecordsSorted: Boolean,
-                 shouldPreserveHoodieMetadata: Boolean,
-                 operation: WriteOperationType): HoodieData[WriteStatus] = {
-    val schema = operation match {
-      case WriteOperationType.CLUSTER =>
-        alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
-      case _ =>
-        dataset.schema
-    }
+                 shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
+    val schema = alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
     HoodieJavaRDD.of(
       injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
         val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
@@ -225,17 +219,17 @@ object HoodieDatasetBulkInsertHelper
       .filter(f => !f.schema.isNullable)
       .map(f => f.name)
     if (notNullFieldNames.isEmpty) {
-      return sourceSchema
+      sourceSchema
+    } else {
+      val copiedFields = sourceSchema.fields.map(field => {
+        if (notNullFieldNames.contains(field.name)) {
+          field.copy(nullable = false)
+        } else {
+          field.copy()
+        }
+      }).toSeq
+      StructType(copiedFields)
     }
-
-    val copiedFields = sourceSchema.fields.map(field => {
-      if (notNullFieldNames.contains(field.name)) {
-        field.copy(nullable = false)
-      } else {
-        field.copy()
-      }
-    }).toSeq
-    StructType(copiedFields)
   }
 
   private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean, targetParallelism: Int): RDD[InternalRow] = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 3b68659abf7..16041fdae5a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -77,7 +77,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
     table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
         getCommitActionType(), instantTime), Option.empty());
     return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false));
   }
 
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 9780e90cf92..325b1334027 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -52,7 +52,7 @@ public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetB
     table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
         getCommitActionType(), instantTime), Option.empty());
     return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false));
   }
 
   @Override
