joeytman opened a new issue, #9971: URL: https://github.com/apache/hudi/issues/9971
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

The Simple Bucket Index behaves differently in Spark vs. Flink. Spark's Hudi writer maps records to numeric bucket IDs like `0004906`, whereas Flink's Hudi writer maps records to hexadecimal bucket IDs like `af35f7fa`. As a result, if you bootstrap a Hudi table with a batch Spark job in bulk-insert mode and then set up a Flink streaming job to apply CDC updates to the table, records are mapped to entirely new bucket IDs. The old files are never updated, new buckets are created as records come in, updates are treated as inserts, and duplicates result.

Hudi provides Flink [Index Bootstrap](https://hudi.apache.org/docs/hoodie_streaming_ingestion#index-bootstrap) to address this, but it seems like an incomplete approach. Per the docs [here](https://hudi.apache.org/docs/indexing/#flink-based-configs), both the BUCKET index and the FLINK_STATE index are supported. The description of index bootstrap states:

> When index bootstrap is enabled, the remain records in Hudi table will be loaded into the Flink state at one time

This seems to imply that index bootstrap only works for the FLINK_STATE index type. As I understand it, we should not need to read all of the records into Flink state in order to bootstrap state for the BUCKET index: the file group for a record should be uniquely identified by hashing the primary key(s).

So, why do Spark and Flink have differing bucket index behavior? It seems like we could avoid index bootstrap entirely if they used the same hash function and the same bucket naming convention.

**To Reproduce**

Steps to reproduce the behavior:

1. Create a Hudi table using Spark with the Simple Bucket Index via bulk insert.
2. Configure a Flink pipeline to UPSERT records into the Hudi table, using the same bucket index settings.
3. Observe that entirely new file groups are created and the existing parquet files are never updated.

**Expected behavior**

When the Flink pipeline is configured with settings identical to the Spark pipeline's, it should recognize the existing buckets and map rows to the buckets created by the Spark bootstrap.

**Environment Description**

* EMR version: 6.14
* Hudi version : 0.14.0 (Spark), 0.13.1 (Flink) [Note: the discrepancy is due to EMR supporting different Hudi versions per execution engine]
* Spark version : 3.4
* Flink version : 1.17
* Hive version : 3.1.3
* Hadoop version : 3.3.3
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : No

**Additional Context**

Bootstrapped files look like:

```
2023-10-27 19:50:16 454.2 MiB 00000009-a122-4ea6-8872-1ff613d133c3-0_9-247-0_20231027194527266.parquet
```

Flink-written files look like:

```
2023-11-01 14:16:06 60.7 MiB d181766b-8624-47dd-b025-a17da16c9296_0-4-1_20231101135648178.parquet
```
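To make the mismatch concrete, here is a minimal sketch (not Hudi's actual implementation) of why the naming convention matters for a bucket index. The function names, the placeholder hash, and `NUM_BUCKETS` are all hypothetical; the point is that the Spark-written files above carry a re-derivable zero-padded bucket prefix (`00000009-...`), while the Flink-written files look like plain random UUIDs (`d181766b-...`), so an upserting writer that hashes the key cannot locate the existing file group:

```python
import uuid

NUM_BUCKETS = 16  # assumption: stands in for the configured bucket count

def bucket_id(record_key: str, num_buckets: int) -> int:
    # Placeholder hash: both engines must agree on this function for the
    # same record key to land in the same bucket.
    return sum(record_key.encode()) % num_buckets

def spark_style_file_id(bucket: int) -> str:
    # Spark-written files in the report start with a zero-padded 8-digit
    # bucket id, e.g. 00000009-a122-..., so the bucket is recoverable
    # from the file name alone.
    return f"{bucket:08d}-" + str(uuid.uuid4())[9:]

def flink_style_file_id() -> str:
    # Flink-written files in the report look like a plain random UUID,
    # e.g. d181766b-..., with no bucket prefix to match against.
    return str(uuid.uuid4())

b = bucket_id("order_12345", NUM_BUCKETS)
print(spark_style_file_id(b)[:8])  # deterministic: re-derivable from the key
print(flink_style_file_id()[:8])   # random: an upsert cannot find the old file
```

Under this sketch, an update routed by `bucket_id` finds the Spark-style file group by its prefix but has no way to match a random-UUID file group, which is consistent with the duplicates observed above.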
