pengzhiwei2018 commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r678031341



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
           // Convert to RDD[HoodieRecord]
           val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-          val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+          val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+            operation.equals(WriteOperationType.UPSERT) ||
+            parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       If `COMBINE_BEFORE_INSERT_PROP` is enabled, `SparkInsertCommitActionExecutor` will run `combineOnCondition` in `AbstractWriteHelper`, which needs the `precombine` value.
    But the original code here ignored the `COMBINE_BEFORE_INSERT_PROP` case, so the `precombine` value would not be extracted into the HoodiePayload.
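    For reference, a sketch of how the full expression presumably reads after this change, in the context of `HoodieSparkSqlWriter.scala` (the diff above cuts off the last line, so the `getOrElse` default and the trailing `.toBoolean` are my assumption, not copied from the PR):

```scala
// Sketch only: the tail of the expression is truncated in the diff above, so the
// default value and the trailing .toBoolean are assumptions.
val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
  operation.equals(WriteOperationType.UPSERT) ||
  parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
    HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
```

    The point is that `shouldCombine` controls whether the `precombine` value is read from each row into the HoodiePayload, so it has to be true whenever the later combine step will need that value.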

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean
 
-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
       Because currently we do a primary key uniqueness check when inserting data into a pk-table, just like a database does. Bulk insert currently cannot do that check.
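    In other words, bulk insert is only allowed for tables without a primary key. Purely as an illustration of that dispatch (not the exact arms or message in this PR, and assuming the operation constants already imported in `InsertIntoHoodieTableCommand.scala`):

```scala
// Illustrative sketch of the operation selection described above; the exact
// match arms in the PR may differ.
val operation = (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
  // A pk-table needs the uniqueness check on insert, which bulk insert cannot do.
  case (true, true, _, _) =>
    throw new IllegalArgumentException("Table with primaryKey can not use bulk insert.")
  // No primary key: bulk insert is safe to use when enabled.
  case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
  // INSERT OVERWRITE on a partitioned table overwrites the matching partitions.
  case (_, false, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
  // Dropping duplicates on a pk-table is handled by upsert.
  case (true, false, false, true) => UPSERT_OPERATION_OPT_VAL
  // Everything else falls back to a plain insert.
  case _ => INSERT_OPERATION_OPT_VAL
}
```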

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
         PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+        ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString, // if the table has a primaryKey, enable the combine

Review comment:
       We need to ensure that the data is unique for a pk-table, just like a database does, so I combine the input records.
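    Conceptually, combine-before-insert deduplicates the incoming batch by record key before writing, keeping one record per key chosen by the precombine field. A rough, self-contained sketch of that idea (not Hudi's actual `AbstractWriteHelper` code; the `Record` type and field names are made up for the example):

```scala
// Conceptual sketch only: one record per key survives, chosen by the precombine value.
case class Record(key: String, precombine: Long, payload: String)

def combineBeforeInsert(input: Seq[Record]): Seq[Record] =
  input
    .groupBy(_.key)                 // group duplicates by record key
    .values
    .map(_.maxBy(_.precombine))     // keep the record with the largest precombine value
    .toSeq

// Example: two rows share key "id-1"; only the one with the larger precombine survives.
val deduped = combineBeforeInsert(Seq(
  Record("id-1", 1L, "old"),
  Record("id-1", 2L, "new"),
  Record("id-2", 1L, "only")
))
```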



