This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dd0b809c36b [HUDI-8630] Fix convert spark key generator issue (#12402)
dd0b809c36b is described below
commit dd0b809c36ba867cc994def57b5631db1c5111c9
Author: fhan <[email protected]>
AuthorDate: Thu Dec 12 12:19:24 2024 +0800
[HUDI-8630] Fix convert spark key generator issue (#12402)
* [HUDI-8630] fix convert spark key generator issue
* update comments in UT
---------
Co-authored-by: fhan <[email protected]>
---
.../hudi/HoodieDatasetBulkInsertHelper.scala | 6 ++--
.../spark/sql/hudi/dml/TestInsertTable.scala | 40 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 8f01143506b..e5828143cb5 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -30,11 +30,11 @@ import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator,
BuiltinKeyGenerator, KeyGenUtils}
import
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper,
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
-
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -96,7 +96,7 @@ object HoodieDatasetBulkInsertHelper
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime)
}
val sparkKeyGenerator =
- ReflectionUtils.loadClass(keyGeneratorClassName, typedProps)
+
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
typedProps)
.asInstanceOf[BuiltinKeyGenerator]
val keyGenerator: BuiltinKeyGenerator = if
(autoGenerateRecordKeys) {
new AutoRecordGenWrapperKeyGenerator(typedProps,
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
@@ -243,7 +243,7 @@ object HoodieDatasetBulkInsertHelper
private def getPartitionPathFields(config: HoodieWriteConfig):
mutable.Seq[String] = {
val keyGeneratorClassName =
config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
- val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new
TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+ val keyGenerator =
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
keyGenerator.getPartitionPathFields.asScala
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index b8912d58c5c..3b3252cac31 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2997,4 +2997,44 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}
+
+ test("Test SparkKeyGenerator When Bulk Insert") {
+ withSQLConf("hoodie.sql.bulk.insert.enable" -> "true",
"hoodie.sql.insert.mode" -> "non-strict") {
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a multi-level partitioned table
+ // Specify wrong key generator by setting
hoodie.datasource.write.keygenerator.class =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string,
+ | pt string
+ |) using hudi
+ |tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.table.keygenerator.class =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
+ | hoodie.datasource.write.keygenerator.class =
'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
+ |)
+ | partitioned by (dt, pt)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ """.stripMargin)
+ // Insert data and verify the results
+ spark.sql(
+ s"""insert into $tableName values
+ |(1, 'a', 31, 1000, '2021-01-05', 'A'),
+ |(2, 'b', 18, 1000, '2021-01-05', 'A')
+ |""".stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt, pt from $tableName order
by dt")(
+ Seq(1, "a", 31, 1000, "2021-01-05", "A"),
+ Seq(2, "b", 18, 1000, "2021-01-05", "A")
+ )
+ })
+ }
+ }
}