danny0405 commented on code in PR #14320: URL: https://github.com/apache/hudi/pull/14320#discussion_r2553805478
########## website/docs/ingestion_flink.md:
##########
@@ -1,179 +1,361 @@
---
title: Using Flink
keywords: [hudi, flink, streamer, ingestion]
+last_modified_at: 2025-11-22T12:53:57+08:00
---

-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source system so a downstream process or system can action that change.
+## CDC Ingestion
+
+CDC (change data capture) keeps track of data changes evolving in a source system so a downstream process or system can act on those changes.
We recommend two ways for syncing CDC data into Hudi:

-1. Using the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) directly connect to DB Server to sync the binlog data into Hudi.
-   The advantage is that it does not rely on message queues, but the disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc format, the advantage is that it is highly scalable,
+1. Use the Ververica [flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to directly connect to the database server and sync binlog data into Hudi.
+   The advantage is that it does not rely on message queues, but the disadvantage is that it puts pressure on the database server.
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format. The advantage is that it is highly scalable,
   but the disadvantage is that it relies on message queues.

:::note
-- If the upstream data cannot guarantee the order, you need to specify option `write.precombine.field` explicitly;
+If the upstream data cannot guarantee ordering, you need to explicitly specify the `write.precombine.field` option.
:::

-### Bulk Insert
+## Bulk Insert

-For the demand of snapshot data import. If the snapshot data comes from other data sources, use the `bulk_insert` mode to quickly
+For snapshot data import requirements, if the snapshot data comes from other data sources, use the `bulk_insert` mode to quickly
import the snapshot data into Hudi.
-
:::note
-`bulk_insert` eliminates the serialization and data merging. The data deduplication is skipped, so the user need to guarantee the uniqueness of the data.
+`bulk_insert` eliminates serialization and data merging. Data deduplication is skipped, so the user needs to guarantee data uniqueness.
:::

:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the `batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write performance degradation caused by
-frequent `file handle` switching.
+`bulk_insert` is more efficient in `batch execution mode`. By default, `batch execution mode` sorts the input records
+by partition path and writes these records to Hudi, which can avoid write-performance degradation caused by
+frequent file-handle switching.
:::

-:::note
-The parallelism of `bulk_insert` is specified by `write.tasks`. The parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >= [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`. The parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of buckets. (In particular, when each bucket writes to the maximum file size, it
+rolls over to a new file handle.) Finally, the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
:::

-#### Options
+### Options

-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open this function |
-| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the number of files` >= [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle data according to the input field before writing. Enabling this option will reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false` | `true` | Whether to sort data according to the input field before writing. Enabling this option will reduce the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort operator. default `128` MB |
+| Option Name                       | Required | Default  | Remarks |
+|-----------------------------------|----------|----------|---------|
+| `write.operation`                 | `true`   | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks`                     | `false`  | `4`      | The parallelism of `bulk_insert`; the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false`  | `true`   | Whether to shuffle data by the input field before writing. Enabling this option reduces the number of small files but may introduce data-skew risk |
+| `write.bulk_insert.sort_input`    | `false`  | `true`   | Whether to sort data by the input field before writing. Enabling this option reduces the number of small files when a write task writes to multiple partitions |
+| `write.sort.memory`               | `false`  | `128`    | Available managed memory for the sort operator; default is 128 MB |

-### Index Bootstrap
+## Index Bootstrap

-For the demand of `snapshot data` + `incremental data` import. If the `snapshot data` already insert into Hudi by [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not repeated by using the index bootstrap function.
+For importing both snapshot data and incremental data: if the snapshot data has already been inserted into Hudi via [bulk insert](#bulk-insert),
+users can insert incremental data in real time and ensure the data is not duplicated by using the index bootstrap function.

:::note
-If you think this process is very time-consuming, you can add resources to write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate limit function).
+If you find this process very time-consuming, you can add resources to write in streaming mode while writing snapshot data,
+then reduce the resources when writing incremental data (or enable the rate-limit function).
:::

-#### Options
+### Options

-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is enabled, the remain records in Hudi table will be loaded into the Flink state at one time |
-| `index.partition.regex` | `false` | `*` | Optimize option. Setting regular expressions to filter partitions. By default, all partitions are loaded into flink state |
+| Option Name               | Required | Default | Remarks |
+|---------------------------|----------|---------|---------|
+| `index.bootstrap.enabled` | `true`   | `false` | When index bootstrap is enabled, the remaining records in the Hudi table are loaded into the Flink state at once |
+| `index.partition.regex`   | `false`  | `*`     | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into Flink state |

-#### How To Use
+### How to use

-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` : `execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint (or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
+1. Use `CREATE TABLE` to create a statement corresponding to the Hudi table. Note that `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: `execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index bootstrap has completed.
+5. After the index bootstrap completes, users can exit and save the savepoint (or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.

:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish loading the index under partition` and `Load record form file` to observe the progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap completed. There is no need to load the index again when recovering from the checkpoint.
+
+1. Index bootstrap is blocking, so checkpoints cannot complete during index bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search logs for `finish loading the index under partition` and `Load record from file` to observe the index-bootstrap progress.
+4. The first successful checkpoint indicates that the index bootstrap has completed. There is no need to load the index again when recovering from the checkpoint.
+
:::

-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores messages in the forms of rows, which supports the retention of all change logs (Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
+## Changelog Mode

-#### Options
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then consume them through stateful computing in Flink to build a near-real-time
+data-warehouse ETL pipeline (incremental computing). Hudi MOR tables store messages as rows, which supports the retention of all change logs (integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
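The changelog mode described in the new text above can be made concrete with a minimal Flink SQL sketch; the table name, path, and schema here are hypothetical illustrations, and only the `WITH` options come from the documented configuration:

```sql
-- Hypothetical MOR table that retains intermediate changes (-U / +U)
-- so a downstream Flink streaming reader can consume every change log.
CREATE TABLE hudi_changelog_demo (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_changelog_demo',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',        -- retain all intermediate changes
  'read.streaming.enabled' = 'true',   -- consume them with the streaming reader
  'compaction.delta_commits' = '5'     -- buffer time before compaction merges the changelog
);
```

Raising `compaction.delta_commits` (or `compaction.delta_seconds`) widens the window in which a slow reader can still observe the unmerged change records.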
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to have the `upsert` semantics, only the merged messages are ensured to be kept, intermediate changes may be merged. Setting to true to support consumption of all changes |
+### Options
+
+| Option Name         | Required | Default | Remarks |
+|---------------------|----------|---------|---------|
+| `changelog.enabled` | `false`  | `false` | Turned off by default to keep `upsert` semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged away. Set to `true` to support consumption of all changes |

:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of whether the format has stored the intermediate changelog messages.
+Batch (snapshot) reads still merge all the intermediate changes, regardless of whether the format has stored the intermediate changelog messages.
:::

:::note
-After setting `changelog.enable` as `true`, the retention of changelog records are only best effort: the asynchronous compaction task will merge the changelog records into one record, so if the
-stream source does not consume timely, only the merged record for each key can be read after compaction. The solution is to reserve some buffer time for the reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and [`compaction.delta_seconds`](#compaction).
+After setting `changelog.enabled` to `true`, the retention of changelog records is best-effort only: the asynchronous compaction task will merge the changelog records into one record, so if the
+stream source does not consume in a timely manner, only the merged record for each key can be read after compaction. The solution is to reserve buffer time for the reader by adjusting the compaction strategy, such as
+the compaction options `compaction.delta_commits` and `compaction.delta_seconds`.
:::

+## Append Mode
+
+For `INSERT` mode write operations, the current workflow is:
+
+- For Merge-on-Read tables, auto-file sizing is enabled by default
+- For Copy-on-Write tables, new parquet files are written directly; auto-file sizing is not applied

Review Comment:
   The MOR behavior is now changed to be the same as COW: new parquet files are written directly; auto-file sizing is not applied

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
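For reference, the two-phase index bootstrap workflow quoted in the diff above amounts to toggling a single option across two job runs. A hypothetical Flink SQL sketch (path and schema are illustrative, not from the PR):

```sql
-- Phase 1: first streaming run against a table whose snapshot data was
-- already written with bulk_insert; load the existing index into Flink state.
CREATE TABLE hudi_demo (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_demo',
  'table.type' = 'MERGE_ON_READ',
  'index.bootstrap.enabled' = 'true'   -- phase 1 only: bootstrap the index
);

-- Phase 2: after the first successful checkpoint, restart from the
-- savepoint (or externalized checkpoint) with the bootstrap switched off:
--   'index.bootstrap.enabled' = 'false'
```

The first successful checkpoint marks the end of phase 1; recovering from it does not reload the index.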
