pengzhiwei2018 commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r678031341
##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {

     // Convert to RDD[HoodieRecord]
     val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-    val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+    val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+      operation.equals(WriteOperationType.UPSERT) ||
+      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
       If `COMBINE_BEFORE_INSERT_PROP` is enabled, `SparkInsertCommitActionExecutor` will run `combineOnCondition` in `AbstractWriteHelper`, which needs the `precombine` value. But here the `COMBINE_BEFORE_INSERT_PROP` case was ignored, so the `precombine` value is not extracted into the HoodiePayload.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
       .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
       .toBoolean

-    val operation = if (isOverwrite) {
-      if (table.partitionColumnNames.nonEmpty) {
-        INSERT_OVERWRITE_OPERATION_OPT_VAL // overwrite partition
-      } else {
-        INSERT_OPERATION_OPT_VAL
+    val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+    val isPartitionedTable = table.partitionColumnNames.nonEmpty
+    val isPrimaryKeyTable = primaryColumns.nonEmpty
+    val operation =
+      (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+        case (true, true, _, _) =>
+          throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
       Because we currently do a primary-key uniqueness check when inserting data into a pk-table, just like a database does. Bulk insert cannot currently do that.

##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
       RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
       PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
       PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+      ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString, // if the table has primaryKey, enable the combine

Review comment:
       We need to ensure that the data is unique for a pk-table, just like a database does, so I combine the input.
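Editorial note on the first comment (the `shouldCombine` change in `HoodieSparkSqlWriter`): the hunk is truncated at the review anchor, so the condition below is only a minimal sketch of how such a check could read. The literal "false" fallback stands in for the config's actual default and is an assumption, not the patch text.

    // Sketch only: also combine when hoodie.combine.before.insert is enabled,
    // so the precombine value is extracted into the HoodiePayload on the
    // insert path as well (the fallback "false" is an assumed default).
    val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
      operation.equals(WriteOperationType.UPSERT) ||
      parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(), "false").toBoolean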
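Editorial note on the second comment (operation selection in `InsertIntoHoodieTableCommand`): the match expression is also truncated in the hunk above. The sketch below only illustrates the kind of dispatch it implies; every case other than the primary-key/bulk-insert guard, and the handling of `dropDuplicate`, is an assumption rather than a quote from the patch.

    // Rough sketch of selecting the write operation from the table/flag combination.
    val operation = (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
      // A pk-table relies on the uniqueness check (combine/dedup by record key),
      // which bulk insert skips, hence the hard failure.
      case (true, true, _, _) =>
        throw new IllegalArgumentException("Table with primaryKey can not use bulk insert.")
      // Non-pk table with bulk insert enabled: assumed to map to bulk_insert.
      case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
      // Overwrite of a partitioned table: assumed insert_overwrite, else plain insert.
      case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
      case (_, _, true, _) => INSERT_OPERATION_OPT_VAL
      // Everything else falls back to insert; duplicates are handled by combine-before-insert.
      case _ => INSERT_OPERATION_OPT_VAL
    }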