xushiyan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1253031610


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
+    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, 
we should treat it as append only workload

Review Comment:
   not sure about this rule: shouldn't append-only just depend on whether 
`preCombine` is set or not?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
+    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }

Review Comment:
   we should return `operation` early here, since the user has set it 
explicitly and no further deducing should happen



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up

Review Comment:
   do you know what is supposed to be cleaned up here? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -158,20 +158,7 @@ object HoodieSparkSqlWriter {
       case _ => throw new HoodieException("hoodie only support 
org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
-    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
-    // TODO clean up
-    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
-    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
-    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
-    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
-      operation == WriteOperationType.UPSERT) {
-
-      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
-        s"when $INSERT_DROP_DUPS is set to be true, " +
-        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
-
-      operation = WriteOperationType.INSERT
-    }
+    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)

Review Comment:
   can this be extracted to some common module? It would be better to align 
the semantics for all SQL writes.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
+    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, 
we should treat it as append only workload
+    // and make bulk_insert as operation type.
+    if 
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      &&  
!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PARTITIONPATH_FIELD.key())
+      && 
!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key())
+      && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+      log.warn(s"Choosing BULK_INSERT as the operation type since auto record 
key generation is applicable")
+      operation = WriteOperationType.BULK_INSERT
+    }
+    // if no record key is set, will switch the default operation to INSERT 
(auto record key gen)
+    else if 
(!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+      log.warn(s"Choosing INSERT as the operation type since auto record key 
generation is applicable")
+      operation = WriteOperationType.INSERT

Review Comment:
   so both of these cases are for auto key generation: if neither partition 
nor precombine is set => bulk_insert; if either partition or precombine is 
set => insert.
   
   I'm not really sure about the rationale behind this: bulk_insert vs. 
insert is only about whether small-file handling is used, which should not 
be coupled with whether partition or precombine is set by the user?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to