[ https://issues.apache.org/jira/browse/HUDI-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388246#comment-17388246 ]
ASF GitHub Bot commented on HUDI-2208:
--------------------------------------
nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r677405475
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
Review comment:
May I know why we have this constraint?
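
For context, the toggle being questioned here is driven from SQL. A minimal usage sketch, assuming the option key behind DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT is hoodie.sql.bulk.insert.enable and a table hudi_tbl already exists:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-sql-bulk-insert")
      .master("local[*]")
      .getOrCreate()

    // Enable bulk insert for subsequent INSERT INTO statements in this session.
    spark.sql("set hoodie.sql.bulk.insert.enable = true")

    // Under the constraint in this diff, the statement below would throw
    // IllegalArgumentException if hudi_tbl declares a primary key.
    spark.sql("insert into hudi_tbl values (1, 'a1', 10.0, '2021-07-26')")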
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
       // Convert to RDD[HoodieRecord]
       val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+        operation.equals(WriteOperationType.UPSERT) ||
+        parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
Review comment:
Sorry, I don't follow. Precombine is just one field as-is, right? I'm not sure what you mean by "not compute the preCombine field value"; could you shed some more light, please?
In general, we don't do any precombine for inserts. But if this config (COMBINE_BEFORE_INSERT_PROP) is enabled, we need to do preCombine.
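
To make the precombine semantics concrete: when combining is enabled, records sharing a key are deduplicated and the record with the largest precombine value wins. A self-contained sketch of that behavior in plain Scala (the field names are illustrative, not Hudi's API):

    // Illustrative precombine: among records with the same key, keep the
    // one with the largest ordering value (e.g. an event timestamp).
    case class Record(key: String, ts: Long, value: String)

    def preCombine(records: Seq[Record]): Seq[Record] =
      records
        .groupBy(_.key)
        .values
        .map(_.maxBy(_.ts)) // latest record wins within each key group
        .toSeq

    val input = Seq(
      Record("id-1", ts = 1L, value = "old"),
      Record("id-1", ts = 2L, value = "new"), // survives: larger ts
      Record("id-2", ts = 5L, value = "only")
    )
    // preCombine(input) keeps ("id-1", 2, "new") and ("id-2", 5, "only").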
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
       RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
       PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
       PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+      ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,
Review comment:
You can add one, but make the default true.
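
A sketch of what "add one, defaulting to true" could look like, using Hudi's ConfigProperty builder; the key name and documentation below are illustrative placeholders, not the PR's final names:

    import org.apache.hudi.common.config.ConfigProperty

    // Hypothetical row-writer toggle for the SQL path; key/doc are placeholders.
    val SQL_ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty
      .key("hoodie.sql.insert.row.writer.enable")
      .defaultValue("true") // per the review: default to true
      .withDocumentation("Whether INSERT INTO with bulk_insert uses the " +
        "row-writer path, skipping the Avro conversion.")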
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
Review comment:
I'm just trying to understand the SQL DML here. We already handle save modes within HoodieSparkSqlWriter, so what is required in addition to that? I'm trying to avoid duplication if possible; for some of the cases listed here, it's really just about Overwrite mode.
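
For reference, the save-mode handling the comment points to lives on the datasource write path. A minimal sketch (the frame, keys, table name, and path are illustrative):

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a1")).toDF("id", "name") // toy data

    // HoodieSparkSqlWriter already interprets SaveMode, so SaveMode.Overwrite
    // covers part of what the SQL-side match above re-derives.
    df.write.format("hudi")
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.table.name", "hudi_tbl")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi_tbl")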
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
       // Convert to RDD[HoodieRecord]
       val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+      val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+        operation.equals(WriteOperationType.UPSERT) ||
+        parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
Review comment:
Also, if this is something required only for SQL DML, we should probably consider adding it to the SQL DML classes rather than here, since this code path serves the Spark datasource as well.
--
> Support Bulk Insert For Spark Sql
> ---------------------------------
>
> Key: HUDI-2208
> URL: https://issues.apache.org/jira/browse/HUDI-2208
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Major
> Labels: pull-request-available
>
> Support bulk insert for Spark SQL