nsivabalan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1257563336
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -142,24 +144,27 @@ trait ProvidesHoodieConfig extends Logging {
     // we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
     val operationOverride = combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
     val operation = operationOverride.getOrElse {
-      (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
-        case (true, _, _, _, false, _) =>
+      (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable,
+        autoGenerateRecordKeys) match {
+        case (true, _, _, _, false, _, _) =>
           throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
-        case (true, true, _, _, _, true) =>
+        case (true, true, _, _, _, true, _) =>
           throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
-        case (true, _, _, true, _, _) =>
+        case (true, _, _, true, _, _, _) =>
           throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
             s" Please disable $INSERT_DROP_DUPS and try again.")
         // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
-        case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, false, true, _, _, false, _) => BULK_INSERT_OPERATION_OPT_VAL
         // insert overwrite table
-        case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        case (false, false, true, _, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
         // insert overwrite partition
-        case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        case (_, true, false, _, _, true, _) => INSERT_OVERWRITE_OPERATION_OPT_VAL
         // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
-        case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+        case (false, false, false, false, false, _, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
         // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
-        case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, _, _, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+        // if auto record key generation is enabled, use bulk_insert
+        case (_, _, _, _, _, true, true) => BULK_INSERT_OPERATION_OPT_VAL
Review Comment:
fixed it.
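For readers skimming the diff: the deduction above is a plain Scala match on a tuple of boolean flags, widened here by one element for auto record key generation. A stripped-down, self-contained sketch of the same pattern (simplified names, not the actual Hudi code):

// simplified illustration of tuple-based operation deduction
def deduce(bulkInsert: Boolean, overwriteTable: Boolean, autoKeyGen: Boolean): String =
  (bulkInsert, overwriteTable, autoKeyGen) match {
    case (true, _, _) => "bulk_insert"              // explicit bulk insert wins
    case (_, true, _) => "insert_overwrite_table"   // overwrite the whole table
    case (_, _, true) => "bulk_insert"              // auto key gen prefers bulk_insert
    case _            => "upsert"                   // conservative default
  }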
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults: Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
Review Comment:
nope. I just copied the comments over. not really sure
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults: Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, we should treat it as an append-only workload
Review Comment:
we are being conservative here. The major motivation is to target users coming
from parquet tables. They would be writing to a parquet table as
df.write.format("parquet").save(basePath).
When they want to use hudi, the table name is mandatory, but everything else is
optional.
So, all they need to do is
df.write.option("table_name_config","tbl_name").format("hudi").save(basePath).
If a user is setting record keys explicitly, I am assuming they know about the
purpose of primary keys and so might override the operation explicitly, in which
case we don't need to deduce anything. Happy to jam more on this.
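To make that concrete, here is a minimal sketch of the migration path the comment describes (assuming "hoodie.table.name" is the config behind the "table_name_config" placeholder above; the paths and DataFrame are illustrative):

// existing parquet workload: no table-level configs at all
val df = spark.read.format("parquet").load("/tmp/source")
df.write.format("parquet").mode("append").save("/tmp/parquet_tbl")

// near-identical hudi workload: only the table name is mandatory;
// record key, precombine and partition path are all left unset
df.write.format("hudi")
  .option("hoodie.table.name", "tbl_name")
  .mode("append")
  .save("/tmp/hudi_tbl")

With no record key, precombine, partition path, or operation set, the deduction in this PR would pick bulk_insert for this append-only style of write.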
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -158,20 +158,7 @@ object HoodieSparkSqlWriter {
       case _ => throw new HoodieException("hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
-    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
-    // TODO clean up
-    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
-    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
-    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
-    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
-      operation == WriteOperationType.UPSERT) {
-
-      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
-        s"when $INSERT_DROP_DUPS is set to be true, " +
-        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
-
-      operation = WriteOperationType.INSERT
-    }
+    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
Review Comment:
we have diff paths for now. spark-sql writes have "sql.write.operation",
while spark-ds writes deduce the operation based on
DataSourceWriteOptions.OPERATION.
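On the spark-ds path, a user can bypass the deduction entirely by setting the operation explicitly; a minimal sketch (DataSourceWriteOptions.OPERATION resolves to "hoodie.datasource.write.operation"; df and basePath are placeholders):

// explicit operation wins: deduceOperation keeps the user-supplied value
df.write.format("hudi")
  .option("hoodie.table.name", "tbl_name")
  .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.value())
  .mode("append")
  .save(basePath)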
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
     }
   }
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults: Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, we should treat it as an append-only workload
+    // and make bulk_insert the operation type.
+    if (!paramsWithoutDefaults.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      && !paramsWithoutDefaults.contains(DataSourceWriteOptions.PARTITIONPATH_FIELD.key())
+      && !paramsWithoutDefaults.contains(DataSourceWriteOptions.PRECOMBINE_FIELD.key())
+      && !paramsWithoutDefaults.contains(OPERATION.key())) {
+      log.warn(s"Choosing BULK_INSERT as the operation type since auto record key generation is applicable")
+      operation = WriteOperationType.BULK_INSERT
+    }
+    // if no record key is set, switch the default operation to INSERT (auto record key gen)
+    else if (!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+      && !paramsWithoutDefaults.contains(OPERATION.key())) {
+      log.warn(s"Choosing INSERT as the operation type since auto record key generation is applicable")
+      operation = WriteOperationType.INSERT
Review Comment:
as I mentioned in the other comment, we are mostly trying to give better
perf for users coming from the parquet world. If a user is explicitly setting
record key configs, then we can assume they know about hudi's diff operations,
and if their use-case is immutable, they would explicitly set bulk_insert.
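A rough sketch of that expectation (the option keys are the standard datasource configs; df, basePath and the field name are placeholders):

// a user who already understands primary keys opts in to bulk_insert
// explicitly for an immutable workload, so nothing needs to be deduced
df.write.format("hudi")
  .option("hoodie.table.name", "tbl_name")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
  .option(DataSourceWriteOptions.OPERATION.key(), "bulk_insert")
  .mode("append")
  .save(basePath)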
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]