mzheng-plaid opened a new issue, #9977: URL: https://github.com/apache/hudi/issues/9977
**Describe the problem you faced**

As background, due to https://github.com/apache/hudi/issues/9934 we are testing clustering our MOR table to reduce the number of base files. We set up a test by copying an existing table; in its initial state this table had only base files (no log files). We wanted to verify both the performance of clustering and data correctness. We clustered one partition and found that **261736 rows were missing after clustering**.

We used the following clustering configuration (in addition to the configurations in "Additional context"):

```
# Clustering configs
"hoodie.clustering.inline": "true",
"hoodie.clustering.inline.max.commits": 1,
"hoodie.clustering.plan.strategy.small.file.limit": 256 * 1024 * 1024,
"hoodie.clustering.plan.strategy.target.file.max.bytes": 512 * 1024 * 1024,
"hoodie.clustering.plan.strategy.sort.columns": "itemId.value",
"hoodie.clustering.plan.strategy.partition.selected": "dt=2022-08-29",
"hoodie.clustering.plan.strategy.max.num.groups": 30,
```

Our clustering job ran as follows:

1. Read one row from partition `dt=2022-08-29`.
2. Write the row back out (this is just a dummy way of triggering clustering inline); the update is a no-op. We set `"hoodie.clustering.plan.strategy.partition.selected"` to `dt=2022-08-29` so that only the partition that was written to gets clustered.

After the write finished, I compared the clustered and unclustered tables (we kept another copy from before the run). Before clustering the partition had 399896071 rows; after clustering it had 399634335 rows, i.e. 261736 rows were lost. Joining the two tables, I found that **all** of the missing rows came from **one** base file that was clustered.
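For reference, the row accounting from the before/after comparison reduces to simple arithmetic (counts taken from the Spark row counts above):

```python
# Partition row counts for dt=2022-08-29, taken from the comparison above.
rows_before_clustering = 399_896_071
rows_after_clustering = 399_634_335

# Number of rows silently lost by the clustering replacecommit.
missing_rows = rows_before_clustering - rows_after_clustering
print(missing_rows)  # 261736
```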
**Interestingly, this was the same base file that received the 1-row update**:

```python
# Spark code to find the hoodie file and record key for each of the missing rows
meta_joined_df = unclustered_df.select(
    "_hoodie_file_name",
    "_hoodie_commit_time",
    "_hoodie_commit_seqno",
    "_hoodie_record_key",
    "_hoodie_partition_path",
    "_hoodie_is_deleted",
).alias("a").join(
    clustered_df.select(
        "_hoodie_file_name",
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_is_deleted",
    ).alias("b"),
    on=F.col("a._hoodie_record_key") == F.col("b._hoodie_record_key"),
    how="full_outer",
).cache()

meta_joined_df.filter(F.col("b._hoodie_record_key").isNull()).groupBy(
    F.col("a._hoodie_file_name"),
).count().alias("count").orderBy("count", ascending=False).show(
    n=10, truncate=False
)
```

Output:

```
+-----------------------------------------------------------------------------------+------+
|_hoodie_file_name                                                                  |count |
+-----------------------------------------------------------------------------------+------+
|f0b917f5-607e-47c4-96a4-092b4668c436-0_254-10835-21844023_20231016122622692.parquet|261736|
+-----------------------------------------------------------------------------------+------+
```

The `deltacommit` shows this file was the one that received the update:

```json
{
  "partitionToWriteStats" : {
    "dt=2022-08-29" : [ {
      "fileId" : "f0b917f5-607e-47c4-96a4-092b4668c436-0",
      "path" : "dt=2022-08-29/.f0b917f5-607e-47c4-96a4-092b4668c436-0_20231016122622692.log.1_0-29-5280",
      "prevCommit" : "20231016122622692",
      "numWrites" : 1,
      "numDeletes" : 0,
      "numUpdateWrites" : 1,
      "numInserts" : 0,
      "totalWriteBytes" : 13402,
      "totalWriteErrors" : 0,
      "tempPath" : null,
      "partitionPath" : "dt=2022-08-29",
      "totalLogRecords" : 0,
      "totalLogFilesCompacted" : 0,
      "totalLogSizeCompacted" : 0,
      "totalUpdatedRecordsCompacted" : 0,
      "totalLogBlocks" : 0,
      "totalCorruptLogBlock" : 0,
      "totalRollbackBlocks" : 0,
      "fileSizeInBytes" : 13402,
      "minEventTime" : null,
      "maxEventTime" : null,
      "runtimeStats" : {
        "totalScanTime" : 0,
        "totalUpsertTime" : 2327,
        "totalCreateTime" : 0
      },
      "logVersion" : 1,
      "logOffset" : 0,
      "baseFile" : "f0b917f5-607e-47c4-96a4-092b4668c436-0_254-10835-21844023_20231016122622692.parquet",
      "logFiles" : [ ".f0b917f5-607e-47c4-96a4-092b4668c436-0_20231016122622692.log.1_0-29-5280" ],
      "recordsStats" : {
        "val" : null
      }
    } ]
  },
  "compacted" : false,
  "extraMetadata" : {
    "schema" : …
  },
  "operationType" : "UPSERT"
}
```

I inspected the `replacecommit`, and `f0b917f5-607e-47c4-96a4-092b4668c436-0` was in `partitionToReplaceFileIds` (360 out of 763 base files were clustered). Even stranger, the impacted base file has 523472 rows, which means exactly **half** of its records were lost.

**To Reproduce**

Unclear, but I have been able to reproduce this every time, on every partition I have tried.

**Expected behavior**

We do not expect silent data loss. I am not sure whether this is a Hudi bug or a misconfiguration. It may be related to https://github.com/apache/hudi/issues/7839, which was never reproduced.

**Environment Description**

* EMR version : 6.10.1
* Hudi version : 0.12.2-amzn-0
* Spark version : 3.3.1
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : Spark on Docker

**Additional context**

We use the following writer configurations. There were no failures in the Spark application.
```
'hoodie.compact.inline': True,
'hoodie.compact.inline.max.delta.commits': 6,
'hoodie.cleaner.commits.retained': 1,
'hoodie.compaction.target.io': 30000000,
'hoodie.parquet.max.file.size': 536870912,
'hoodie.parquet.block.size': 536870912,
'hoodie.parquet.small.file.limit': 268435456,
'hoodie.bloom.index.prune.by.ranges': 'false',
'hoodie.upsert.shuffle.parallelism': 9000,
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
'hoodie.compaction.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
'hoodie.rollback.parallelism': 500,
'hoodie.commits.archival.batch': 5,
"hoodie.table.name": xxx,
"hoodie.datasource.write.table.name": xxx,
"hoodie.datasource.write.operation": "upsert",
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.partitionpath.field": "dt:SIMPLE",
"hoodie.datasource.write.recordkey.field": "id.value",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.metadata.enable": "false",
"hoodie.bootstrap.index.enable": "false",
"hoodie.index.type": "BLOOM",
"hoodie.datasource.hive_sync.enable": "true",
```

The table properties are:

```
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.table.precombine.field=publishedAtUnixNano
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=true
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=xxx
hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
hoodie.populate.meta.fields=true
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.timeline.layout.version=1
hoodie.table.partition.fields=dt
hoodie.table.recordkey.fields=id.value
hoodie.table.checksum=3616660964
```
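As a final sanity check on the "exactly half" observation above, the arithmetic (row counts as reported; this is illustrative arithmetic, not Hudi API code):

```python
# Rows in the impacted base file
# f0b917f5-607e-47c4-96a4-092b4668c436-0_254-10835-21844023_20231016122622692.parquet
base_file_rows = 523_472

# Rows missing from partition dt=2022-08-29 after clustering
missing_rows = 261_736

# The missing rows are exactly half of the base file's records.
print(base_file_rows // 2 == missing_rows)  # True
```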
