nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r677405475



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert.")

Review comment:
       may I know why do we have this constraint? 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || 
operation.equals(WriteOperationType.UPSERT);
+          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       sorry I don't get you. Precombine is just one field as is right. Not 
sure what do you mean by "not compute the preCombine field value"? can you 
throw some more light please. 
   In general, for inserts we don't do any precombine. But if this config 
(COMBINE_BEFORE_INSERT_PROP) is enabled, we need to do preCombine. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
         PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+        ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,

Review comment:
       you can add one, but make the default as true. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert.")
+        case (_, true, true, _) if isPartitionedTable =>
+          throw new IllegalArgumentException(s"Insert Overwrite Partition can 
not use bulk insert.")
+        case (_, true, _, true) =>
+          throw new IllegalArgumentException(s"Bulk insert cannot support drop 
duplication." +
+            s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+        // if enableBulkInsert is true, use bulk insert for the insert 
overwrite non-partitioned table.
+        case (_, true, true, _) if !isPartitionedTable => 
BULK_INSERT_OPERATION_OPT_VAL

Review comment:
       Am just trying to understand the sql dml here. We already handle save 
modes within HoodieSparkSqlWriter. So, trying to understand whats required in 
addition to that? Trying to avoid duplication if possible. I mean, for some of 
the cases listed here, its just about overWrite mode. 

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || 
operation.equals(WriteOperationType.UPSERT);
+          val shouldCombine = 
parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       also, if its something that required just for sql dml, then we should 
probably think about adding it to sql dml classes and not here which spans 
spark datasource as well. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to