mzheng-plaid opened a new issue, #9977:
URL: https://github.com/apache/hudi/issues/9977

   **Describe the problem you faced**
   
   As background, due to https://github.com/apache/hudi/issues/9934 we are testing clustering to reduce the number of base files in our MOR table.
   
   We set up a test by copying an existing table. This table only had base 
files (no log files) in its initial state. We wanted to verify the performance 
of clustering as well as data correctness. We clustered one partition and found 
that **261736 rows were missing after clustering**.
   
   We used the following clustering configuration (and the other configurations 
in "Additional Context"):
   ```
   # Clustering configs
   "hoodie.clustering.inline": "true",
   "hoodie.clustering.inline.max.commits": 1,
   "hoodie.clustering.plan.strategy.small.file.limit": 256 * 1024 * 1024,
   "hoodie.clustering.plan.strategy.target.file.max.bytes": 512 * 1024 * 1024,
   "hoodie.clustering.plan.strategy.sort.columns": "itemId.value",
   "hoodie.clustering.plan.strategy.partition.selected": "dt=2022-08-29",
   "hoodie.clustering.plan.strategy.max.num.groups": 30,
   ```
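
   For reference, here are the same clustering configs as a plain Python dict with the byte-size expressions evaluated (nothing new is assumed; the values simply mirror the block above):

   ```python
   # The clustering configs above, with the byte-size arithmetic evaluated.
   clustering_configs = {
       "hoodie.clustering.inline": "true",
       "hoodie.clustering.inline.max.commits": 1,
       "hoodie.clustering.plan.strategy.small.file.limit": 256 * 1024 * 1024,       # 268435456 (256 MB)
       "hoodie.clustering.plan.strategy.target.file.max.bytes": 512 * 1024 * 1024,  # 536870912 (512 MB)
       "hoodie.clustering.plan.strategy.sort.columns": "itemId.value",
       "hoodie.clustering.plan.strategy.partition.selected": "dt=2022-08-29",
       "hoodie.clustering.plan.strategy.max.num.groups": 30,
   }
   ```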
   
   Our clustering code ran as follows:
   
   1. Read one row from partition `dt=2022-08-29`.
   2. Write that row back out (a dummy upsert whose only purpose is to trigger inline clustering; the update itself is a no-op). We set `hoodie.clustering.plan.strategy.partition.selected` to `dt=2022-08-29` so that only the partition that was written to gets clustered.
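
   For concreteness, a rough sketch of that trigger, assuming a SparkSession `spark`, a `table_path`, and the writer configs from "Additional context" in a `writer_options` dict (the helper name here is illustrative, not our exact code):

   ```python
   # Illustrative helper: clustering configs scoped to a single partition,
   # to be layered on top of the base writer options for the dummy upsert.
   def clustering_trigger_options(partition: str) -> dict:
       return {
           "hoodie.datasource.write.operation": "upsert",
           "hoodie.clustering.inline": "true",
           "hoodie.clustering.inline.max.commits": "1",
           "hoodie.clustering.plan.strategy.partition.selected": partition,
       }

   # The dummy update itself would look roughly like this (not run here):
   #   row_df = (spark.read.format("hudi").load(table_path)
   #             .filter("dt = '2022-08-29'").limit(1))
   #   (row_df.write.format("hudi")
   #       .options(**writer_options)
   #       .options(**clustering_trigger_options("dt=2022-08-29"))
   #       .mode("append").save(table_path))
   ```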
   
   After the write finished, I compared the clustered table against the unclustered copy we kept from before the run. Before clustering the partition had 399896071 rows; after clustering it had 399634335 rows, i.e. 261736 rows were lost.
   
   Joining the two tables, I saw that **all** of the missing rows came from **one** base file that was clustered. **Interestingly, this was the base file that received the 1-row update**:
   
   ```
   # Spark code to find the hoodie file and record key for each of the missing rows
   from pyspark.sql import functions as F

   meta_joined_df = unclustered_df.select(
       "_hoodie_file_name", 
       "_hoodie_commit_time", 
       "_hoodie_commit_seqno", 
       "_hoodie_record_key",
       "_hoodie_partition_path",
       "_hoodie_is_deleted",
   ).alias("a").join(
       clustered_df.select(
           "_hoodie_file_name", 
           "_hoodie_commit_time", 
           "_hoodie_commit_seqno", 
           "_hoodie_record_key",
           "_hoodie_partition_path",
           "_hoodie_is_deleted",
       ).alias("b"),
       on=F.col("a._hoodie_record_key") == F.col("b._hoodie_record_key"),
       how="full_outer",
   ).cache()
   meta_joined_df.filter(F.col("b._hoodie_record_key").isNull()).groupBy(
       F.col("a._hoodie_file_name"),
   ).count().alias("count").orderBy("count", ascending=False).show(
       n=10,
       truncate=False
   )
   ```
   
   Output:
   ```
   +-----------------------------------------------------------------------------------+------+
   |_hoodie_file_name                                                                  |count |
   +-----------------------------------------------------------------------------------+------+
   |f0b917f5-607e-47c4-96a4-092b4668c436-0_254-10835-21844023_20231016122622692.parquet|261736|
   +-----------------------------------------------------------------------------------+------+
   ```
   
   The `deltacommit` shows this file was the one that received the update:
   ```
   {
     "partitionToWriteStats" : {
       "dt=2022-08-29" : [ {
         "fileId" : "f0b917f5-607e-47c4-96a4-092b4668c436-0",
          "path" : "dt=2022-08-29/.f0b917f5-607e-47c4-96a4-092b4668c436-0_20231016122622692.log.1_0-29-5280",
         "prevCommit" : "20231016122622692",
         "numWrites" : 1,
         "numDeletes" : 0,
         "numUpdateWrites" : 1,
         "numInserts" : 0,
         "totalWriteBytes" : 13402,
         "totalWriteErrors" : 0,
         "tempPath" : null,
         "partitionPath" : "dt=2022-08-29",
         "totalLogRecords" : 0,
         "totalLogFilesCompacted" : 0,
         "totalLogSizeCompacted" : 0,
         "totalUpdatedRecordsCompacted" : 0,
         "totalLogBlocks" : 0,
         "totalCorruptLogBlock" : 0,
         "totalRollbackBlocks" : 0,
         "fileSizeInBytes" : 13402,
         "minEventTime" : null,
         "maxEventTime" : null,
         "runtimeStats" : {
           "totalScanTime" : 0,
           "totalUpsertTime" : 2327,
           "totalCreateTime" : 0
         },
         "logVersion" : 1,
         "logOffset" : 0,
          "baseFile" : "f0b917f5-607e-47c4-96a4-092b4668c436-0_254-10835-21844023_20231016122622692.parquet",
          "logFiles" : [ ".f0b917f5-607e-47c4-96a4-092b4668c436-0_20231016122622692.log.1_0-29-5280" ],
         "recordsStats" : {
           "val" : null
         }
       } ]
     },
     "compacted" : false,
     "extraMetadata" : {
       "schema" : …
     },
     "operationType" : "UPSERT"
   }
   ```
   
   I inspected the `replacecommit`, and `f0b917f5-607e-47c4-96a4-092b4668c436-0` was in `partitionToReplaceFileIds` (360 out of 763 base files were clustered). Even stranger, the impacted base file has 523472 rows, which means exactly **half** of its records were lost.
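
   As a quick Python sanity check on the counts reported above:

   ```python
   # Row counts from this report.
   rows_before = 399_896_071   # partition row count before clustering
   rows_after = 399_634_335    # partition row count after clustering
   base_file_rows = 523_472    # rows in the impacted base file

   missing = rows_before - rows_after
   assert missing == 261_736
   assert base_file_rows == 2 * missing  # exactly half the base file's rows were lost
   ```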
   
   **To Reproduce**
   Unclear what exactly triggers it, but I have been able to reproduce it every time, on every partition I've tried.
   
   **Expected behavior**
   We do not expect silent data loss. I'm not sure whether this is a Hudi bug or a misconfiguration. This may be related to https://github.com/apache/hudi/issues/7839, which was never reproduced.
   
   **Environment Description**
   
   EMR 6.10.1
   
   Hudi version : 0.12.2-amzn-0
   
   Spark version : 3.3.1
   
   Hive version : 3.1.3
   
   Hadoop version : 3.3.3
   
   Storage (HDFS/S3/GCS..) : S3
   
   Running on Docker? (yes/no) : Spark on Docker
   
   
   **Additional context**
   
   We use the following writer configurations. There were no failures in the 
Spark application.
   
   ```
   'hoodie.compact.inline': True,
   'hoodie.compact.inline.max.delta.commits': 6, 
   'hoodie.cleaner.commits.retained': 1, 
   'hoodie.compaction.target.io': 30000000, 
   'hoodie.parquet.max.file.size': 536870912, 
   'hoodie.parquet.block.size': 536870912, 
   'hoodie.parquet.small.file.limit': 268435456, 
   'hoodie.bloom.index.prune.by.ranges': 'false', 
   'hoodie.upsert.shuffle.parallelism': 9000, 
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload', 
    'hoodie.compaction.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload', 
   'hoodie.rollback.parallelism': 500, 
   'hoodie.commits.archival.batch': 5,
   "hoodie.table.name": xxx,
   "hoodie.datasource.write.table.name": xxx,
   "hoodie.datasource.write.operation": "upsert",
   "hoodie.datasource.write.table.type": "MERGE_ON_READ",
   "hoodie.datasource.write.partitionpath.field": "dt:SIMPLE",
   "hoodie.datasource.write.recordkey.field": "id.value",
   "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
   "hoodie.datasource.write.hive_style_partitioning": "true",
   "hoodie.metadata.enable": "false",
   "hoodie.bootstrap.index.enable": "false",
   "hoodie.index.type": "BLOOM",
   "hoodie.datasource.hive_sync.enable": "true",
   ```
   
   The table properties are:
   ```
   hoodie.table.timeline.timezone=LOCAL
   hoodie.table.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
   hoodie.table.precombine.field=publishedAtUnixNano
   hoodie.table.version=5
   hoodie.database.name=
   hoodie.datasource.write.hive_style_partitioning=true
   hoodie.partition.metafile.use.base.format=false
   hoodie.archivelog.folder=archived
    hoodie.table.name=xxx
    hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
   hoodie.populate.meta.fields=true
   hoodie.table.type=MERGE_ON_READ
   hoodie.datasource.write.partitionpath.urlencode=false
   hoodie.table.base.file.format=PARQUET
   hoodie.datasource.write.drop.partition.columns=false
   hoodie.timeline.layout.version=1
   hoodie.table.partition.fields=dt
   hoodie.table.recordkey.fields=id.value
   hoodie.table.checksum=3616660964
   ```
   

