[ https://issues.apache.org/jira/browse/HUDI-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394384#comment-17394384 ]
ASF GitHub Bot commented on HUDI-2208:
--------------------------------------
nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r683843426
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation = (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+      case (true, true, _, _) =>
+        throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+      case (_, true, true, _) if isPartitionedTable =>
+        throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+      case (_, true, _, true) =>
+        throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+          s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+      // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table
+      case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+      // insert overwrite partition
+      case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+      // insert overwrite table
+      case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+      // if the table has a primaryKey and dropDuplicate is disabled, use the upsert operation
+      case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+      // if enableBulkInsert is true and the table is non-primary-keyed, use the bulk insert operation
+      case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+      // for the remaining cases, use the insert operation
+      case (_, _, _, _) => INSERT_OPERATION_OPT_VAL
Review comment:
I went through every case here and have two suggestions; the rest of the cases look good. You don't need to consider my proposal above, but I would like you to consider the feedback below.
1.
```
case (true, true, _, _) if !isNonStrictMode =>
  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in strict mode.")
```
Can we enable preCombine here and proceed with the BULK_INSERT operation? Within Hudi, we can do the preCombine/dedup. Since we agreed on using bulk_insert as the default with CTAS, this will be a very common use case; a sketch of the alternative follows.
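As an illustration of that alternative, here is a minimal sketch. It is not code from this PR: the `isNonStrictMode` guard, the `writeParams` mutable map, and the idea of forcing combine-before-insert onto the write options are all assumptions.
```
// Sketch only (not from the PR): in non-strict mode, enable
// combine-before-insert so Hudi dedups rows by the preCombine field,
// then proceed with bulk insert instead of throwing.
case (true, true, _, _) if isNonStrictMode =>
  // assumes writeParams is a mutable copy of the resolved write options
  writeParams += (HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key() -> "true")
  BULK_INSERT_OPERATION_OPT_VAL
// strict mode keeps the existing guard
case (true, true, _, _) =>
  throw new IllegalArgumentException(
    s"Table with primaryKey can not use bulk insert in strict mode.")
```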
2.
```
case (_, true, true, _) if isPartitionedTable =>
  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
```
Since we agreed on enabling bulk_insert as the default for CTAS, this will be a very common use case as well. Can you help me understand why we fail this call? Why can't we let it proceed? This is basically CTAS for a partitioned table. One possible fallback is sketched below.
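On this second point, one possible resolution, sketched purely for illustration (logWarning is assumed to be available via Spark's Logging trait; this is not the PR's code), would be to fall back instead of failing:
```
// Sketch only: warn and fall back to the regular insert-overwrite path
// so a partitioned CTAS still succeeds when bulk insert was requested.
case (_, true, true, _) if isPartitionedTable =>
  logWarning("Bulk insert is not supported for insert overwrite partition; " +
    "falling back to the insert_overwrite operation.")
  INSERT_OVERWRITE_OPERATION_OPT_VAL
```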
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
     // Convert to RDD[HoodieRecord]
     val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-    val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+    val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+      operation.equals(WriteOperationType.UPSERT) ||
+      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
Review comment:
My bad, I get it now. If InsertDropDups is set, we automatically set combine.before.insert; but if a user has set just "combine.before.insert", we still need to do the preCombine here. I am not sure why this wasn't reported by anyone until now, though. A usage sketch of this case follows.
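To make the reported case concrete, here is a minimal usage sketch. The option keys are standard Hudi write options; `df`, the preCombine/record-key fields, the table name, and the path are hypothetical.
```
// Hedged sketch: only combine-before-insert is set (insert-drop-dups is
// left at its default of false), so with this fix the writer still
// preCombines records by "ts" before a plain insert.
df.write.format("hudi").
  option("hoodie.datasource.write.operation", "insert").
  option("hoodie.combine.before.insert", "true").
  option("hoodie.datasource.write.precombine.field", "ts"). // hypothetical field
  option("hoodie.datasource.write.recordkey.field", "id").  // hypothetical field
  option("hoodie.table.name", "hudi_tbl").                  // hypothetical name
  mode("append").
  save("/tmp/hudi_tbl") // hypothetical path
```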
--
> [SQL] Support Bulk Insert For Spark Sql
> ---------------------------------------
>
> Key: HUDI-2208
> URL: https://issues.apache.org/jira/browse/HUDI-2208
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Blocker
> Labels: pull-request-available, release-blocker
> Fix For: 0.9.0
>
>
> Support bulk insert for Spark SQL.
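For context, a hedged sketch of what enabling this from Spark SQL looks like (the session option key is taken from DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT quoted in the diff above; the table and values are hypothetical):
```
// Hedged sketch: opt into the SQL bulk-insert path for this session,
// then run a plain INSERT INTO against a (hypothetical) Hudi table.
spark.sql("set hoodie.sql.bulk.insert.enable = true") // assumed key of SQL_ENABLE_BULK_INSERT
spark.sql("insert into hudi_tbl values (1, 'a1', 1000)")
```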