[
https://issues.apache.org/jira/browse/HUDI-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393699#comment-17393699
]
ASF GitHub Bot commented on HUDI-2208:
--------------------------------------
pengzhiwei2018 commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r683207185
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -248,6 +248,14 @@ object DataSourceWriteOptions {
.withDocumentation("When set to true, will perform write operations
directly using the spark native " +
"`Row` representation, avoiding any additional conversion costs.")
+ /**
+ * Enable the bulk insert for sql insert statement.
+ */
+ val SQL_ENABLE_BULK_INSERT:ConfigProperty[String] = ConfigProperty
Review comment:
Sound reasonable about this. CTAS use the bulk_insert by default, and
regular insert for insert into by default.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
Review comment:
For CTAS, we can relax this. Because there is no data exist in the
target table. We can just combine the input by pk before bulk insert to reach
the same goal.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
// Convert to RDD[HoodieRecord]
val genericRecords: RDD[GenericRecord] =
HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
- val shouldCombine =
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
operation.equals(WriteOperationType.UPSERT);
+ val shouldCombine =
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+ operation.equals(WriteOperationType.UPSERT) ||
+
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
Review comment:
@vinothchandar well I think INSERT_DROP_DUPS_OPT_KEY is some different
from COMBINE_BEFORE_INSERT_PROP.
**INSERT_DROP_DUPS_OPT_KEY**: is used to drop the duplicate record in the
target table.
`COMBINE_BEFORE_INSERT_PROP`: is used to combine the duplicate record in the
input.
So they are not total the same config. IMO.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
+ s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+ // if enableBulkInsert is true, use bulk insert for the insert
overwrite non-partitioned table.
+ case (_, true, true, _) if !isPartitionedTable =>
BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, _, true, _) if isPartitionedTable =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
Review comment:
Well, in spark-sql , `insert overwrite partitioned table ` do not has
the meaning of insert overwrite all the table. It is only overwrite the
affected partitions.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
Review comment:
Yes, currently bulk insert has not support INSERT_OVERWRITE operation.
So we cannot convert the` insert overwrite partitioned table ` statement to
bulk insert.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
+ s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+ // if enableBulkInsert is true, use bulk insert for the insert
overwrite non-partitioned table.
+ case (_, true, true, _) if !isPartitionedTable =>
BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, _, true, _) if isPartitionedTable =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // insert overwrite table
+ case (_, _, true, _) if !isPartitionedTable =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
Review comment:
Yes, but is cannot convert the write operation type for sql's insert
statement. Here we must do the convert and pass the write type to
HoodieSparkSqlWriter.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
Review comment:
Yes, the validate is some duplication here. But to keep the logic
intact, I think we should keep this.
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
.getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
.toBoolean
- val operation = if (isOverwrite) {
- if (table.partitionColumnNames.nonEmpty) {
- INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
- } else {
- INSERT_OPERATION_OPT_VAL
+ val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+ val isPartitionedTable = table.partitionColumnNames.nonEmpty
+ val isPrimaryKeyTable = primaryColumns.nonEmpty
+ val operation =
+ (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+ case (true, true, _, _) =>
+ throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert.")
+ case (_, true, true, _) if isPartitionedTable =>
+ throw new IllegalArgumentException(s"Insert Overwrite Partition can
not use bulk insert.")
+ case (_, true, _, true) =>
+ throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
+ s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+ // if enableBulkInsert is true, use bulk insert for the insert
overwrite non-partitioned table.
+ case (_, true, true, _) if !isPartitionedTable =>
BULK_INSERT_OPERATION_OPT_VAL
+ // insert overwrite partition
+ case (_, _, true, _) if isPartitionedTable =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
+ // insert overwrite table
+ case (_, _, true, _) if !isPartitionedTable =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ // if the table has primaryKey and the dropDuplicate has disable, use
the upsert operation
+ case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
Review comment:
As we said aboveļ¼ we need do the pk uniqueness check. So we need use the
upsert to reach this goal by a custom Playload class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> [SQL] Support Bulk Insert For Spark Sql
> ---------------------------------------
>
> Key: HUDI-2208
> URL: https://issues.apache.org/jira/browse/HUDI-2208
> Project: Apache Hudi
> Issue Type: Sub-task
> Reporter: pengzhiwei
> Assignee: pengzhiwei
> Priority: Blocker
> Labels: pull-request-available, release-blocker
> Fix For: 0.9.0
>
>
> Support the bulk insert for spark sql
--
This message was sent by Atlassian Jira
(v8.3.4#803005)