This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c90eb4c253 [HUDI-6144] Consistent Hashing bucket index supports bulk insert usability improvement (#9137)
6c90eb4c253 is described below

commit 6c90eb4c253f0a1d6f7240e5d978a77c4fcf7a47
Author: StreamingFlames <[email protected]>
AuthorDate: Sat Jul 8 11:38:38 2023 +0800

    [HUDI-6144] Consistent Hashing bucket index supports bulk insert usability improvement (#9137)
    
    Co-authored-by: qijun.fqj <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  8 ++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 59 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b61baab876c..0cabef97913 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -48,6 +48,8 @@ import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.config.HoodieInternalConfig.SQL_MERGE_INTO_WRITES
 import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
@@ -372,7 +374,7 @@ object HoodieSparkSqlWriter {
 
             // Short-circuit if bulk_insert via row is enabled.
             // scalastyle:off
-            if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) {
+            if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT && !isConsistentHashingBucketIndex(client)) {
               return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath,
                 instantTime, writerSchema, tableConfig)
             }
@@ -1062,6 +1064,10 @@ object HoodieSparkSqlWriter {
       && client.getConfig.isAsyncClusteringEnabled)
   }
 
+  private def isConsistentHashingBucketIndex(client: SparkRDDWriteClient[_]): Boolean = {
+    client.getConfig.getIndexType == IndexType.BUCKET && client.getConfig.getBucketIndexEngineType == HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING
+  }
+
   /**
    * Fetch table config for an already existing table and if save mode is not Overwrite.
    * @param sparkContext instance of {@link SparkContext} to use.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 54e979486b5..950210f6d38 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1499,4 +1499,63 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       assertEquals(3, df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
     }
   }
+
+  test("Test Bulk Insert Into Consistent Hashing Bucket Index Table") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             | primaryKey = 'id,name',
+             | type = 'mor',
+             | preCombineField = 'ts',
+             | hoodie.index.type = 'BUCKET',
+             | hoodie.index.bucket.engine = 'CONSISTENT_HASHING',
+             | hoodie.bucket.index.hash.field = 'id,name'
+             | )
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+           """.stripMargin)
+
+        // Note: Do not write the field alias, the partition field must be placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1,1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3,3', 30, 3000, "2021-01-07")
+           """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+        )
+
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1', 10, 1000, "2021-01-05"),
+             | (3, "a3", 30, 3000, "2021-01-07")
+         """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
+    }
+  }
 }

Reply via email to