[ 
https://issues.apache.org/jira/browse/HUDI-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392553#comment-17392553
 ] 

ASF GitHub Bot commented on HUDI-2208:
--------------------------------------

vinothchandar commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r682108786



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can 
not use bulk insert.")

Review comment:
       insert overwrite partition should be using the `INSERT_OVERWRITE` 
operation.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || 
operation.equals(WriteOperationType.UPSERT);
+          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       @pengzhiwei2018 do you agree with siva's analysis above?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert.")

Review comment:
       +1 users might have another hudi table for e.g to CTAS from. So if we 
disallow bulk insert with a pk, then there is no good way to do a full 
bootstrap. Can we relax this?

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can 
not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop 
duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert 
overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => 
BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => 
INSERT_OVERWRITE_OPERATION_OPT_VAL
+        // insert overwrite table
+        case (_, _, true, _) if !isPartitionedTable => 
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        // if the table has primaryKey and the dropDuplicate has disable, use 
the upsert operation
+        case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+        // if enableBulkInsert is true and the table is non-primaryKeyed, use 
the bulk insert operation
+        case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+        // for the rest case, use the insert operation
+        case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
       having configs to control sql behavior sounds pretty complex to me. Can 
we just pick the right operation for each type. 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -248,6 +248,14 @@ object DataSourceWriteOptions {
     .withDocumentation("When set to true, will perform write operations 
directly using the spark native " +
       "`Row` representation, avoiding any additional conversion costs.")
 
+  /**
+   * Enable the bulk insert for sql insert statement.
+   */
+  val SQL_ENABLE_BULK_INSERT:ConfigProperty[String] = ConfigProperty

Review comment:
       CTAS should use bulk_insert and insert into should use regular inserts 
IMO. 
   lets please not add more configs like this proactively. We can get user 
feedback and iterate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> [SQL] Support Bulk Insert For Spark Sql
> ---------------------------------------
>
>                 Key: HUDI-2208
>                 URL: https://issues.apache.org/jira/browse/HUDI-2208
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: pengzhiwei
>            Assignee: pengzhiwei
>            Priority: Blocker
>              Labels: pull-request-available, release-blocker
>
> Support the bulk insert for spark sql



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to