joeytman opened a new issue, #9971:
URL: https://github.com/apache/hudi/issues/9971

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
[email protected].
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   Simple Bucket Index behaves differently in Spark vs Flink.  Spark's Hudi 
writer maps records to numeric bucket IDs like `0004906` whereas Flink's hudi 
writer is mapping records to a hexadecimal bucket ID like `af35f7fa`. As a 
result, if you  bootstrap a Hudi table with a batch spark job in bulk insert 
mode and then set up a Flink streaming job to apply CDC updates to the table, 
records are mapped to entirely new bucket IDs. This leads to the old files 
never being updated, creating new buckets as records come in, treating updates 
as inserts and leading to duplicates.
   
   Hudi provides Flink [Index 
Bootstrap](https://hudi.apache.org/docs/hoodie_streaming_ingestion#index-bootstrap)
 to address this issue, but it seems like an incomplete approach. As per the 
docs [here](https://hudi.apache.org/docs/indexing/#flink-based-configs) both 
BUCKET index and FLINK_STATE index are supported. The description of index 
bootstrap states that:
   > When index bootstrap is enabled, the remain records in Hudi table will be 
loaded into the Flink state at one time
   
   This seems to imply that index bootstrap only works for FLINK_STATE index 
type -- as I understand it, we should not need to read all of the records into 
Flink state in order to bootstrap state for BUCKET index, the file group for a 
record should be uniquely identified by hashing the primary key(s). 
   
   So, why do Spark and Flink have differing bucket index behavior? Seems like 
we could avoid index bootstrap if they just used the same hash function and 
same bucket naming convention.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a Hudi table using Spark with Simple Bucket Index via Bulk Insert 
   2. Configure a Flink pipeline to UPSERT records to the Hudi table, using the 
same settings for bucket index.
   3. Observe as entirely new file groups are created and the existing parquet 
files are never updated
   
   **Expected behavior**
   
   When configuring the Flink pipeline with identical settings to the Spark 
pipeline, the Flink pipeline should recognize existing buckets and map rows to 
the correct buckets created by the Spark bootstrap
   
   **Environment Description**
   
   * EMR version: 6.14
   
   * Hudi version : 0.14.0 (Spark), 0.13.1 (Flink) [Note the discrepancy is due 
to EMR supporting differing versions of Hudi based on execution engine]
   
   * Spark version : 3.4
   
   * Flink version : 1.17
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional Context**
   
   Bootstrapped files look like:
   ```
   2023-10-27 19:50:16  454.2 MiB 
00000009-a122-4ea6-8872-1ff613d133c3-0_9-247-0_20231027194527266.parquet
   ```
   
   Flink-written files look like:
   ```
   2023-11-01 14:16:06   60.7 MiB 
d181766b-8624-47dd-b025-a17da16c9296_0-4-1_20231101135648178.parquet
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to