xushiyan commented on code in PR #7951:
URL: https://github.com/apache/hudi/pull/7951#discussion_r1118224236


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -49,6 +49,7 @@ import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import 
org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, 
SerDeHelper}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions

Review Comment:
   unused import



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -295,30 +295,35 @@ object DataSourceWriteOptions {
   def translateSqlOptions(optParams: Map[String, String]): Map[String, String] 
= {
     var translatedOptParams = optParams
     // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD
-    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)) {
+    // we should set hoodie's partition path only if its not set by the user.
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)

Review Comment:
   maybe we should fail the write if these 2 options are not the same? at least 
we should avoid unintended writes



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -295,30 +295,35 @@ object DataSourceWriteOptions {
   def translateSqlOptions(optParams: Map[String, String]): Map[String, String] 
= {
     var translatedOptParams = optParams
     // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD
-    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)) {
+    // we should set hoodie's partition path only if its not set by the user.
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)

Review Comment:
   let's track this behavior change for next release. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -295,30 +295,35 @@ object DataSourceWriteOptions {
   def translateSqlOptions(optParams: Map[String, String]): Map[String, String] 
= {
     var translatedOptParams = optParams
     // translate the api partitionBy of spark DataFrameWriter to 
PARTITIONPATH_FIELD
-    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)) {
+    // we should set hoodie's partition path only if its not set by the user.
+    if (optParams.contains(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
+      && 
!optParams.contains(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())) {
       val partitionColumns = 
optParams.get(SparkDataSourceUtils.PARTITIONING_COLUMNS_KEY)
         .map(SparkDataSourceUtils.decodePartitioningColumns)
         .getOrElse(Nil)
       val keyGeneratorClass = 
optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
         DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
 
-      val partitionPathField =
+      // if not nonpartitioned key gen
+      if (keyGeneratorClass.isEmpty || 
!keyGeneratorClass.equals(classOf[NonpartitionedKeyGenerator].getName)) {
         keyGeneratorClass match {
           // Only CustomKeyGenerator needs special treatment, because it needs 
to be specified in a way
           // such as "field1:PartitionKeyType1,field2:PartitionKeyType2".
           // partitionBy can specify the partition like this: 
partitionBy("p1", "p2:SIMPLE", "p3:TIMESTAMP")
           case c if c == classOf[CustomKeyGenerator].getName =>
-            partitionColumns.map(e => {
+            val partitionPathField = partitionColumns.map(e => {
               if (e.contains(":")) {
                 e
               } else {
                 s"$e:SIMPLE"
               }
             }).mkString(",")
+            translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> 
partitionPathField)
           case _ =>
-            partitionColumns.mkString(",")
+            val partitionPathField = partitionColumns.mkString(",")
+            translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> 
partitionPathField)
         }
-      translatedOptParams = optParams ++ Map(PARTITIONPATH_FIELD.key -> 
partitionPathField)

Review Comment:
   in case of keygen not set or NonpartitionedKeyGenerator, partition path 
field should be empty string. So it's fine right?  what does this change fix? 
besides, we should do config validation instead of fixing the logic here, for 
e.g., when keygen is non partitioned, partition field should not be non-empty.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to