danny0405 commented on code in PR #8219:
URL: https://github.com/apache/hudi/pull/8219#discussion_r1145693723


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -150,20 +150,7 @@ object HoodieSparkSqlWriter {
       case _ => throw new HoodieException("hoodie only support 
org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
-    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
-    // TODO clean up
-    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
-    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
-    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
-    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&

Review Comment:
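   Can we add unit tests (UTs) that cover this change, i.e. the removal of the upsert-to-insert auto-correction when INSERT_DROP_DUPS is enabled?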



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -141,28 +141,30 @@ trait ProvidesHoodieConfig extends Logging {
     // NOTE: Target operation could be overridden by the user, therefore if it 
has been provided as an input
     //       we'd prefer that value over auto-deduced operation. Otherwise, we 
deduce target operation type
     val operationOverride = 
combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
-    val operation = operationOverride.getOrElse {
-      (enableBulkInsert, isOverwritePartition, isOverwriteTable, 
dropDuplicate, isNonStrictMode, isPartitionedTable) match {
-        case (true, _, _, _, false, _) =>
-          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert in ${insertMode.value()} mode.")
-        case (true, true, _, _, _, true) =>
-          throw new IllegalArgumentException(s"Insert Overwrite Partition can 
not use bulk insert.")
-        case (true, _, _, true, _, _) =>
-          throw new IllegalArgumentException(s"Bulk insert cannot support drop 
duplication." +
-            s" Please disable $INSERT_DROP_DUPS and try again.")
-        // if enableBulkInsert is true, use bulk insert for the insert 
overwrite non-partitioned table.
-        case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
-        // insert overwrite table
-        case (false, false, true, _, _, _) => 
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
-        // insert overwrite partition
-        case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
-        // disable dropDuplicate, and provide preCombineKey, use the upsert 
operation for strict and upsert mode.
-        case (false, false, false, false, false, _) if hasPrecombineColumn => 
UPSERT_OPERATION_OPT_VAL
-        // if table is pk table and has enableBulkInsert use bulk insert for 
non-strict mode.
-        case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
-        // for the rest case, use the insert operation
-        case _ => INSERT_OPERATION_OPT_VAL
-      }
+    val operation = (operationOverride, enableBulkInsert, isOverwritePartition,
+                     isOverwriteTable, dropDuplicate, isNonStrictMode, 
isPartitionedTable, hasPrecombineColumn) match {
+      case (Some(UPSERT_OPERATION_OPT_VAL), _, _, _, _, _, _, false) =>

Review Comment:
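   Can we add unit tests (UTs) that cover this change, i.e. the new operation-deduction match, including the case where an explicit upsert override is given without a precombine column?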



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to