This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.11.1-rc2-prep in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e68e0ca374a0626502a5e36389921fe3605cf97b Author: Danny Chan <[email protected]> AuthorDate: Thu Jun 9 20:37:58 2022 +0800 [HUDI-4213] Infer keygen clazz for Spark SQL (#5815) --- .../scala/org/apache/hudi/DataSourceOptions.scala | 34 +++++++++++++--------- .../spark/sql/hudi/command/SqlKeyGenerator.scala | 9 +++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index bcc8ce3db0..a6c957b4c0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} import org.apache.hudi.HoodieConversionUtils.toScalaOption -import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig} +import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties} import org.apache.hudi.common.fs.ConsistencyGuardConfig import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig @@ -323,22 +323,12 @@ object DataSourceWriteOptions { val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE /** - * Key generator class, that implements will extract the key out of incoming record - * + * Key generator class, that implements will extract the key out of incoming record. */ val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => { - if (!p.contains(PARTITIONPATH_FIELD)) { - Option.of(classOf[NonpartitionedKeyGenerator].getName) - } else { - val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length - val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length - if (numOfPartFields == 1 && numOfRecordKeyFields == 1) { - Option.of(classOf[SimpleKeyGenerator].getName) - } else { - Option.of(classOf[ComplexKeyGenerator].getName) - } - } + Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps)) }) + val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .defaultValue(classOf[SimpleKeyGenerator].getName) @@ -804,6 +794,22 @@ object DataSourceOptionsHelper { ) ++ translateConfigurations(parameters) } + def inferKeyGenClazz(props: TypedProperties): String = { + val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null) + if (partitionFields != null) { + val numPartFields = partitionFields.split(",").length + val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue()) + val numRecordKeyFields = recordsKeyFields.split(",").length + if (numPartFields == 1 && numRecordKeyFields == 1) { + classOf[SimpleKeyGenerator].getName + } else { + classOf[ComplexKeyGenerator].getName + } + } else { + classOf[NonpartitionedKeyGenerator].getName + } + } + implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = { new JavaFunction[From, To] { override def apply (input: From): To = function (input) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala index 9d139389fd..798ed84b09 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.generic.GenericRecord +import org.apache.hudi.DataSourceOptionsHelper import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig @@ -113,14 +114,14 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props) } else partitionPath } - override def getPartitionPath(record: GenericRecord) = { + override def getPartitionPath(record: GenericRecord): String = { val partitionPath = super.getPartitionPath(record) - convertPartitionPathToSqlType(partitionPath, false) + convertPartitionPathToSqlType(partitionPath, rowType = false) } override def getPartitionPath(row: Row): String = { val partitionPath = super.getPartitionPath(row) - convertPartitionPathToSqlType(partitionPath, true) + convertPartitionPathToSqlType(partitionPath, rowType = true) } } @@ -135,7 +136,7 @@ object SqlKeyGenerator { if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) { HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName) } else { - classOf[ComplexKeyGenerator].getCanonicalName + DataSourceOptionsHelper.inferKeyGenClazz(props) } } }
