nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r679153080
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
+ s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+ // if enableBulkInsert is true, use bulk insert for the insert
overwrite non-partitioned table.
+ case (_, true, true, _) if !isPartitionedTable =>
BULK_INSERT_OPERATION_OPT_VAL
Review comment:
also, if there are any valid optimizations, then probably we should move
it to HoodiesparkSqlSwriter so that both spark datasource and sql dml benefits
:)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]