This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0258a89112a [HUDI-6515] Fix Spark2 do not support bucket bulk insert (#9163)
0258a89112a is described below
commit 0258a89112a6071a8074757236e19a7b27539dbd
Author: StreamingFlames <[email protected]>
AuthorDate: Tue Jul 11 13:45:35 2023 +0800
[HUDI-6515] Fix Spark2 do not support bucket bulk insert (#9163)
---
.../apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
index 9f878dfa572..666ca3ec989 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.internal;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
 import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -39,8 +41,11 @@ public class HoodieBulkInsertDataInternalWriter implements DataWriter<InternalRo
   public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                             String instantTime, int taskPartitionId, long taskId, long taskEpochId,
                                             StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.bulkInsertWriterHelper = new BulkInsertDataInternalWriterHelper(hoodieTable,
-        writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
+    this.bulkInsertWriterHelper = writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET
+        ? new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted)
+        : new BulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, populateMetaFields, arePartitionRecordsSorted);
   }
 
   @Override
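
For context (not part of this commit), below is a minimal sketch of a Spark write that would exercise the fixed code path in hudi-spark2: a bulk insert with the row writer enabled against a bucket-index table, so HoodieBulkInsertDataInternalWriter selects BucketBulkInsertDataInternalWriterHelper. The table name, paths, and field names are illustrative assumptions, not taken from the patch.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BucketBulkInsertExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("bucket-bulk-insert").getOrCreate();
    // Illustrative source data; the input path is an assumption.
    Dataset<Row> df = spark.read().parquet("/tmp/source_data");
    df.write()
      .format("hudi")
      .option("hoodie.table.name", "bucket_bulk_insert_tbl")
      // bulk_insert with the row writer enabled goes through HoodieBulkInsertDataInternalWriter
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.row.writer.enable", "true")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      // BUCKET index type is what selects BucketBulkInsertDataInternalWriterHelper after this fix
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "4")
      .option("hoodie.bucket.index.hash.field", "uuid")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/bucket_bulk_insert_tbl");
  }
}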