WTa-hash opened a new issue #2255:
URL: https://github.com/apache/hudi/issues/2255


   We are running a Spark Structured Streaming job that writes data from 
Kinesis to Hudi. The Hudi table type is **Merge-On-Read (MOR)**, the index 
type is **Global Bloom**, and update partition path 
(`HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH`) is set to true. _We 
observed that Hudi does not merge data changes correctly when a record's 
partition changes. Hudi appears to insert the incoming changes as new records 
rather than merging them with the existing data._
   
   I can reproduce the issue with a smaller dataset, attached below as a Scala 
script. The script writes to Hudi in 3 batches, and the partition value 
changes in each batch. Because update partition path is set to true, Hudi 
should merge each incoming record with the existing record and move it to the 
new partition path. With a MOR table, the incoming records are instead 
inserted as new rows and the existing Hudi records are left unchanged. If the 
script is changed to write a Copy-On-Write (COW) table, Hudi behaves as 
expected. Output records for both COW and MOR are shown below.
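
   To make the expected behavior concrete, here is a toy in-memory model of an 
upsert against a globally indexed table. It is not Hudi code: the names `Rec`, 
`GlobalIndexModel`, `upsert`, and `demo`, and the sample dates and values, are 
all made up for illustration, and precombine ordering on `__timestamp` is 
ignored. It only shows the outcome we expect after the 3 batches: one row per 
`id`, located in the most recent partition.

```scala
// Toy model only; hypothetical names, no Hudi or Spark dependency.
final case class Rec(id: String, processDate: String, value: String)

object GlobalIndexModel {
  // Table state: partition path -> (record key -> record).
  type Table = Map[String, Map[String, Rec]]

  // Global lookup: find the partition that currently holds a key, if any.
  def locate(table: Table, id: String): Option[String] =
    table.collectFirst { case (partition, recs) if recs.contains(id) => partition }

  // Store a record under the given partition, replacing any record with the same key there.
  private def put(t: Table, partition: String, r: Rec): Table =
    t.updated(partition, t.getOrElse(partition, Map.empty[String, Rec]) + (r.id -> r))

  def upsert(table: Table, batch: Seq[Rec], updatePartitionPath: Boolean): Table =
    batch.foldLeft(table) { (t, incoming) =>
      locate(t, incoming.id) match {
        case Some(oldPartition) if oldPartition != incoming.processDate && updatePartitionPath =>
          // Partition changed: remove the old copy, then write the new one.
          val withoutOld = t.updated(oldPartition, t(oldPartition) - incoming.id)
          put(withoutOld, incoming.processDate, incoming)
        case Some(oldPartition) =>
          // Same partition, or partition updates disabled (assumption: the
          // record then stays in its old partition): overwrite in place.
          put(t, oldPartition, incoming)
        case None =>
          // Key not seen before: plain insert.
          put(t, incoming.processDate, incoming)
      }
    }

  // Flatten the table into rows, sorted by key for stable comparison.
  def rows(table: Table): Seq[Rec] =
    table.values.flatMap(_.values).toSeq.sortBy(_.id)

  // Three batches for the same key, each with a different partition value.
  def demo(): Seq[Rec] = {
    val batches = Seq(
      Seq(Rec("1", "2020-11-01", "a")),
      Seq(Rec("1", "2020-11-02", "b")),
      Seq(Rec("1", "2020-11-03", "c"))
    )
    val empty: Table = Map.empty
    rows(batches.foldLeft(empty)((t, b) => upsert(t, b, updatePartitionPath = true)))
  }
}
```

   In this model `GlobalIndexModel.demo()` yields a single row for key `1`, in 
the last partition. The MOR output below instead contains one row per batch 
for the same key.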
   
   **SCRIPT TO REPRODUCE ISSUE**: 
   [TestData.txt](https://github.com/apache/hudi/files/5548826/TestData.txt)
   
   **OUTPUT DATA (COW + MOR)**:
   Batch 1:
   
![data1_cow](https://user-images.githubusercontent.com/64644025/99291602-b31e7f80-2805-11eb-8e58-60c006972c17.PNG)
   
![data1_mor](https://user-images.githubusercontent.com/64644025/99291617-b74a9d00-2805-11eb-9596-82f90be0569a.PNG)
   For Batch 1, both COW and MOR display the expected output. This is the first 
batch, so Hudi will create the table and insert data.
   
   Batch 2:
   
![data2_cow](https://user-images.githubusercontent.com/64644025/99291777-efea7680-2805-11eb-96d7-20c998eeff01.PNG)
   
![data2_mor](https://user-images.githubusercontent.com/64644025/99291785-f2e56700-2805-11eb-9094-5fe270843519.PNG)
   For Batch 2, the COW table is processed as expected: Hudi updates the 
partition path of the existing records. The MOR table behaves differently: 
the Batch 2 records are inserted as new rows instead of updating the existing 
data.
   
   Batch 3:
   
![data3_cow](https://user-images.githubusercontent.com/64644025/99292085-5e2f3900-2806-11eb-9a98-0b1215a3d9d4.PNG)
   
![data3_mor](https://user-images.githubusercontent.com/64644025/99292100-61c2c000-2806-11eb-8cf1-0866e70c8d6a.PNG)
   For Batch 3, the COW table is again processed correctly, but the MOR table 
still inserts the incoming records instead of updating the existing data.
   
   **HUDI PROPERTIES**
   DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
   DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[OverwriteWithLatestAvroPayload].getName,
   DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
   HoodieStorageConfig.PARQUET_FILE_MAX_BYTES -> String.valueOf(120 * 1024 * 1024),
   HoodieCompactionConfig.INLINE_COMPACT_PROP -> String.valueOf(false),
   HoodieCompactionConfig.PAYLOAD_CLASS_PROP -> classOf[OverwriteWithLatestAvroPayload].getName,
   "hoodie.compact.inline.max.delta.commits" -> "1",
   "hoodie.datasource.compaction.async.enable" -> String.valueOf(true),
   HoodieCompactionConfig.COMPACTION_STRATEGY_PROP -> classOf[UnBoundedCompactionStrategy].getName,
   "hoodie.clean.automatic" -> String.valueOf(true),
   "hoodie.clean.async" -> String.valueOf(true),
   "hoodie.cleaner.commits.retained" -> "5",
   HoodieIndexConfig.INDEX_TYPE_PROP -> HoodieIndex.IndexType.GLOBAL_BLOOM.toString,
   HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH -> String.valueOf(true),
   "hoodie.consistency.check.enabled" -> String.valueOf(true),
   DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> String.valueOf(true),
   DataSourceWriteOptions.HIVE_URL_OPT_KEY -> "jdbc:hive2://localhost:10000",
   DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
   DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY -> databaseName,
   DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> tableName,
   DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "__process_date",
   HoodieWriteConfig.TABLE_NAME -> tableName,
   DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
   DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "__timestamp",
   DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "__process_date"
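
   For context, a minimal sketch of how a subset of these properties might be 
passed to the Hudi Spark datasource when writing one batch. The helper name 
`writeBatch`, its parameters, the explicit upsert operation, and the `Append` 
save mode are assumptions for illustration; the attached script may wire this 
differently, and the remaining properties from the list above would go into 
the same map.

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical helper: upserts one batch into a MOR table at basePath.
def writeBatch(batch: DataFrame, basePath: String, tableName: String): Unit = {
  val hudiOptions: Map[String, String] = Map(
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
    // Assumption: the job writes with the upsert operation.
    DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
    // Global index plus partition path updates, as in the property list above.
    HoodieIndexConfig.INDEX_TYPE_PROP -> HoodieIndex.IndexType.GLOBAL_BLOOM.toString,
    HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH -> String.valueOf(true),
    HoodieWriteConfig.TABLE_NAME -> tableName,
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "__timestamp",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "__process_date"
  )

  batch.write
    .format("hudi")
    .options(hudiOptions)
    .mode(SaveMode.Append) // append to the existing table rather than overwrite it
    .save(basePath)
}
```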
   
   **ENVIRONMENT**:
   AWS EMR: 5.31.0
   Hudi version : 0.6.0
   Spark version : 2.4.6
   Hive version : 2.3.7
   Hadoop version : 2.10.0
   Storage (HDFS/S3/GCS..) : S3
   Running on Docker? (yes/no) : no

