ergophobiac opened a new issue, #11632: URL: https://github.com/apache/hudi/issues/11632
Hello,

We have a Spark Structured Streaming job running on EMR 6.10.1 with Hudi 0.12.2 and Spark 3.3.1, reading from Kafka and writing to S3. It runs upserts against several of our MOR datasets, and we have noticed a large number of duplicate records in these tables. We investigated and found that bulk insert and precombine (during upsert) both behave correctly, so we suspect the problem is with our Hudi configs.

Our Hudi options:

```
hoodie.table.version -> 5
hoodie.datasource.write.commitmeta.key.prefix -> _
hoodie.datasource.write.hive_style_partitioning -> True
hoodie.datasource.write.operation -> upsert
hoodie.datasource.hive_sync.enable -> True
hoodie.datasource.hive_sync.auto_create_database -> True
hoodie.datasource.hive_sync.skip_ro_suffix -> True
hoodie.parquet.small.file.limit -> 251658240
hoodie.parquet.max.file.size -> 251658240
hoodie.compact.inline.trigger.strategy -> NUM_OR_TIME
hoodie.compact.inline.max.delta.commits -> 3
hoodie.compact.inline.max.delta.seconds -> 600
hoodie.parquet.compression.codec -> snappy
hoodie.clean.automatic -> True
hoodie.index.type -> BLOOM
hoodie.keep.max.commits -> 50
hoodie.archive.automatic -> True
hoodie.archive.beyond.savepoint -> True
hoodie.archive.merge.enable -> True
hoodie.cleaner.policy.failed.writes -> LAZY
hoodie.write.concurrency.mode -> OPTIMISTIC_CONCURRENCY_CONTROL
hoodie.write.lock.provider -> org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.metadata.enable -> False
hoodie.clean.async -> True
hoodie.archive.async -> True
hoodie.cleaner.policy -> KEEP_LATEST_BY_HOURS
hoodie.cleaner.hours.retained -> 1
hoodie.datasource.write.table.name -> table_A
hoodie.table.name -> table_A
hoodie.datasource.write.table.type -> MERGE_ON_READ
hoodie.datasource.write.recordkey.field -> id
hoodie.datasource.write.partitionpath.field -> _month
hoodie.datasource.write.precombine.field -> _kafka_offset
hoodie.datasource.hive_sync.table -> table_A
hoodie.datasource.hive_sync.database -> table_A
hoodie.base.path -> s3://data-lake-raw/table_A/table_A
hoodie.parquet.block.size -> 251658240
hoodie.parquet.page.size -> 104876
hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.metrics.graphite.metric.prefix -> emr_cluster_id.hudi.table_A
```

Within the duplicate records, we have noticed that `_hoodie_commit_seqno`, `_hoodie_file_name`, and (sometimes) `_hoodie_commit_time` differ between copies, while `_hoodie_record_key` and `_hoodie_partition_path` are identical. So the duplicates reside in the same partition, but in different files. We suspect new upserts may be getting mapped to incorrect base files, eventually ending up as inserts in the new base file created after compaction.

We had been running stable production loads with these configs for six months, so we know this issue wasn't there before. Every time we restart our streaming job, we can see another duplicate record being created in the same partition. Can anyone help here?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
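For context on what we verified: with `hoodie.datasource.write.precombine.field -> _kafka_offset`, Hudi should keep only the row with the highest `_kafka_offset` when a batch contains several rows for the same `id`. A minimal plain-Python sketch of that deduplication semantics (not Hudi's actual implementation; the sample records are hypothetical):

```python
# Plain-Python sketch of precombine semantics: for each record key,
# keep only the row with the largest precombine-field value.
# Field names match our table config; sample rows are made up.
def precombine(records, key_field="id", precombine_field="_kafka_offset"):
    latest = {}
    for rec in records:
        key = rec[key_field]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())

batch = [
    {"id": 1, "_kafka_offset": 100, "value": "old"},
    {"id": 1, "_kafka_offset": 105, "value": "new"},
    {"id": 2, "_kafka_offset": 101, "value": "only"},
]
deduped = precombine(batch)  # id=1 keeps the _kafka_offset=105 row
```

This is the behavior we confirmed is working within a single batch; the duplicates appear across commits, not within one.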
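The duplicate pattern described above can be detected with a check along these lines: group rows by (`_hoodie_partition_path`, `_hoodie_record_key`) and flag keys that appear more than once. A plain-Python sketch of that check (in practice we do the equivalent with a Spark query over the table; the sample rows are hypothetical):

```python
from collections import defaultdict

def find_duplicates(rows):
    """Group rows by (partition path, record key) and return groups with
    more than one row -- something an upsert table should never contain."""
    groups = defaultdict(list)
    for row in rows:
        key = (row["_hoodie_partition_path"], row["_hoodie_record_key"])
        groups[key].append(row)
    return {k: v for k, v in groups.items() if len(v) > 1}

# Hypothetical rows showing the observed pattern: same partition and
# record key, but different file names (and commit seqnos).
rows = [
    {"_hoodie_partition_path": "_month=2024-05", "_hoodie_record_key": "id:42",
     "_hoodie_file_name": "f1.parquet", "_hoodie_commit_seqno": "c1_0_1"},
    {"_hoodie_partition_path": "_month=2024-05", "_hoodie_record_key": "id:42",
     "_hoodie_file_name": "f2.parquet", "_hoodie_commit_seqno": "c2_0_7"},
]
dups = find_duplicates(rows)
```

A non-empty result with differing `_hoodie_file_name` values per group is exactly what we see: duplicates in the same partition but in different files.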
