This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c90eb4c253 [HUDI-6144] Consistent Hashing bucket index supports bulk insert usability improvement (#9137)
6c90eb4c253 is described below
commit 6c90eb4c253f0a1d6f7240e5d978a77c4fcf7a47
Author: StreamingFlames <[email protected]>
AuthorDate: Sat Jul 8 11:38:38 2023 +0800
[HUDI-6144] Consistent Hashing bucket index supports bulk insert usability
improvement (#9137)
Co-authored-by: qijun.fqj <[email protected]>
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 8 ++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 59 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b61baab876c..0cabef97913 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -48,6 +48,8 @@ import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
@@ -372,7 +374,7 @@ object HoodieSparkSqlWriter {
// Short-circuit if bulk_insert via row is enabled.
// scalastyle:off
-    if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
+    if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT && !isConsistentHashingBucketIndex(client)) {
return bulkInsertAsRow(client, parameters, hoodieConfig, df,
mode, tblName, basePath,
instantTime, writerSchema, tableConfig)
}
@@ -1062,6 +1064,10 @@ object HoodieSparkSqlWriter {
&& client.getConfig.isAsyncClusteringEnabled)
}
+  private def isConsistentHashingBucketIndex(client: SparkRDDWriteClient[_]): Boolean = {
+    client.getConfig.getIndexType == IndexType.BUCKET && client.getConfig.getBucketIndexEngineType == HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING
+  }
+
/**
* Fetch table config for an already existing table and if save mode is not
Overwrite.
* @param sparkContext instance of {@link SparkContext} to use.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 54e979486b5..950210f6d38 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1499,4 +1499,63 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
assertEquals(3,
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
}
}
+
+ test("Test Bulk Insert Into Consistent Hashing Bucket Index Table") {
+ withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.index.bucket.engine = 'CONSISTENT_HASHING',
+ | hoodie.bucket.index.hash.field = 'id,name'
+ | )
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+      // Note: Do not write the field alias; the partition field must be placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1,1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3,3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (3, "a3", 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
+ }
+ }
}