whight opened a new issue, #8451:
URL: https://github.com/apache/hudi/issues/8451

   **Describe the problem you faced**
   
   I use Spark Structured Streaming to import Kafka data into a Hudi table. The
   Kafka messages contain many records with the same id. The write operation is
   INSERT, which I understand to mean that precombine should not take effect,
   yet many rows in the table appear to have been upserted: only a few of the
   rows that share a duplicate key are kept in the table. Why?
   
   **Expected behavior**
   Every row with a duplicate key should be stored in the table.
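
   To make the expectation concrete, here is a minimal plain-Scala sketch of the two outcomes. It does not call Hudi at all: the `Rec` case class, the `InsertVsUpsert` object, and the sample values are hypothetical and only mirror the `id`, `timestamp`, and `day` fields used in the writer options. It is an illustration of the semantics I expect versus what I seem to observe, not a description of how Hudi implements either operation.

   ```scala
   // Hypothetical record type and sample data, for illustration only.
   final case class Rec(id: String, timestamp: Long, day: String)

   object InsertVsUpsert {
     // Three incoming rows; two of them share the same id within one partition.
     val batch: Seq[Rec] = Seq(
       Rec("a", 1L, "2023-01-01"),
       Rec("a", 2L, "2023-01-01"),
       Rec("b", 1L, "2023-01-01")
     )

     // What I expect from INSERT: every incoming row is stored, duplicates included.
     val expectedInsert: Seq[Rec] = batch

     // What I seem to observe: one row per (day, id), keeping the largest timestamp,
     // which is what an upsert with a precombine field would produce.
     val upsertLike: Seq[Rec] = batch
       .groupBy(r => (r.day, r.id))
       .values
       .map(_.maxBy(_.timestamp))
       .toSeq

     def main(args: Array[String]): Unit = {
       println(s"insert keeps ${expectedInsert.size} rows")
       println(s"upsert-like keeps ${upsertLike.size} rows")
     }
   }
   ```

   With this sample batch, the insert case keeps 3 rows and the upsert-like case keeps 2.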
   
   **Environment Description**
   
   * Hudi version : 0.10.0
   
   * Spark version : 3.2.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : Aliyun OSS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   This is the Spark writer:
   ```scala
   import org.apache.hudi.DataSourceWriteOptions

   dataSet.writeStream
     .format("org.apache.hudi")
     // Copy-on-write table, written with the INSERT operation
     .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
     .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
     // Record key, precombine field, and partition path
     .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "timestamp")
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "day")
     .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator")
     .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key(), "true")
     // Hive sync settings
     .option(DataSourceWriteOptions.HIVE_URL.key(), "")
     .option(DataSourceWriteOptions.HIVE_DATABASE.key(), dbName)
     .option(DataSourceWriteOptions.HIVE_TABLE.key(), tableName)
     .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "day")
     .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true")
     // Table name and shuffle parallelism
     .option("hoodie.table.name", tableName)
     .option("hoodie.bulkinsert.shuffle.parallelism", "6")
     .option("hoodie.insert.shuffle.parallelism", "6")
     .option("hoodie.upsert.shuffle.parallelism", "6")
     .option("path", "")
     .option("checkpointLocation", "")
     .start()
   ```
   
   

