nsivabalan commented on code in PR #9123:
URL: https://github.com/apache/hudi/pull/9123#discussion_r1257549916
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -187,29 +218,38 @@ trait ProvidesHoodieConfig extends Logging {
val insertModeOpt = combinedOpts.get(SQL_INSERT_MODE.key)
val insertModeSet = insertModeOpt.nonEmpty
+ val sqlWriteOperationOpt = combinedOpts.get(SQL_WRITE_OPERATION.key())
+ val sqlWriteOperationSet = sqlWriteOperationOpt.nonEmpty
+ val sqlWriteOperation =
sqlWriteOperationOpt.getOrElse(SQL_WRITE_OPERATION.defaultValue())
+ val insertDupPolicy = combinedOpts.getOrElse(INSERT_DUP_POLICY.key(),
INSERT_DUP_POLICY.defaultValue())
val insertMode =
InsertMode.of(insertModeOpt.getOrElse(SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val combineBeforeInsert = hoodieCatalogTable.preCombineKey.nonEmpty &&
hoodieCatalogTable.primaryKeys.nonEmpty
- // NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
- // we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
+ // try to use sql write operation instead of legacy insert mode. If only
insert mode is explicitly specified, we will use
+ // the legacy insert mode flow.
+ val useLegacyInsertModeFlow = insertModeSet && !sqlWriteOperationSet
val operation = combinedOpts.getOrElse(OPERATION.key,
+ if (useLegacyInsertModeFlow) {
+ // NOTE: Target operation could be overridden by the user, therefore
if it has been provided as an input
+ // we'd prefer that value over auto-deduced operation.
Otherwise, we deduce target operation type
deduceWriteOperationForInsertInfo(isPartitionedTable,
isOverwritePartition, isOverwriteTable, insertModeSet, dropDuplicate,
- enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert))
+ enableBulkInsert, isInsertInto, isNonStrictMode, combineBeforeInsert)
+ } else {
+ deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable,
sqlWriteOperation)
+ }
+ )
- val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
- tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
- // Validate duplicate key for COW, for MOR it will do the merge with the
DefaultHoodieRecordPayload
- // on reading.
- // TODO use HoodieSparkValidateDuplicateKeyRecordMerger when
SparkRecordMerger is default
- classOf[ValidateDuplicateKeyPayload].getCanonicalName
- } else if (operation == INSERT_OPERATION_OPT_VAL && tableType ==
COW_TABLE_TYPE_OPT_VAL &&
- insertMode == InsertMode.STRICT){
- // Validate duplicate key for inserts to COW table when using strict
insert mode.
- classOf[ValidateDuplicateKeyPayload].getCanonicalName
+ val payloadClassName = if (useLegacyInsertModeFlow) {
+ deducePayloadClassNameLegacy(operation, tableType, insertMode)
} else {
- classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+ // should we also consider old way of doing things.
Review Comment:
this is already taken care of in deducePayloadClassNameLegacy; none of the
downstream methods do anything differently. It's only used to deduce the payload
class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]