danny0405 commented on code in PR #14320:
URL: https://github.com/apache/hudi/pull/14320#discussion_r2553805478


##########
website/docs/ingestion_flink.md:
##########
@@ -1,179 +1,361 @@
 ---
 title: Using Flink
 keywords: [hudi, flink, streamer, ingestion]
+last_modified_at: 2025-11-22T12:53:57+08:00
 ---
 
-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source 
system so a downstream process or system can action that change.
+## CDC Ingestion
+
+CDC (change data capture) keeps track of data changes evolving in a source 
system so a downstream process or system can act on those changes.
 We recommend two ways for syncing CDC data into Hudi:
 
 ![slide1 title](/assets/images/cdc-2-hudi.png)
 
-1. Using the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) 
directly connect to DB Server to sync the binlog data into Hudi.
-   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc 
format, the advantage is that it is highly scalable,
+1. Use the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) to 
directly connect to the database server and sync binlog data into Hudi.
+   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the database server.
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format. 
The advantage is that it is highly scalable,
    but the disadvantage is that it relies on message queues.
 
 :::note
-- If the upstream data cannot guarantee the order, you need to specify option 
`write.precombine.field` explicitly;
+If the upstream data cannot guarantee ordering, you need to explicitly specify 
the `write.precombine.field` option.
 :::
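+The first approach can be sketched in Flink SQL as follows. This is illustrative only: the `mysql-cdc` connector comes from flink-cdc-connectors, and all schemas, hostnames, credentials, and paths are placeholders.
+
+```sql
+-- Illustrative sketch: MySQL binlog -> Hudi via flink-cdc-connectors.
+-- All connection values and the table schema below are placeholders.
+CREATE TABLE mysql_source (
+  id BIGINT,
+  name STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'mysql-cdc',
+  'hostname' = 'localhost',
+  'port' = '3306',
+  'username' = 'flink',
+  'password' = 'flink',
+  'database-name' = 'db',
+  'table-name' = 'orders'
+);
+
+CREATE TABLE hudi_sink (
+  id BIGINT,
+  name STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs://path/to/hudi_table',
+  'table.type' = 'MERGE_ON_READ',
+  -- specify explicitly when upstream ordering is not guaranteed
+  'write.precombine.field' = 'ts'
+);
+
+INSERT INTO hudi_sink SELECT * FROM mysql_source;
+```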
 
-### Bulk Insert
+## Bulk Insert
 
-For the demand of snapshot data import. If the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
+For importing snapshot data, if it comes from another data source, use the `bulk_insert` mode to quickly
 import the snapshot data into Hudi.
 
-
 :::note
-`bulk_insert` eliminates the serialization and data merging. The data 
deduplication is skipped, so the user need to guarantee the uniqueness of the 
data.
+`bulk_insert` eliminates serialization and data merging. Data deduplication is 
skipped, so the user needs to guarantee data uniqueness.
 :::
 
 :::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by
-frequent `file handle` switching.  
+`bulk_insert` is more efficient in `batch execution mode`. By default, `batch execution mode` sorts the input records
+by partition path and writes these records to Hudi, which can avoid write performance degradation caused by
+frequent file handle switching.
 :::
 
-:::note  
-The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In 
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >= 
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of buckets. (In particular, when a bucket's file reaches the maximum file size, it
+rolls over to a new file handle.) Finally, the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
 :::
 
-#### Options
+### Options
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open 
this function  |
-| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= 
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle 
data according to the input field before writing. Enabling this option will 
reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false`  | `true` | Whether to sort data 
according to the input field before writing. Enabling this option will reduce 
the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort 
operator. default  `128` MB |
+| Option Name | Required | Default | Remarks |
+|-------------|----------|---------|---------|
+| `write.operation` | `true` | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`; the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle data by the input field before writing. Enabling this option reduces the number of small files but may introduce data skew risk |
+| `write.bulk_insert.sort_input` | `false` | `true` | Whether to sort data by the input field before writing. Enabling this option reduces the number of small files when a write task writes to multiple partitions |
+| `write.sort.memory` | `false` | `128` | Available managed memory for the sort operator; default is 128 MB |
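+A minimal Flink SQL sketch of enabling `bulk_insert` (the schema and path are illustrative, and `source_table` stands for any existing snapshot source):
+
+```sql
+-- Run in batch execution mode for the most efficient bulk_insert
+SET 'execution.runtime-mode' = 'batch';
+
+CREATE TABLE hudi_table (
+  id BIGINT,
+  name STRING,
+  `partition` STRING
+) PARTITIONED BY (`partition`) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs://path/to/hudi_table',
+  'write.operation' = 'bulk_insert',  -- switch from the default upsert
+  'write.tasks' = '4'
+);
+
+INSERT INTO hudi_table SELECT * FROM source_table;
+```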
 
-### Index Bootstrap
+## Index Bootstrap
 
-For the demand of `snapshot data` + `incremental data` import. If the 
`snapshot data` already insert into Hudi by  [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not 
repeated by using the index bootstrap function.
+For importing both snapshot data and incremental data: if the snapshot data 
has already been inserted into Hudi via [bulk insert](#bulk-insert),
+users can insert incremental data in real time and ensure the data is not 
duplicated by using the index bootstrap function.
 
 :::note
-If you think this process is very time-consuming, you can add resources to 
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate 
limit function).
+If you find this process very time-consuming, you can add resources to write in streaming mode while writing snapshot data,
+then reduce the resources when writing incremental data (or enable the rate limit function).
 :::
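+For rate limiting, the Hudi Flink writer exposes the `write.rate.limit` option (records per second; `0` means no limit). An illustrative fragment, with a placeholder schema and path:
+
+```sql
+CREATE TABLE hudi_table (
+  id BIGINT,
+  name STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs://path/to/hudi_table',
+  -- cap the write rate while ingesting the large snapshot
+  'write.rate.limit' = '10000'
+);
+```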
 
-#### Options
+### Options
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is 
enabled, the remain records in Hudi table will be loaded into the Flink state 
at one time |
-| `index.partition.regex`  |  `false`  | `*` | Optimize option. Setting 
regular expressions to filter partitions. By default, all partitions are loaded 
into flink state |
+| Option Name | Required | Default | Remarks |
+|-------------|----------|---------|---------|
+| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is enabled, the remaining records in the Hudi table are loaded into the Flink state at once |
+| `index.partition.regex` | `false` | `*` | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into Flink state |
 
-#### How To Use
+### How to use
 
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note 
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` : 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index 
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint 
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
+1. Use a `CREATE TABLE` statement to create the Hudi table. Note that `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index 
bootstrap has completed.
+5. After the index bootstrap completes, users can exit and save the savepoint 
(or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
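+Steps 1, 2, and 6 can be sketched as follows (the schema and path are illustrative):
+
+```sql
+-- First run: load the existing index into Flink state
+CREATE TABLE hudi_table (
+  id BIGINT,
+  name STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs://path/to/hudi_table',
+  'table.type' = 'MERGE_ON_READ',      -- must match the existing table
+  'index.bootstrap.enabled' = 'true'   -- step 2
+);
+
+-- Step 6: after the first successful checkpoint, restart the job
+-- from the savepoint/checkpoint with 'index.bootstrap.enabled' = 'false'
+```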
 
 :::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index 
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there 
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish 
loading the index under partition` and `Load record form file` to observe the 
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap 
completed. There is no need to load the index again when recovering from the 
checkpoint.
+
+1. Index bootstrap is blocking, so checkpoints cannot complete during index 
bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that 
there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search logs for `finish loading the index under partition` and `Load record from file` to observe the index bootstrap progress.
+4. The first successful checkpoint indicates that the index bootstrap has 
completed. There is no need to load the index again when recovering from the 
checkpoint.
+
 :::
 
-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then 
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores 
messages in the forms of rows, which supports the retention of all change logs 
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
+## Changelog Mode
 
-#### Options
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then consume them through stateful computing in Flink to build a near-real-time
+data warehouse ETL pipeline (incremental computing). Hudi MOR tables store messages as rows, which supports the retention of all change logs (integration at the format level).
+All changelog records can be consumed with the Flink streaming reader.
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to 
have the `upsert` semantics, only the merged messages are ensured to be kept, 
intermediate changes may be merged. Setting to true to support consumption of 
all changes |
+### Options
+
+| Option Name | Required | Default | Remarks |
+|-------------|----------|---------|---------|
+| `changelog.enabled` | `false` | `false` | Turned off by default to provide `upsert` semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged away. Set to `true` to support consumption of all changes |
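+A sketch of a changelog-enabled table consumed with the streaming reader (the schema and path are illustrative):
+
+```sql
+CREATE TABLE hudi_table (
+  id BIGINT,
+  name STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs://path/to/hudi_table',
+  'table.type' = 'MERGE_ON_READ',
+  'changelog.enabled' = 'true',        -- keep intermediate changes (I / -U / U / D)
+  'read.streaming.enabled' = 'true'    -- consume all changelog records downstream
+);
+```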
 
 :::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
+Batch (snapshot) reads still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
 :::
 
 :::note
-After setting `changelog.enable` as `true`, the retention of changelog records 
are only best effort: the asynchronous compaction task will merge the changelog 
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can 
be read after compaction. The solution is to reserve some buffer time for the 
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and 
[`compaction.delta_seconds`](#compaction).
+After setting `changelog.enabled` to `true`, the retention of changelog records is best effort only: the asynchronous compaction task will merge the changelog records into one record, so if the
+stream source does not consume in a timely manner, only the merged record for each key can be read after compaction. The solution is to reserve buffer time for the reader by adjusting the compaction strategy, such as
+the compaction options `compaction.delta_commits` and `compaction.delta_seconds`.
 :::
 
+## Append Mode
+
+For `INSERT` mode write operations, the current workflow is:
+
+- For Merge-on-Read tables, auto file sizing is enabled by default
+- For Copy-on-Write tables, new parquet files are written directly; auto file sizing is not applied

Review Comment:
   The MOR behavior is now changed to be the same with COW: new parquet files 
are written directly; auto-file sizing is not applied



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
