xushiyan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1253031610
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
}
}
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
+ // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
+ // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
+ // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
+ if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+ operation == WriteOperationType.UPSERT) {
+
+ log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+ s"when $INSERT_DROP_DUPS is set to be true, " +
+ s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+ operation = WriteOperationType.INSERT
+ }
+
+ // if no record key, no preCombine and no explicit partition path is set,
we should treat it as append only workload
Review Comment:
not sure about this rule: shouldn't append-only just depends on having
`preCombine` or not?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
}
}
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
+ // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
+ // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
+ // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
+ if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+ operation == WriteOperationType.UPSERT) {
+
+ log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+ s"when $INSERT_DROP_DUPS is set to be true, " +
+ s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+ operation = WriteOperationType.INSERT
+ }
Review Comment:
should early return `operation` here as user has set it and no deducing
should happen
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
}
}
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
Review Comment:
do you know what it is to be cleaned up here?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -158,20 +158,7 @@ object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support
org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
- var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
- // TODO clean up
- // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
- // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
- // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
- if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
- operation == WriteOperationType.UPSERT) {
-
- log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
- s"when $INSERT_DROP_DUPS is set to be true, " +
- s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
-
- operation = WriteOperationType.INSERT
- }
+ val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
Review Comment:
can this be extracted to some common module? better to align semantics for
all sql writes
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
}
}
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
+ // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
+ // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
+ // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
+ if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+ operation == WriteOperationType.UPSERT) {
+
+ log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+ s"when $INSERT_DROP_DUPS is set to be true, " +
+ s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+ operation = WriteOperationType.INSERT
+ }
+
+ // if no record key, no preCombine and no explicit partition path is set,
we should treat it as append only workload
+ // and make bulk_insert as operation type.
+ if
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+ &&
!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PARTITIONPATH_FIELD.key())
+ &&
!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key())
+ && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+ log.warn(s"Choosing BULK_INSERT as the operation type since auto record
key generation is applicable")
+ operation = WriteOperationType.BULK_INSERT
+ }
+ // if no record key is set, will switch the default operation to INSERT
(auto record key gen)
+ else if
(!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+ && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+ log.warn(s"Choosing INSERT as the operation type since auto record key
generation is applicable")
+ operation = WriteOperationType.INSERT
Review Comment:
so here both of the cases are for auto keygen: partition and precombine not
set => bulkinsert, either partition or precombine is set => insert
not really sure about the rationale behind; bulkinsert vs insert is only
about using small file handling or not, which should not be coupled with
whether partition or precombine set by user?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]