[ 
https://issues.apache.org/jira/browse/HUDI-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17390783#comment-17390783
 ] 

ASF GitHub Bot commented on HUDI-2208:
--------------------------------------

nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r680186620



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -248,6 +248,14 @@ object DataSourceWriteOptions {
     .withDocumentation("When set to true, will perform write operations directly using the spark native " +
       "`Row` representation, avoiding any additional conversion costs.")
 
+  /**
+   * Enable the bulk insert for sql insert statement.
+   */
+  val SQL_ENABLE_BULK_INSERT:ConfigProperty[String] = ConfigProperty

Review comment:
       @vinothchandar: In SQL, we don't have two separate commands like INSERT 
INTO and BULK_INSERT INTO, so I guess we are going this route. But CTAS chooses 
the INSERT operation by default. I am thinking users may never use bulk_insert, 
since they have to set the property explicitly. Any thoughts? 
   There are two things to discuss: 
   1. Which operation to use with CTAS. 
   2. Which operation to use with INSERT INTO. 
   As of now, the default is "insert", and the user has to explicitly set the 
operation type to bulk_insert before calling either of these commands. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        // insert overwrite table
+        case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL

Review comment:
       HoodieSparkSqlWriter will handle this save mode. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        // insert overwrite table
+        case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        // if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+        case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+        // if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
+        case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+        // for the rest case, use the insert operation
+        case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
       Here are my thoughts on choosing the right operation. Having too many case 
statements might complicate things and is error-prone too. As I mentioned 
earlier, we should try to do any valid conversions in HoodieSparkSqlWriter. 
Only those that are applicable just to SQL DML should be kept here. 
   Anyway, here is one simplified approach. I am ignoring primary-key vs. 
non-primary-key tables for now; we can come back to that later once we have 
consensus on this. 
   
   We need just two configs: 
   hoodie.sql.enable.bulk_insert (default false)
   hoodie.sql.overwrite.entire.table (default true)
   
   From the SQL syntax, two commands are allowed: 
   "INSERT INTO" and "INSERT OVERWRITE".
   
   "INSERT" with no other configs set -> insert operation
   "INSERT" with enable bulk_insert set -> bulk_insert
   "INSERT OVERWRITE" with no other configs set -> insert_overwrite_table 
operation
   "INSERT OVERWRITE" with hoodie.sql.overwrite.entire.table = false -> 
insert_overwrite operation
   "INSERT OVERWRITE" with enable bulk_insert set -> bulk_insert; pass the 
right save mode to HoodieSparkSqlWriter
   "INSERT OVERWRITE" with enable bulk_insert set and 
hoodie.sql.overwrite.entire.table = false -> bulk_insert; pass the right save 
mode to HoodieSparkSqlWriter
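
To make the proposed mapping concrete, here is a minimal Scala sketch of it. The object name, the method name, and the plain-string operation names are illustrative assumptions and not part of Hudi's API; the save mode that would be passed to HoodieSparkSqlWriter is deliberately left out, since the proposal does not spell it out.

```scala
// Hypothetical sketch of the two-config mapping; not Hudi code.
object SqlOperationChooser {
  // Operation names written as plain strings for illustration only.
  val Insert = "insert"
  val BulkInsert = "bulk_insert"
  val InsertOverwrite = "insert_overwrite"
  val InsertOverwriteTable = "insert_overwrite_table"

  /**
   * @param isOverwrite          true for INSERT OVERWRITE, false for INSERT INTO
   * @param enableBulkInsert     value of hoodie.sql.enable.bulk_insert (default false)
   * @param overwriteEntireTable value of hoodie.sql.overwrite.entire.table (default true)
   */
  def chooseOperation(isOverwrite: Boolean,
                      enableBulkInsert: Boolean = false,
                      overwriteEntireTable: Boolean = true): String =
    (isOverwrite, enableBulkInsert, overwriteEntireTable) match {
      // bulk_insert wins whenever it is enabled; overwrite semantics would be
      // carried by the save mode, which this sketch does not model
      case (_, true, _)         => BulkInsert
      case (false, false, _)    => Insert
      case (true, false, true)  => InsertOverwriteTable
      case (true, false, false) => InsertOverwrite
    }
}
```

With this shape, the six rows listed above collapse into four cases, and the primary-key question can be layered on later without touching them.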
   
   
   

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")

Review comment:
       Let's say a user is doing something like this: 
     - creates a Hudi table with a primary key and a partition column set. 
     - does a bulk_insert with overwrite. 
   
   Do we fail this command? This is a very common use case. Not sure how we can 
fail this. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        // insert overwrite table
+        case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        // if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+        case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL

Review comment:
       Again, I am trying to understand why we do this. If someone is explicitly 
issuing "INSERT INTO", we should try to use the "insert" operation. Why switch 
to "upsert" just for a primary-keyed table? Can you please clarify? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +

Review comment:
       This is already taken care of in HoodieSparkSqlWriter. I don't think we 
need to validate this here.

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
         PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+        ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString, // if the table has primaryKey, enable the combine

Review comment:
       Yeah, I am also not sure about this. We will do a preCombine step for any 
INSERT in general. I wonder why we don't let users take up the responsibility. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+        // insert overwrite partition
+        case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL

Review comment:
       Not sure if we can do this. 
   If the user has set "overwrite" for a partitioned table, here we overwrite 
only the partitions being written to and not the entire table. Is that a common 
SQL expectation? What if a user has a partitioned table and wants to overwrite 
the entire table? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
       Anyway, we can call out that it is the responsibility of the user to 
ensure uniqueness. Also, IIUC, Hudi can handle duplicates: in case of updates, 
both records will be updated. But bulk_insert is much more performant than a 
regular insert, especially with the row writer, so we should not make it too 
restrictive to use. I know from community messages that a lot of users 
leverage bulk_insert. I would vote to relax this constraint. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+          val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       I dug in to understand this. INSERT_DROP_DUPS_OPT_KEY is actually used 
at the DataSource layer and COMBINE_BEFORE_INSERT_PROP is used in the 
writeClient layer. In other words, both refer to the same config. 
[DataSourceUtils](https://github.com/apache/hudi/blob/6353fc865f43854e0e185af33e8ad091c8870d78/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java#L178)
 converts INSERT_DROP_DUPS_OPT_KEY's value to COMBINE_BEFORE_INSERT_PROP. 
   So, I don't think we need to check both INSERT_DROP_DUPS_OPT_KEY and 
COMBINE_BEFORE_INSERT_PROP. 
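
As a rough illustration of what checking only one of the two would look like, here is a standalone Scala sketch. The object and method names are hypothetical, the options are modelled as a plain `Map[String, String]`, and the key string is an assumption about what INSERT_DROP_DUPS_OPT_KEY resolves to rather than something taken from this diff.

```scala
// Hypothetical sketch; not the actual HoodieSparkSqlWriter code.
object CombineCheck {
  // Assumed value of INSERT_DROP_DUPS_OPT_KEY.key, written out for illustration.
  val InsertDropDupsKey = "hoodie.datasource.write.insert.drop.duplicates"

  /** Combine when drop-duplicates is requested or the operation is an upsert. */
  def shouldCombine(parameters: Map[String, String], isUpsert: Boolean): Boolean =
    parameters.getOrElse(InsertDropDupsKey, "false").toBoolean || isUpsert
}
```

This keeps the condition at the two terms from the line being replaced in the diff and relies on the DataSource layer to propagate the drop-duplicates value down to the write client.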

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
       Hmm, interesting. I didn't know we do a uniqueness check for inserts into 
a primary-keyed table. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL

Review comment:
       ```
   case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
   ```
   The operation is already bulk_insert here, right (2nd arg)? Not sure why we 
need this case. 
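
One way to check is to replay the full match from the diff in isolation. The sketch below copies the case order from the diff, uses plain strings for the operation names, and adds a hypothetical `keepExplicitCase` flag (not in the PR) to turn the questioned case on and off. Under those assumptions, it suggests the answer depends on ordering: the later `(false, true, _, _)` case sits below the two insert-overwrite cases, so without the explicit case an overwrite on a non-partitioned table would match `insert_overwrite_table` first.

```scala
// Standalone replay of the match order from the diff; not Hudi code.
object MatchOrderCheck {
  def operation(isPrimaryKeyTable: Boolean,
                enableBulkInsert: Boolean,
                isOverwrite: Boolean,
                dropDuplicate: Boolean,
                isPartitionedTable: Boolean,
                keepExplicitCase: Boolean): String =
    (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
      case (true, true, _, _) =>
        throw new IllegalArgumentException("Table with primaryKey can not use bulk insert.")
      case (_, true, true, _) if isPartitionedTable =>
        throw new IllegalArgumentException("Insert Overwrite Partition can not use bulk insert.")
      case (_, true, _, true) =>
        throw new IllegalArgumentException("Bulk insert cannot support drop duplication.")
      // the case under discussion, switchable via the hypothetical flag
      case (_, true, true, _) if keepExplicitCase && !isPartitionedTable => "bulk_insert"
      case (_, _, true, _) if isPartitionedTable => "insert_overwrite"
      case (_, _, true, _) if !isPartitionedTable => "insert_overwrite_table"
      case (true, false, false, false) => "upsert"
      case (false, true, _, _) => "bulk_insert"
      case (_, _, _, _) => "insert"
    }
}
```

If the generic bulk_insert case were moved above the two overwrite cases, the explicit case could be dropped without changing the result for this input.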




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


> [SQL] Support Bulk Insert For Spark Sql
> ---------------------------------------
>
>                 Key: HUDI-2208
>                 URL: https://issues.apache.org/jira/browse/HUDI-2208
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: pengzhiwei
>            Assignee: pengzhiwei
>            Priority: Blocker
>              Labels: pull-request-available, release-blocker
>
> Support the bulk insert for spark sql



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
