helanto opened a new issue #3709: URL: https://github.com/apache/hudi/issues/3709
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? Yes

**Describe the problem you faced**

Hi, when using INSERT mode and the input dataset includes duplicate keys, an unwanted de-duplication step is performed as part of the merge process.

**To Reproduce**

Steps to reproduce the behavior:

1. First I create a Hudi table (which contains duplicate keys) using INSERT mode.

   ```scala
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
   import org.apache.hudi.keygen.NonpartitionedKeyGenerator
   import org.apache.hudi.hive.NonPartitionedExtractor
   import spark.implicits._ // for .toDF (already in scope in spark-shell)

   val tableName: String = "language"
   val cfg: Map[String, String] = Map[String, String](
     DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
     DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "score",
     DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "lang",
     DataSourceWriteOptions.TABLE_NAME.key() -> tableName,
     HoodieWriteConfig.TBL_NAME.key() -> tableName,
     HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> classOf[NonpartitionedKeyGenerator].getCanonicalName,
     DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[NonPartitionedExtractor].getCanonicalName,
     DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
     DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
     DataSourceWriteOptions.INSERT_DROP_DUPS.key() -> "false",
     HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "false",
     HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true",
     HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "104857600")

   val initialInput = (("python", 1) :: ("scala", 5) :: ("scala", 4) :: ("haskell", 5) :: Nil).toDF("lang", "score")
   initialInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
   ```

2. The Hudi table contains duplicate data, which is what I expect.

   ```
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
   |     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   |     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
   |     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   ```

3. We insert a second dataset (which again contains duplicate keys).

   ```scala
   val newInput = (("scala", 5) :: ("java", 3) :: ("java", 4) :: Nil).toDF("lang", "score")
   newInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
   ```

4. Some entries from the second dataset are not stored in the Hudi table:

   ```
   spark.read.format("hudi").load(basePath).show()
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
   |     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   |     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
   |     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
   |     20210923165821|  20210923165821_0_5|              java|                      |6be75119-47a0-4ad...|   java|    4|
   |     20210923165821|  20210923165821_0_6|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   ```

**Expected behavior**

I would expect both entries with key `java` to be stored in the table, since I have deactivated de-duplication on INSERT. I am not sure whether de-duplication of the input is desirable, but I would expect the first insert to behave the same as the second insert when using the same configuration.

I managed to achieve the expected behavior by setting `HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT` to `"0"` ([HoodieMergeHandle](https://github.com/apache/hudi/blob/6228b17a3ddb4c336b30e5b8c650e003e38b5e3e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L243) seems to de-duplicate the input dataset even when precombine is deactivated). However, having to deactivate merging is something I want to avoid.

**Environment Description**

* Hudi version : 0.9.0
* Spark version : 3.0.1
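For anyone hitting the same problem, the workaround described above can be sketched as a one-key change to the write config from step 1. This is only a sketch of the reporter's workaround, not a recommended fix: `cfgNoMerge` is a name chosen here for illustration, and disabling the small-file limit means every insert commit writes new base files (so small files accumulate), which is exactly the merge behavior the reporter wants to keep.

```scala
// Workaround sketch: with the small-file limit set to 0, inserts are never
// routed into existing small files, so the merge path (HoodieMergeHandle)
// that drops the duplicate keys is never taken.
// Trade-off: every insert creates new files; small files are no longer compacted away.
val cfgNoMerge: Map[String, String] =
  cfg + (HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0")

newInput.write.format("hudi").options(cfgNoMerge).mode(Append).save(basePath)
```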
