ergophobiac opened a new issue, #11632:
URL: https://github.com/apache/hudi/issues/11632

   Hello,
We have a Spark Structured Streaming job running on EMR 6.10.1 with Hudi 0.12.2 
and Spark 3.3.1 (reading from Kafka and writing to S3). It runs upserts against 
several of our MOR datasets, and we have noticed a large number of duplicate 
records in these tables. 
   
   We looked into it and found that bulk insert and precombine (during upsert) 
behave correctly, so we suspect the problem lies in our Hudi configuration.
   
   Our Hudi options:
   
   ```
   hoodie.table.version                                  -> 5
   hoodie.datasource.write.commitmeta.key.prefix         -> _
   hoodie.datasource.write.hive_style_partitioning       -> True
   hoodie.datasource.write.operation                     -> upsert
   hoodie.datasource.hive_sync.enable                    -> True
   hoodie.datasource.hive_sync.auto_create_database      -> True
   hoodie.datasource.hive_sync.skip_ro_suffix            -> True
   hoodie.parquet.small.file.limit                       -> 251658240
   hoodie.parquet.max.file.size                          -> 251658240
   hoodie.compact.inline.trigger.strategy                -> NUM_OR_TIME
   hoodie.compact.inline.max.delta.commits               -> 3
   hoodie.compact.inline.max.delta.seconds               -> 600
   hoodie.parquet.compression.codec                      -> snappy
   hoodie.clean.automatic                                -> True
   hoodie.index.type                                     -> BLOOM
   hoodie.keep.max.commits                               -> 50
   hoodie.archive.automatic                              -> True
   hoodie.archive.beyond.savepoint                       -> True
   hoodie.archive.merge.enable                           -> True
   hoodie.cleaner.policy.failed.writes                   -> LAZY
   hoodie.write.concurrency.mode                         -> OPTIMISTIC_CONCURRENCY_CONTROL
   hoodie.write.lock.provider                            -> org.apache.hudi.client.transaction.lock.InProcessLockProvider
   hoodie.metadata.enable                                -> False
   hoodie.clean.async                                    -> True
   hoodie.archive.async                                  -> True
   hoodie.cleaner.policy                                 -> KEEP_LATEST_BY_HOURS
   hoodie.cleaner.hours.retained                         -> 1
   hoodie.datasource.write.table.name                    -> table_A
   hoodie.table.name                                     -> table_A
   hoodie.datasource.write.table.type                    -> MERGE_ON_READ
   hoodie.datasource.write.recordkey.field               -> id
   hoodie.datasource.write.partitionpath.field           -> _month
   hoodie.datasource.write.precombine.field              -> _kafka_offset
   hoodie.datasource.hive_sync.table                     -> table_A
   hoodie.datasource.hive_sync.database                  -> table_A
   hoodie.base.path                                      -> s3://data-lake-raw/table_A/table_A
   hoodie.parquet.block.size                             -> 251658240
   hoodie.parquet.page.size                              -> 104876
   hoodie.datasource.write.keygenerator.class            -> org.apache.hudi.keygen.SimpleKeyGenerator
   hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor
   hoodie.metrics.graphite.metric.prefix                 -> emr_cluster_id.hudi.table_A
   ```
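
   For reference, the identity-related subset of the options above, collected into 
the dict a PySpark job would hand to the Hudi sink via `.options(**hudi_options)`. 
This is a minimal sketch, not the actual job code:

   ```python
   # Minimal sketch (not the actual job): only the keys that control which
   # records are considered "the same" and which copy wins on collision.
   hudi_options = {
       "hoodie.table.name": "table_A",
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.datasource.write.operation": "upsert",
       # One live record per "id" within a partition is the expected invariant.
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.partitionpath.field": "_month",
       # On a key collision, the row with the larger _kafka_offset wins.
       "hoodie.datasource.write.precombine.field": "_kafka_offset",
       "hoodie.index.type": "BLOOM",
       "hoodie.metadata.enable": "false",
   }
   ```

   With `recordkey.field = id` and `partitionpath.field = _month`, Hudi should 
keep at most one live copy of each `id` per month partition, which is exactly the 
invariant the duplicates violate.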
   
   
   
   Within the duplicate records, we noticed that _hoodie_commit_seqno, 
_hoodie_file_name, and (sometimes) _hoodie_commit_time differ across the copies, 
while _hoodie_record_key and _hoodie_partition_path are identical. So the 
duplicates reside in the same partition but in different files. We suspect new 
upserts are being mapped to incorrect base files and eventually end up as 
inserts in the new base file created after compaction. 
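
   The duplicate pattern can be illustrated with a small standalone sketch (the 
keys, partition values, and file names below are invented for illustration; the 
equivalent grouping can be run in Spark SQL over the Hudi meta columns):

   ```python
   from collections import Counter

   # Invented sample rows of the Hudi meta columns:
   # (_hoodie_record_key, _hoodie_partition_path, _hoodie_file_name)
   rows = [
       ("id:42", "_month=2024-06", "base-file-a.parquet"),
       ("id:42", "_month=2024-06", "base-file-b.parquet"),  # same key and partition, different file
       ("id:7",  "_month=2024-06", "base-file-a.parquet"),
   ]

   # A record key is duplicated when it appears more than once within one partition.
   counts = Counter((key, part) for key, part, _file in rows)
   duplicates = {kp for kp, n in counts.items() if n > 1}
   print(duplicates)  # -> {('id:42', '_month=2024-06')}
   ```

   Grouping by `_hoodie_record_key` and `_hoodie_partition_path` and counting 
distinct `_hoodie_file_name` values per group is a quick way to measure how many 
duplicates each partition holds.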
   
   We have been running stable production loads with these configs for 6 months, 
so we know this issue was not present before. Every time we restart the 
streaming job, we see another duplicate record created in the same partition. 
Can anyone help here?
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
