xushiyan commented on code in PR #7951:
URL: https://github.com/apache/hudi/pull/7951#discussion_r1120954001


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -288,44 +287,42 @@ object DataSourceWriteOptions {
     .withDocumentation("The table type for the underlying data, for this 
write. This can’t change between writes.")
 
   /**
-    * Translate spark parameters to hudi parameters
-    *
-    * @param optParams Parameters to be translated
-    * @return Parameters after translation
-    */
-  def translateSqlOptions(optParams: Map[String, String]): Map[String, String] 
= {
-    var translatedOptParams = optParams
-    // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD
-    // we should set hoodie's partition path only if its not set by the user.
-    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
-      && 
!optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
-      val partitionColumns = 
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
-        .map(SparkDataSourceUtils.decodePartitioningColumns)
-        .getOrElse(Nil)
-      val keyGeneratorClass = 
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
-        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
-
-      keyGeneratorClass match {
-        // CustomKeyGenerator needs special treatment, because it needs to be 
specified in a way
-        // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
-        // partitionBy can specify the partition like this: partitionBy("p1", 
"p2:SIMPLE", "p3:TIMESTAMP")
-        case c if (c.nonEmpty && c == classOf[CustomKeyGenerator].getName) =>
-          val partitionPathField = partitionColumns.map(e => {
-            if (e.contains(":")) {
-              e
-            } else {
-              s"$e:SIMPLE"
-            }
-          }).mkString(",")
-          translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> 
partitionPathField)
-        case c if (c.isEmpty || 
!keyGeneratorClass.equals(classOf[NonpartitionedKeyGenerator].getName)) =>
-          // for any key gen other than NonPartitioned key gen, we can 
override the partition field config.
-          val partitionPathField = partitionColumns.mkString(",")
-          translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> 
partitionPathField)
-        case _ => // no op incase of NonPartitioned Key gen.
-      }
+   * Derive [[PARTITIONPATH_FIELD]] based on 
[[SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY]]
+   * if [[PARTITIONPATH_FIELD]] is not set explicitly.
+   */
+  def derivePartitionPathFieldsIfNeeded(optParams: Map[String, String]): 
Map[String, String] = {
+    if (!optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+      || 
optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
+      return optParams
+    }
+
+    val partitionColumns = 
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+      .map(SparkDataSourceUtils.decodePartitioningColumns)
+      .getOrElse(Nil)
+    val keyGeneratorClass = 
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
+
+    keyGeneratorClass match {
+      // CustomKeyGenerator needs special treatment, because it needs to be 
specified in a way
+      // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
+      // partitionBy can specify the partition like this: partitionBy("p1", 
"p2:SIMPLE", "p3:TIMESTAMP")
+      case c if Array(classOf[CustomKeyGenerator].getName, 
classOf[CustomAvroKeyGenerator].getName).contains(c) =>
+        val partitionPathField = partitionColumns.map(e => {
+          if (e.contains(":")) {
+            e
+          } else {
+            s"$e:SIMPLE"
+          }
+        }).mkString(",")
+        optParams ++ Map(PARTITIONPATH_FIELD.key -> partitionPathField)
+      case c if !Array(classOf[NonpartitionedKeyGenerator].getName, 
classOf[NonpartitionedAvroKeyGenerator].getName).contains(c) =>

Review Comment:
   This is not the ideal way of checking the key gen type; the key gen class 
itself should tell us whether or not it is of the non-partitioned type, and the 
same goes for the custom keygen classes. The key gen class factory can provide 
a util to check. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to