danny0405 commented on code in PR #8219:
URL: https://github.com/apache/hudi/pull/8219#discussion_r1145693723
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -150,20 +150,7 @@ object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
- var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
- // TODO clean up
- // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
- // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
- // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()) .
- if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
Review Comment:
Can we add some unit tests (UTs) to cover this change?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -141,28 +141,30 @@ trait ProvidesHoodieConfig extends Logging {
// NOTE: Target operation could be overridden by the user, therefore if it has been provided as an input
//       we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
val operationOverride = combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
- val operation = operationOverride.getOrElse {
- (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
- case (true, _, _, _, false, _) =>
- throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
- case (true, true, _, _, _, true) =>
- throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
- case (true, _, _, true, _, _) =>
- throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
- s" Please disable $INSERT_DROP_DUPS and try again.")
- // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
- case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
- // insert overwrite table
- case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
- // insert overwrite partition
- case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
- // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
- case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
- // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
- case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
- // for the rest case, use the insert operation
- case _ => INSERT_OPERATION_OPT_VAL
- }
+ val operation = (operationOverride, enableBulkInsert, isOverwritePartition,
+ isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable, hasPrecombineColumn) match {
+ case (Some(UPSERT_OPERATION_OPT_VAL), _, _, _, _, _, _, false) =>
Review Comment:
Can we add some unit tests (UTs) to cover this change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]