This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a0340a10b96 [HUDI-6381] Support bucket bulk insert for CoW table
(#8983)
a0340a10b96 is described below
commit a0340a10b96e7a4f43e727338b261c4514d2df5a
Author: StreamingFlames <[email protected]>
AuthorDate: Mon Jun 19 09:58:20 2023 +0800
[HUDI-6381] Support bucket bulk insert for CoW table (#8983)
---
.../bulkinsert/RDDBucketIndexPartitioner.java | 4 -
.../RDDConsistentBucketBulkInsertPartitioner.java | 3 +
.../TestRDDSimpleBucketBulkInsertPartitioner.java | 31 ++++---
.../apache/spark/sql/hudi/TestInsertTable.scala | 95 +++++++++++-----------
4 files changed, 73 insertions(+), 60 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index c2f93f85960..98a0a1efd5d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -23,8 +23,6 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.table.BucketIndexBulkInsertPartitioner;
@@ -48,8 +46,6 @@ public abstract class RDDBucketIndexPartitioner<T> extends
BucketIndexBulkInsert
public RDDBucketIndexPartitioner(HoodieTable table, String sortString,
boolean preserveHoodieMetadata) {
super(table, sortString, preserveHoodieMetadata);
-    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
-        "CoW table with bucket index doesn't support bulk_insert");
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
index 06008680574..87d6e19ce9f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
@@ -50,6 +51,8 @@ public class RDDConsistentBucketBulkInsertPartitioner<T>
extends RDDBucketIndexP
super(table,
strategyParams.getOrDefault(PLAN_STRATEGY_SORT_COLUMNS.key(), null),
preserveHoodieMetadata);
+    ValidationUtils.checkArgument(table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ),
+        "Consistent hash bucket index doesn't support CoW table");
this.hashingChildrenNodes = new HashMap<>();
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
index cb579121bb7..e404b75ed50 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestRDDSimpleBucketBulkInsertPartitioner.java
@@ -34,16 +34,15 @@ import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Stream;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -65,8 +64,8 @@ public class TestRDDSimpleBucketBulkInsertPartitioner extends
HoodieClientTestHa
@ParameterizedTest
@MethodSource("configParams")
-  public void testSimpleBucketPartitioner(boolean partitionSort) throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, HoodieTableType.MERGE_ON_READ);
+  public void testSimpleBucketPartitioner(String tableType, boolean partitionSort) throws IOException {
+    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath, HoodieTableType.valueOf(tableType));
int bucketNum = 10;
HoodieWriteConfig config = HoodieWriteConfig
.newBuilder()
@@ -116,11 +115,23 @@ public class TestRDDSimpleBucketBulkInsertPartitioner
extends HoodieClientTestHa
writeStatues2.forEach(ws -> assertEquals(ws.getTotalRecords(),
writeStatuesMap.get(ws.getFileId()).getTotalRecords()));
}
- private static Stream<Arguments> configParams() {
- Object[][] data = new Object[][]{
- {true},
- {false},
- };
- return Stream.of(data).map(Arguments::of);
+ private static final List<Object> TABLE_TYPES = Arrays.asList(
+ "COPY_ON_WRITE",
+ "MERGE_ON_READ"
+ );
+
+ private static Iterable<Object> tableTypes() {
+ return TABLE_TYPES;
}
+
+ // table type, partitionSort
+ private static Iterable<Object[]> configParams() {
+ List<Object[]> opts = new ArrayList<>();
+ for (Object tableType : TABLE_TYPES) {
+ opts.add(new Object[] {tableType, "true"});
+ opts.add(new Object[] {tableType, "false"});
+ }
+ return opts;
+ }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 0d9b8fa9a3a..27093f2dd44 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1077,56 +1077,59 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test Bulk Insert Into Bucket Index Table") {
withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
- withTempDir { tmp =>
- val tableName = generateTableName
- // Create a partitioned table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | dt string,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (
- | primaryKey = 'id,name',
- | preCombineField = 'ts',
- | hoodie.index.type = 'BUCKET',
- | hoodie.bucket.index.hash.field = 'id,name')
- | partitioned by (dt)
- | location '${tmp.getCanonicalPath}'
- """.stripMargin)
+ Seq("mor", "cow").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = '$tableType',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.bucket.index.hash.field = 'id,name')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
-      // Note: Do not write the field alias, the partition field must be placed last.
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1,1', 10, 1000, "2021-01-05"),
- | (2, 'a2', 20, 2000, "2021-01-06"),
- | (3, 'a3,3', 30, 3000, "2021-01-07")
- """.stripMargin)
+        // Note: Do not write the field alias, the partition field must be placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1,1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3,3', 30, 3000, "2021-01-07")
+ """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+ )
- spark.sql(
- s"""
- | insert into $tableName values
- | (1, 'a1', 10, 1000, "2021-01-05"),
- | (3, "a3", 30, 3000, "2021-01-07")
- """.stripMargin)
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (3, "a3", 30, 3000, "2021-01-07")
+ """.stripMargin)
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
- Seq(1, "a1", 10.0, 1000, "2021-01-05"),
- Seq(2, "a2", 20.0, 2000, "2021-01-06"),
- Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
- Seq(3, "a3", 30.0, 3000, "2021-01-07")
- )
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
}
}
}