helanto opened a new issue #3709:
URL: https://github.com/apache/hudi/issues/3709


   **_Tips before filing an issue_**
   
   - Have you gone through our 
[FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? Yes
   
   **Describe the problem you faced**
   
   Hi,
   when using INSERT mode and the input dataset includes duplicate keys, an unwanted de-duplication step is performed as part of the merge process.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. First I create a Hudi table (which contains duplicate keys) using INSERT 
mode.
   ```
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
   import org.apache.hudi.keygen.NonpartitionedKeyGenerator
   import org.apache.hudi.hive.NonPartitionedExtractor
   
   val tableName: String = "language"
   
   val cfg: Map[String, String] = Map[String, String](
       DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "score",
       DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "lang",
       DataSourceWriteOptions.TABLE_NAME.key() -> tableName,
       HoodieWriteConfig.TBL_NAME.key() -> tableName,
       HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> classOf[NonpartitionedKeyGenerator].getCanonicalName,
       DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[NonPartitionedExtractor].getCanonicalName,
       DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
       DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
       DataSourceWriteOptions.INSERT_DROP_DUPS.key() -> "false",
       HoodieWriteConfig.COMBINE_BEFORE_INSERT.key() -> "false",
       HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true",
       HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "104857600")

   val initialInput = (("python", 1) :: ("scala", 5) :: ("scala", 4) :: ("haskell", 5) :: Nil).toDF("lang", "score")
   
   initialInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
   ```
   2. The Hudi table contains duplicate data, which is what I expect.
   ```
   spark.read.format("hudi").load(basePath).show()
   
   
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
   |     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   |     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
   |     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   ```
   3. We insert a second dataset (which again contains duplicate keys).
   ```
   val newInput = (("scala", 5) :: ("java", 3) :: ("java", 4) :: Nil).toDF("lang", "score")
   
   newInput.write.format("hudi").options(cfg).mode(Append).save(basePath)
   ```
   4. Some entries from the second dataset are not stored in the Hudi table:
   ```
   spark.read.format("hudi").load(basePath).show()
   
   
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|   lang|score|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   |     20210923165656|  20210923165656_0_1|            python|                      |6be75119-47a0-4ad...| python|    1|
   |     20210923165656|  20210923165656_0_2|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   |     20210923165656|  20210923165656_0_3|             scala|                      |6be75119-47a0-4ad...|  scala|    4|
   |     20210923165656|  20210923165656_0_4|           haskell|                      |6be75119-47a0-4ad...|haskell|    5|
   |     20210923165821|  20210923165821_0_5|              java|                      |6be75119-47a0-4ad...|   java|    4|
   |     20210923165821|  20210923165821_0_6|             scala|                      |6be75119-47a0-4ad...|  scala|    5|
   +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----+
   ```
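   A quick aggregation makes the discrepancy easier to see (a sketch, assuming the same Spark session and `basePath` as above): the second `java` row (score 3) is missing, while `scala` kept its duplicates from the first commit.
   ```
   // Count rows per record key. With de-duplication fully disabled we would
   // expect java -> 2 and scala -> 3 (two from the first insert plus one
   // from the second); instead java shows only 1.
   spark.read.format("hudi").load(basePath)
     .groupBy("lang").count()
     .orderBy("lang")
     .show()
   ```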
   
   **Expected behavior**
   
   I would expect both entries with key `java` to be stored in the table, since I have deactivated de-duplication on INSERT. I am not sure whether de-duplicating the input is desirable, but I would expect the first insert to behave the same as the second insert under the same configuration.
   
   I managed to achieve the expected behavior by setting `HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT` to `"0"` ([HoodieMergeHandle](https://github.com/apache/hudi/blob/6228b17a3ddb4c336b30e5b8c650e003e38b5e3e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java#L243) seems to de-duplicate the input dataset even when precombine is deactivated). However, having to deactivate merging entirely is something I want to avoid.
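   For reference, the workaround amounts to overriding a single option in the configuration map from step 1 (a sketch against Hudi 0.9.0; `cfgNoMerge` is a name introduced here for illustration):
   ```
   // Workaround sketch: a small-file limit of 0 makes every insert go to a new
   // file group, so HoodieMergeHandle (and its de-duplication) is never invoked.
   // The trade-off is losing small-file handling entirely.
   val cfgNoMerge: Map[String, String] =
     cfg + (HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0")

   newInput.write.format("hudi").options(cfgNoMerge).mode(Append).save(basePath)
   ```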
   
   **Environment Description**
   
   * Hudi version : 0.9.0
   
   * Spark version : 3.0.1
   
   

