xushiyan commented on code in PR #7951:
URL: https://github.com/apache/hudi/pull/7951#discussion_r1120959200
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -288,44 +287,42 @@ object DataSourceWriteOptions {
.withDocumentation("The table type for the underlying data, for this
write. This can’t change between writes.")
/**
- * Translate spark parameters to hudi parameters
- *
- * @param optParams Parameters to be translated
- * @return Parameters after translation
- */
- def translateSqlOptions(optParams: Map[String, String]): Map[String, String]
= {
- var translatedOptParams = optParams
- // translate the api partitionBy of spark DataFrameWriter to
PARTITIONPATH_FIELD
- // we should set hoodie's partition path only if its not set by the user.
- if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
- &&
!optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
- val partitionColumns =
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
- .map(SparkDataSourceUtils.decodePartitioningColumns)
- .getOrElse(Nil)
- val keyGeneratorClass =
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
- DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
-
- keyGeneratorClass match {
- // CustomKeyGenerator needs special treatment, because it needs to be
specified in a way
- // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
- // partitionBy can specify the partition like this: partitionBy("p1",
"p2:SIMPLE", "p3:TIMESTAMP")
- case c if (c.nonEmpty && c == classOf[CustomKeyGenerator].getName) =>
- val partitionPathField = partitionColumns.map(e => {
- if (e.contains(":")) {
- e
- } else {
- s"$e:SIMPLE"
- }
- }).mkString(",")
- translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key ->
partitionPathField)
- case c if (c.isEmpty ||
!keyGeneratorClass.equals(classOf[NonpartitionedKeyGenerator].getName)) =>
- // for any key gen other than NonPartitioned key gen, we can
override the partition field config.
- val partitionPathField = partitionColumns.mkString(",")
- translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key ->
partitionPathField)
- case _ => // no op incase of NonPartitioned Key gen.
- }
+ * Derive [[PARTITIONPATH_FIELD]] based on
[[SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY]]
+ * if [[PARTITIONPATH_FIELD]] is not set explicitly.
+ */
+ def derivePartitionPathFieldsIfNeeded(optParams: Map[String, String]):
Map[String, String] = {
+ if (!optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+ ||
optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+ return optParams
+ }
+
+ val partitionColumns =
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+ .map(SparkDataSourceUtils.decodePartitioningColumns)
+ .getOrElse(Nil)
+ val keyGeneratorClass =
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
Review Comment:
@nsivabalan how do we consider `hoodie.datasource.write.keygenerator.type`
compare to key gen class in terms of precedence? it should fail a validation if
unmatched or we ignore key gen type?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]