This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0340a10b96 [HUDI-6381] Support bucket bulk insert for CoW table (#8983)
a0340a10b96 is described below

commit a0340a10b96e7a4f43e727338b261c4514d2df5a
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Jun 19 09:58:20 2023 +0800

    [HUDI-6381] Support bucket bulk insert for CoW table (#8983)
---
 .../bulkinsert/RDDBucketIndexPartitioner.java      |  4 -
 .../RDDConsistentBucketBulkInsertPartitioner.java  |  3 +
 .../TestRDDSimpleBucketBulkInsertPartitioner.java  | 31 ++++---
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 95 +++++++++++-----------
 4 files changed, 73 insertions(+), 60 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index c2f93f85960..98a0a1efd5d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -23,8 +23,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.table.BucketIndexBulkInsertPartitioner;
 
@@ -48,8 +46,6 @@ public abstract class RDDBucketIndexPartitioner<T> extends BucketIndexBulkInsert
 
   public RDDBucketIndexPartitioner(HoodieTable table, String sortString, boolean preserveHoodieMetadata) {
     super(table, sortString, preserveHoodieMetadata);
-    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
-        "CoW table with bucket index doesn't support bulk_insert");
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
index 06008680574..87d6e19ce9f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
@@ -50,6 +51,8 @@ public class RDDConsistentBucketBulkInsertPartitioner<T> extends RDDBucketIndexP
     super(table,
         strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
         preserveHoodieMetadata);
+    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
+        "Consistent hash bucket index doesn't support CoW table");
     this.hashingChildrenNodes = new HashMap<>();
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
index cb579121bb7..e404b75ed50 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -34,16 +34,15 @@ import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -65,8 +64,8 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieClientTestHa
 
   @ParameterizedTest
   @MethodSource("configParams")
-  public void testSimpleBucketPartitioner(boolean partitionSort) throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, HoodieTableType.MERGE_ON_READ);
+  public void testSimpleBucketPartitioner(String tableType, boolean partitionSort) throws IOException {
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, HoodieTableType.valueOf(tableType));
     int bucketNum = 10;
     HoodieWriteConfig config = HoodieWriteConfig
         .newBuilder()
@@ -116,11 +115,23 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends HoodieClientTestHa
     writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(), writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
   }
 
-  private static Stream<Arguments> configParams() {
-    Object[][] data = new Object[][]{
-        {true},
-        {false},
-    };
-    return Stream.of(data).map(Arguments::of);
+  private static final List<Object> TABLE_TYPES = Arrays.asList(
+      "COPY_ON_WRITE",
+      "MERGE_ON_READ"
+  );
+
+  private static Iterable<Object> tableTypes() {
+    return TABLE_TYPES;
   }
+
+  // table type, partitionSort
+  private static Iterable<Object[]> configParams() {
+    List<Object[]> opts = new ArrayList<>();
+    for (Object tableType : TABLE_TYPES) {
+      opts.add(new Object[] {tableType, "true"});
+      opts.add(new Object[] {tableType, "false"});
+    }
+    return opts;
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 0d9b8fa9a3a..27093f2dd44 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1077,56 +1077,59 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test Bulk Insert Into Bucket Index Table") {
     withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
-      withTempDir { tmp =>
-        val tableName = generateTableName
-        // Create a partitioned table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  dt string,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | tblproperties (
-             | primaryKey = 'id,name',
-             | preCombineField = 'ts',
-             | hoodie.index.type = 'BUCKET',
-             | hoodie.bucket.index.hash.field = 'id,name')
-             | partitioned by (dt)
-             | location '${tmp.getCanonicalPath}'
-       """.stripMargin)
+      Seq("mor", "cow").foreach { tableType =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  dt string,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               | primaryKey = 'id,name',
+               | type = '$tableType',
+               | preCombineField = 'ts',
+               | hoodie.index.type = 'BUCKET',
+               | hoodie.bucket.index.hash.field = 'id,name')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}'
+               """.stripMargin)
 
-        // Note: Do not write the field alias, the partition field must be placed last.
-        spark.sql(
-          s"""
-             | insert into $tableName values
-             | (1, 'a1,1', 10, 1000, "2021-01-05"),
-             | (2, 'a2', 20, 2000, "2021-01-06"),
-             | (3, 'a3,3', 30, 3000, "2021-01-07")
-              """.stripMargin)
+          // Note: Do not write the field alias, the partition field must be placed last.
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1,1', 10, 1000, "2021-01-05"),
+               | (2, 'a2', 20, 2000, "2021-01-06"),
+               | (3, 'a3,3', 30, 3000, "2021-01-07")
+                      """.stripMargin)
 
-        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
-        )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+          )
 
-        spark.sql(
-          s"""
-             | insert into $tableName values
-             | (1, 'a1', 10, 1000, "2021-01-05"),
-             | (3, "a3", 30, 3000, "2021-01-07")
-              """.stripMargin)
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1', 10, 1000, "2021-01-05"),
+               | (3, "a3", 30, 3000, "2021-01-07")
+                      """.stripMargin)
 
-        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
-          Seq(3, "a3", 30.0, 3000, "2021-01-07")
-        )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+            Seq(3, "a3", 30.0, 3000, "2021-01-07")
+          )
+        }
       }
     }
   }

Reply via email to