This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 8bcb4a3fc624 docs: update flink related docs as of 1.1 (#14320)
8bcb4a3fc624 is described below

commit 8bcb4a3fc6246b8b3e746928dcce7c99ed640ca1
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Nov 23 00:49:18 2025 -0600

    docs: update flink related docs as of 1.1 (#14320)
    
    Update these pages:
    - flink quick start
    - ingestion flink
    - indexes (content related to flink)
    
    Mainly updated append mode, bucket index, and rate limiting section in 
`ingestion_flink.md`. Other changes are language editing and markdown 
formatting.
---
 website/docs/flink-quick-start-guide.md | 119 +++++-----
 website/docs/indexes.md                 |  23 +-
 website/docs/ingestion_flink.md         | 393 +++++++++++++++++++++++---------
 website/docs/sql_dml.md                 |   2 +-
 4 files changed, 357 insertions(+), 180 deletions(-)

diff --git a/website/docs/flink-quick-start-guide.md 
b/website/docs/flink-quick-start-guide.md
index 6c592fa537b2..f29f2ba1f7b3 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -1,36 +1,32 @@
 ---
 title: "Flink Quick Start"
 toc: true
-last_modified_at: 2023-08-16T12:53:57+08:00
+last_modified_at: 2025-11-22T14:30:00+08:00
 ---
 import Tabs from '@theme/Tabs';
 import TabItem from '@theme/TabItem';
 
-This page introduces Flink-Hudi integration. We can feel the unique charm of 
how Flink brings in the power of streaming into Hudi.
+This page introduces Flink–Hudi integration and demonstrates how Flink brings 
the power of streaming to Hudi.
 
 ## Setup
 
-
 ### Flink Support Matrix
 
-
 | Hudi   | Supported Flink version                                                  |
-|:-------|:-----------------------------------------------------------------------|
+| :----- | :--------------------------------------------------------------------- |
+| 1.1.x  | 1.17.x, 1.18.x, 1.19.x, 1.20.x (default build), 2.0.x                   |
 | 1.0.x  | 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x, 1.20.x (default build)  |
 | 0.15.x | 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x                                   |
 | 0.14.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x                                   |
-| 0.13.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x                                           |
-| 0.12.x | 1.13.x, 1.14.x, 1.15.x                                                   |
-| 0.11.x | 1.13.x, 1.14.x                                                           |
-
 
 ### Download Flink and Start Flink cluster
 
-Hudi works with Flink 1.13 (up to Hudi 0.14.x release), Flink 1.14, Flink 
1.15, Flink 1.16, Flink 1.17, Flink 1.18, Flink 1.19 and Flink 1.20.
-You can follow the instructions [here](https://flink.apache.org/downloads) for 
setting up Flink. Then, start a standalone Flink cluster 
-within hadoop environment. In case we are trying on local setup, then we could 
download hadoop binaries and set HADOOP_HOME.
+- You can follow the [instructions here](https://flink.apache.org/downloads) 
for setting up Flink
+- Then start a standalone Flink cluster within a Hadoop environment
 
-```bash
+For local setup, you can download Hadoop binaries and set `HADOOP_HOME` as 
follows:
+
+```shell
 # HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
 export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 
@@ -40,7 +36,7 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
 <div className="notice--info">
   <h4>Please note the following: </h4>
 <ul>
-  <li>We suggest hadoop 2.9.x+ version because some of the object storage has 
filesystem implementation only after that</li>
+  <li>We recommend Hadoop 2.9.x+ because some object storage systems have 
filesystem implementations only from that version onward</li>
   <li>The flink-parquet and flink-avro formats are already packaged into the 
hudi-flink-bundle jar</li>
 </ul>
 </div>
@@ -54,21 +50,21 @@ values={[
 >
 <TabItem value="flinksql">
 
-We use the [Flink Sql 
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
 because it's a good
-quick start tool for SQL users.
+We use the [Flink SQL 
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
 because it's a good
+quick-start tool for SQL users.
 
-### Start Flink SQL client
+### Start Flink SQL Client
 
-Hudi supports packaged bundle jar for Flink, which should be loaded in the 
Flink SQL Client when it starts up.
+Hudi supports a packaged bundle jar for Flink, which should be loaded in the 
Flink SQL Client when it starts up.
 You can build the jar manually under the path `hudi-source-dir/packaging/hudi-flink-bundle` (see [Build Flink Bundle Jar](syncing_metastore#install)), or download it from the
 [Apache Official 
Repository](https://repo.maven.apache.org/maven2/org/apache/hudi/).
 
 Now start the SQL CLI:
 
 ```bash
-# For Flink versions: 1.13 - 1.18
-export FLINK_VERSION=1.17 
-export HUDI_VERSION=1.0.1
+# For Flink versions: 1.17-1.20, 2.0
+export FLINK_VERSION=1.20 
+export HUDI_VERSION=1.1.0
 wget 
https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_VERSION}-bundle/${HUDI_VERSION}/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar
 -P /tmp/
 ./bin/sql-client.sh embedded -j 
/tmp/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
 ```
@@ -79,14 +75,13 @@ The SQL CLI only executes the SQL line by line.
 
 <TabItem value="dataStream">
 
-Hudi works with Flink 1.13 (up to Hudi 0.14.x release), Flink 1.14, Flink 
1.15, Flink 1.16, Flink 1.17, Flink 1.18, Flink 1.19 and Flink 1.20.
 Please add the desired dependency to your project:
 ```xml
-<!-- For Flink versions 1.13 - 1.18-->
+<!-- For Flink versions 1.17-1.20, 2.0 -->
 <properties>
-    <flink.version>1.17.0</flink.version>
-    <flink.binary.version>1.17</flink.binary.version>
-    <hudi.version>1.0.1</hudi.version>
+    <flink.version>1.20.0</flink.version>
+    <flink.binary.version>1.20</flink.binary.version>
+    <hudi.version>1.1.0</hudi.version>
 </properties>
 <dependency>
     <groupId>org.apache.hudi</groupId>
@@ -114,7 +109,7 @@ values={[
 
 <TabItem value="flinksql">
 
-Here is an example of creating a flink Hudi table.
+Here is an example of creating a Flink Hudi table.
 
 ```sql
 -- sets up the result mode to tableau to show the results directly in the CLI
@@ -142,7 +137,7 @@ WITH (
 
 ```java
 // Java
-// First commit will auto-initialize the table, if it did not exist in the 
specified base path. 
+// The first commit will auto-initialize the table if it does not exist in the 
specified base path.
 ```
 
 </TabItem>
@@ -298,7 +293,7 @@ values={[
 
 <TabItem value="flinksql">
 
-Hudi tables can be updated by either inserting reocrds with same primary key 
or using a standard UPDATE statement shown as below.
+Hudi tables can be updated by either inserting records with the same record 
key or using a standard UPDATE statement as shown below.
 
 ```sql
 -- Update Queries only works with batch execution mode
@@ -308,15 +303,15 @@ UPDATE hudi_table SET fare = 25.0 WHERE uuid = 
'334e26e9-8355-45cc-97c6-c31daf0d
 
 :::note
 The `UPDATE` statement has been supported since Flink 1.17, so only Hudi Flink bundles compiled with Flink 1.17+ supply this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
+Only **batch** queries on Hudi tables with a record key work correctly.
 :::
 </TabItem>
 
 <TabItem value="dataStream">
 
-Add some streaming source to flink and load the data in hudi table using 
DataStream API as [above](#insert-data).
-When new rows with the same primary key arrive in stream, then it will be be 
updated.
-In the insert example incoming row with same record id will be updated.
+Add a streaming source to Flink and load the data into the Hudi table using 
the DataStream API as [above](#insert-data).
+When new rows with the same record key arrive in the stream, they will be 
updated.
+In the insert example, incoming rows with the same record key will be updated.
 
 Refer to the update example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
 
@@ -325,7 +320,7 @@ Refer Update Example 
[here](https://github.com/ad1happy2go/hudi-examples/blob/ma
 </Tabs
 >
 
-[Querying](#query-data) the data again will now show updated records. Each 
write operation generates a new [commit](concepts) 
+[Querying](#query-data) the data again will now show updated records. Each 
write operation generates a new [commit](concepts)
 denoted by the timestamp.
 
 
@@ -340,10 +335,10 @@ values={[
 
 <TabItem value="flinksql">
 
-### Row-level Delete
+### Row-Level Delete
 
-When consuming data in streaming query, Hudi Flink source can also accept the 
change logs from the upstream data source if the `RowKind` is set up per-row,
-it can then apply the UPDATE and DELETE in row level. You can then sync a 
NEAR-REAL-TIME snapshot on Hudi for all kinds
+When consuming data in a streaming query, the Hudi Flink source can also 
accept change logs from the upstream data source if the `RowKind` is set up per 
row;
+it can then apply UPDATE and DELETE at the row level. You can then sync a near-real-time snapshot on Hudi for all kinds of RDBMS.
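
As a sketch of the behavior described above, a table can be declared with Hudi's changelog mode so that per-row change messages are retained and applied (the table and column names here are illustrative; see [Changelog Mode](ingestion_flink#changelog-mode)):

```sql
CREATE TABLE hudi_changelog (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_changelog',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true'  -- keep all change messages (+I/-U/+U/-D) so they can be applied per row
);
```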
 
 ### Batch Delete
@@ -357,15 +352,15 @@ DELETE FROM t1 WHERE age > 23;
 
 :::note
 The `DELETE` statement has been supported since Flink 1.17, so only Hudi Flink bundles compiled with Flink 1.17+ supply this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
+Only **batch** queries on Hudi tables with a record key work correctly.
 :::
 
 </TabItem>
 
 <TabItem value="dataStream">
 
-Creates a Flink Hudi table first and insert data into the Hudi table using 
DataStream API as below.
-When new rows with the same primary key and Row Kind as Delete arrive in 
stream, then it will be be deleted.
+First, create a Flink Hudi table and insert data into the Hudi table using the 
DataStream API as below.
+When new rows with the same record key and Row Kind as DELETE arrive in the 
stream, they will be deleted.
 
 Refer to the delete example [here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
 
@@ -377,7 +372,7 @@ Refer Delete Example 
[here](https://github.com/ad1happy2go/hudi-examples/blob/ma
 
 ## Streaming Query
 
-Hudi Flink also provides capability to obtain a stream of records that changed 
since given commit timestamp. 
+Hudi Flink also provides the capability to obtain a stream of records that 
changed since a given commit timestamp.
 This can be achieved using Hudi's streaming queries and providing a start time from which changes need to be streamed.
 We do not need to specify an end time if we want all changes after the given commit (the common case).
 
@@ -396,7 +391,7 @@ WITH (
   'table.type' = 'MERGE_ON_READ',
   'read.streaming.enabled' = 'true',  -- this option enables streaming read
   'read.start-commit' = '20210316134557', -- specifies the start commit 
instant time
-  'read.streaming.check-interval' = '4' -- specifies the check interval for 
finding new source commits, default 60s.
+  'read.streaming.check-interval' = '4' -- specifies the check interval for 
finding new source commits; default is 60s.
 );
 
 -- Then query the table in stream mode
@@ -405,8 +400,8 @@ select * from t1;
 
 ## Change Data Capture Query
 
-Hudi Flink also provides capability to obtain a stream of records with Change 
Data Capture.
-CDC queries are useful for applications that need to obtain all the changes, 
along with before/after images of records.
+Hudi Flink also provides the capability to obtain a stream of records with 
Change Data Capture.
+CDC queries are useful for applications that need to obtain all changes, along 
with before/after images of records.
 
 ```sql
 set sql-client.execution.result-mode = tableau;
@@ -424,7 +419,7 @@ WITH (
   'connector' = 'hudi',
   'path' = 'file:///tmp/hudi_table',
   'table.type' = 'COPY_ON_WRITE',
-  'cdc.enabled' = 'true' -- this option enable the cdc log enabled
+  'cdc.enabled' = 'true' -- this option enables CDC logging
 );
 -- insert data using values
 INSERT INTO hudi_table
@@ -445,30 +440,32 @@ select * from hudi_table/*+ 
OPTIONS('read.streaming.enabled'='true')*/;
 ``` 
 
 This will give all changes that happened after the `read.start-commit` commit. 
The unique thing about this
-feature is that it now lets you author streaming pipelines on streaming or 
batch data source.
+feature is that it lets you author streaming pipelines on streaming or batch 
data sources.
 
 ## Where To Go From Here?
-- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly 
Flink sql client to write to(read from) Hudi.
-- **Configuration** : For [Global 
Configuration](flink_tuning#global-configurations), sets up through 
`$FLINK_HOME/conf/flink-conf.yaml`. For per job configuration, sets up through 
[Table Option](flink_tuning#table-options).
+
+- **Quick Start**: Read the Quick Start section above to get started quickly 
with the Flink SQL client to write to (and read from) Hudi.
+- **Configuration**: For [Global 
Configuration](flink_tuning#global-configurations), set up through 
`$FLINK_HOME/conf/flink-conf.yaml`. For per-job configuration, set up through 
[Table Option](flink_tuning#table-options).
 - **Writing Data**: Flink supports different modes for writing, such as [CDC
Ingestion](ingestion_flink#cdc-ingestion), [Bulk 
Insert](ingestion_flink#bulk-insert), [Index 
Bootstrap](ingestion_flink#index-bootstrap), [Changelog 
Mode](ingestion_flink#changelog-mode) and [Append 
Mode](ingestion_flink#append-mode). Flink also supports multiple streaming 
writers with [non-blocking concurrency 
control](sql_dml#non-blocking-concurrency-control-experimental).
 - **Reading Data**: Flink supports different modes for reading, such as
[Streaming Query](sql_queries#streaming-query) and [Incremental 
Query](sql_queries#incremental-query).
-- **Tuning** : For write/read tasks, this guide gives some tuning suggestions, 
such as [Memory Optimization](flink_tuning#memory-optimization) and [Write Rate 
Limit](flink_tuning#write-rate-limit).
-- **Optimization**: Offline compaction is supported [Offline 
Compaction](compaction#flink-offline-compaction).
+- **Tuning**: For write/read tasks, this guide provides some tuning 
suggestions, such as [Memory Optimization](flink_tuning#memory-optimization) 
and [Write Rate Limit](flink_tuning#write-rate-limit).
+- **Optimization**: Offline compaction is supported: [Offline 
Compaction](compaction#flink-offline-compaction).
 - **Query Engines**: Besides Flink, many other engines are integrated: [Hive 
Query](syncing_metastore#flink-setup), [Presto Query](sql_queries#presto).
-- **Catalog**: A Hudi specific catalog is supported: [Hudi 
Catalog](sql_ddl/#create-catalog).
+- **Catalog**: A Hudi-specific catalog is supported: [Hudi
Catalog](sql_ddl/#create-catalog).
 
 If you are relatively new to Apache Hudi, it is important to be familiar with 
a few core concepts:
-  - [Hudi Timeline](timeline) – How Hudi manages transactions and other table 
services
-  - [Hudi Storage Layout](storage_layouts) - How the files are laid out on 
storage
-  - [Hudi Table Types](table_types) – `COPY_ON_WRITE` and `MERGE_ON_READ`
-  - [Hudi Query Types](table_types#query-types) – Snapshot Queries, 
Incremental Queries, Read-Optimized Queries
 
-See more in the "Concepts" section of the docs.
+- [Hudi Timeline](timeline) – How Hudi manages transactions and other table 
services
+- [Hudi Storage Layout](storage_layouts) – How files are laid out on storage
+- [Hudi Table Types](table_types) – `COPY_ON_WRITE` and `MERGE_ON_READ`
+- [Hudi Query Types](table_types#query-types) – Snapshot Queries, Incremental Queries, Read-Optimized Queries
+
+See more in the [concepts](concepts) docs page.
 
 Take a look at recent [blog posts](/blog) that go in depth on certain topics 
or use cases.
 
-Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto 
and much more. We have put together a 
-[demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that show cases all 
of this on a docker based setup with all 
-dependent systems running locally. We recommend you replicate the same setup 
and run the demo yourself, by following 
-steps [here](docker_demo) to get a taste for it. Also, if you are looking for 
ways to migrate your existing data 
-to Hudi, refer to [migration guide](migration_guide). 
+Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto, 
and much more. We have put together a
+[demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that showcases all 
of this on a Docker-based setup with all
+dependent systems running locally. We recommend you replicate the same setup 
and run the demo yourself by following
+the steps in the [docker demo](docker_demo) to get a taste for it. Also, if 
you are looking for ways to migrate your existing data
+to Hudi, refer to the [migration guide](migration_guide).
diff --git a/website/docs/indexes.md b/website/docs/indexes.md
index 43a6bab9c631..fc17a7b26c92 100644
--- a/website/docs/indexes.md
+++ b/website/docs/indexes.md
@@ -124,7 +124,7 @@ files themselves (e.g. bloom filters stored in parquet file 
footers) or intellig
 Currently, Hudi supports the following index types. Default is SIMPLE on Spark 
engine, and INMEMORY on Flink and Java
 engines. Writers can pick one of these options using `hoodie.index.type` 
config option.
 
-- **SIMPLE (default for Spark engines)**: This is the standard index type for 
the Spark engine. It executes an efficient join of incoming records with keys 
retrieved from the table stored on disk. It requires keys to be partition-level 
unique so it can function correctly.
+- **SIMPLE (default for Spark & Java engines)**: This is the standard index type for the Spark and Java engines. It executes an efficient join of incoming records with keys retrieved from the table stored on disk. It requires keys to be partition-level unique so it can function correctly.
 
 - **RECORD_INDEX**: Use the record index from the section above as the writer-side index.
 
@@ -136,12 +136,17 @@ engines. Writers can pick one of these options using 
`hoodie.index.type` config
 
 - **HBASE**: Manages the index mapping through an external table in Apache HBase.
 
-- **INMEMORY (default for Flink and Java)**: Uses in-memory hashmap in Spark 
and Java engine and Flink in-memory state in Flink for indexing.
+- **INMEMORY**: Uses an in-memory hashmap in the Spark and Java engines, and Flink in-memory state in Flink, for indexing. Note that this is an alias for `FLINK_STATE` when used for Flink writers.
 
-- **BUCKET**: Utilizes bucket hashing to identify the file group that houses 
the records, which proves to be particularly advantageous on a large scale. To 
select the type of bucket engine—that is, the method by which buckets are 
created—use the `hoodie.index.bucket.engine` configuration option.
-  - **SIMPLE(default)**: This index employs a fixed number of buckets for file 
groups within each partition, which do not have the capacity to decrease or 
increase in size. It is applicable to both COW and MOR tables. Due to the 
unchangeable number of buckets and the design principle of mapping each bucket 
to a single file group, this indexing method may not be ideal for partitions 
with significant data skew.
+- **FLINK_STATE (default for Flink)**: Uses the Flink state backend to store 
the index data: mappings of record keys to their residing file group's file IDs.
 
-  - **CONSISTENT_HASHING**: This index accommodates a dynamic number of 
buckets, with the capability for bucket resizing to ensure each bucket is sized 
appropriately. This addresses the issue of data skew in partitions with a high 
volume of data by allowing these partitions to be dynamically resized. As a 
result, partitions can have multiple reasonably sized buckets, unlike the fixed 
bucket count per partition seen in the SIMPLE bucket engine type. This feature 
is exclusively compatible  [...]
+- **BUCKET**: Utilizes bucket hashing to identify the file group that houses 
the records, which proves to be particularly advantageous on a large scale. The 
bucket index has three variants based on how buckets are configured and managed:
+
+  - **Simple Bucket Index (default)**: Employs a fixed number of buckets 
across all partitions. The bucket count is immutable once set and cannot 
increase or decrease. Applicable to both COW and MOR tables. Set via 
`hoodie.index.bucket.engine=SIMPLE` and `hoodie.bucket.index.num.buckets`. Due 
to the uniform bucket count across all partitions, this may not be ideal for 
tables with varying partition sizes or data skew.
+
+  - **Partition-Level Bucket Index**: Allows different fixed bucket counts for 
different partitions based on regex pattern matching. Existing simple bucket 
index tables can be upgraded to partition-level using the Spark 
`partition_bucket_index_manager` procedure, which rescales affected partitions. 
After upgrade, writers (Flink/Spark) automatically load partition-specific 
bucket configurations from table metadata. This addresses the limitation of 
uniform bucket counts while maintaining i [...]
+
+  - **Consistent Hashing Bucket Index**: Accommodates a dynamic number of 
buckets with automatic resizing capability via clustering. Starts with an 
initial bucket count and can grow or shrink within configured min/max bounds 
based on file sizes. This addresses data skew in high-volume partitions by 
allowing dynamic resizing. Flink can schedule clustering plans, but execution 
currently requires Spark. Exclusively compatible with MOR tables. Configure via 
`hoodie.index.bucket.engine=CONSIS [...]
 
 - **Bring your own implementation:** You can extend this [public 
API](https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java)
   and supply a subclass of `SparkHoodieIndex` (for Apache Spark writers) using 
`hoodie.index.class` to implement custom indexing.
@@ -183,18 +188,14 @@ for more details. All these, support the index types 
mentioned [above](#index-ty
 
 #### Flink based configs
 
-For Flink DataStream and Flink SQL only support Bucket Index and internal 
Flink state store backed in memory index.
-Following are the basic configs that control the indexing behavior. Please 
refer 
[here](https://hudi.apache.org/docs/next/configurations#Flink-Options-advanced-configs)
-for advanced configs.
+For Flink DataStream and Flink SQL, the bucket index and the Flink state index are supported.
+The following are the basic configs that control the indexing behavior. Please refer to [the configurations here](configurations#Flink-Options-advanced-configs) for advanced configs.
 
 | Config Name                | Default                | Description |
 |----------------------------|------------------------|-------------|
 | index.type                 | FLINK_STATE (Optional) | Index type of Flink write job; the default is the state-backed index. Possible values:<br /><ul><li>FLINK_STATE</li><li>BUCKET</li></ul><br />`Config Param: INDEX_TYPE` |
 | hoodie.index.bucket.engine | SIMPLE (Optional)      | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when `hoodie.index.type` is set to `BUCKET`. Possible values: <br /><ul><li>SIMPLE</li><li>CONSISTENT_HASHING</li></ul> |
 
-
-
-
 ### Picking Indexing Strategies
 
 Since data comes in at different volumes and velocities and has different access patterns, different indexes can be used for different workload types.
diff --git a/website/docs/ingestion_flink.md b/website/docs/ingestion_flink.md
index 8176ff1e5ac6..3d62e07f239c 100644
--- a/website/docs/ingestion_flink.md
+++ b/website/docs/ingestion_flink.md
@@ -1,179 +1,358 @@
 ---
 title: Using Flink
 keywords: [hudi, flink, streamer, ingestion]
+last_modified_at: 2025-11-22T12:53:57+08:00
 ---
 
-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source 
system so a downstream process or system can action that change.
+## CDC Ingestion
+
+CDC (change data capture) keeps track of data changes evolving in a source 
system so a downstream process or system can act on those changes.
 We recommend two ways for syncing CDC data into Hudi:
 
 ![slide1 title](/assets/images/cdc-2-hudi.png)
 
-1. Using the Ververica 
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors) 
directly connect to DB Server to sync the binlog data into Hudi.
-   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc 
format, the advantage is that it is highly scalable,
+1. Use [Apache Flink-CDC](https://github.com/apache/flink-cdc) to directly 
connect to the database server and sync binlog data into Hudi.
+   The advantage is that it does not rely on message queues, but the 
disadvantage is that it puts pressure on the database server.
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format. 
The advantage is that it is highly scalable,
    but the disadvantage is that it relies on message queues.
 
 :::note
-- If the upstream data cannot guarantee the order, you need to specify option 
`write.precombine.field` explicitly;
+If the upstream data cannot guarantee ordering, you need to explicitly specify 
the `ordering.fields` option.
 :::
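
For approach 2, a minimal sketch looks like the following (the topic name, bootstrap servers, and schema are placeholders); it reads Debezium-formatted change logs from Kafka and upserts them into a Hudi table:

```sql
-- Kafka source carrying Debezium CDC records
CREATE TABLE users_cdc (
  id   BIGINT,
  name STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',                                  -- placeholder topic
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder brokers
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'                          -- interprets INSERT/UPDATE/DELETE change events
);

-- Hudi sink; upserts apply the change events by record key
CREATE TABLE users_hudi (
  id   BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/users_hudi',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO users_hudi SELECT * FROM users_cdc;
```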
 
-### Bulk Insert
+## Bulk Insert
 
-For the demand of snapshot data import. If the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
+For snapshot data import requirements, if the snapshot data comes from other 
data sources, use the `bulk_insert` mode to quickly
 import the snapshot data into Hudi.
 
-
 :::note
-`bulk_insert` eliminates the serialization and data merging. The data 
deduplication is skipped, so the user need to guarantee the uniqueness of the 
data.
+`bulk_insert` eliminates serialization and data merging. Data deduplication is 
skipped, so the user needs to guarantee data uniqueness.
 :::
 
 :::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the 
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write 
performance degradation caused by
-frequent `file handle` switching.  
+`bulk_insert` is more efficient in `batch execution mode`. By default, `batch 
execution mode` sorts the input records
+by partition path and writes these records to Hudi, which can avoid write-performance degradation caused by frequent file-handle switching.
 :::
 
-:::note  
-The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In 
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >= 
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`. The 
parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of buckets. (In 
particular, when each bucket writes to the maximum file size, it
+rolls over to a new file handle.) The final number of files is greater than or 
equal to [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
 :::
 
-#### Options
+### Options
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open 
this function  |
-| `write.tasks`  |  `false`  | `4` | The parallelism of `bulk_insert`, `the 
number of files` >= 
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle 
data according to the input field before writing. Enabling this option will 
reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false`  | `true` | Whether to sort data 
according to the input field before writing. Enabling this option will reduce 
the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort 
operator. default  `128` MB |
+| Option Name                       | Required | Default  | Remarks |
+|-----------------------------------|----------|----------|---------|
+| `write.operation`                 | `true`   | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks`                     | `false`  | `4`      | The parallelism of `bulk_insert`; the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false`  | `true`   | Whether to shuffle data by the input field before writing. Enabling this option reduces the number of small files but may introduce data-skew risk |
+| `write.bulk_insert.sort_input`    | `false`  | `true`   | Whether to sort data by the input field before writing. Enabling this option reduces the number of small files when a write task writes to multiple partitions |
+| `write.sort.memory`               | `false`  | `128`    | Available managed memory for the sort operator; default is 128 MB |
 
-### Index Bootstrap
+## Index Bootstrap
 
-For the demand of `snapshot data` + `incremental data` import. If the 
`snapshot data` already insert into Hudi by  [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not 
repeated by using the index bootstrap function.
+For importing both snapshot data and incremental data: if the snapshot data 
has already been inserted into Hudi via [bulk insert](#bulk-insert),
+users can insert incremental data in real time and ensure the data is not 
duplicated by using the index bootstrap function.
 
 :::note
-If you think this process is very time-consuming, you can add resources to 
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate 
limit function).
+If you find this process very time‑consuming, you can add resources to write 
in streaming mode while writing snapshot data,
+then reduce the resources when writing incremental data (or enable the 
rate‑limit function).
 :::
 
-#### Options
+### Options
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is 
enabled, the remain records in Hudi table will be loaded into the Flink state 
at one time |
-| `index.partition.regex`  |  `false`  | `*` | Optimize option. Setting 
regular expressions to filter partitions. By default, all partitions are loaded 
into flink state |
+| Option Name               | Required | Default | Remarks |
+|---------------------------|----------|---------|---------|
+| `index.bootstrap.enabled` | `true`   | `false` | When index bootstrap is enabled, the remaining records in the Hudi table are loaded into the Flink state at once |
+| `index.partition.regex`   | `false`  | `*`     | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into Flink state |
 
-#### How To Use
+### How to use
 
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note 
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` : 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index 
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint 
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
+1. Write a `CREATE TABLE` statement corresponding to the Hudi table. Note that `table.type` must be correct.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap 
function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`: 
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink 
checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index 
bootstrap has completed.
+5. After the index bootstrap completes, users can exit and save the savepoint 
(or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
 
 :::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index 
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there 
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish 
loading the index under partition` and `Load record form file` to observe the 
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap 
completed. There is no need to load the index again when recovering from the 
checkpoint.
+
+1. Index bootstrap is blocking, so checkpoints cannot complete during index 
bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that 
there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search logs for `finish 
loading the index under partition` and `Load record from file` to observe the 
index‑bootstrap progress.
+4. The first successful checkpoint indicates that the index bootstrap has 
completed. There is no need to load the index again when recovering from the 
checkpoint.
+
 :::
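+As a minimal sketch (the table definition is hypothetical), the bootstrap run looks like this; after the first successful checkpoint, restart with `index.bootstrap.enabled` set back to `false`:
+
+```sql
+CREATE TABLE hudi_orders (
+  order_id BIGINT,
+  amount DOUBLE,
+  ts BIGINT,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/hudi_orders',
+  'table.type' = 'MERGE_ON_READ',     -- must match the existing table
+  'index.bootstrap.enabled' = 'true'  -- load the existing index into Flink state once
+);
+```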
 
-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then 
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores 
messages in the forms of rows, which supports the retention of all change logs 
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
+## Changelog Mode
+
+Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then 
consume them through stateful computing in Flink to build a near‑real‑time
+data‑warehouse ETL pipeline (incremental computing). Hudi MOR tables store 
messages as rows, which supports the retention of all change logs (integration 
at the format level).
+All changelog records can be consumed with the Flink streaming reader.
 
-#### Options
+### Options
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to 
have the `upsert` semantics, only the merged messages are ensured to be kept, 
intermediate changes may be merged. Setting to true to support consumption of 
all changes |
+| Option Name         | Required | Default | Remarks |
+|---------------------|----------|---------|---------|
+| `changelog.enabled` | `false`  | `false` | Disabled by default to keep the `upsert` semantics: only the merged messages are guaranteed to be kept, and intermediate changes may be merged. Set to `true` to support consuming all changes |
 
 :::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
+Batch (snapshot) reads still merge all the intermediate changes, regardless of 
whether the format has stored the intermediate changelog messages.
 :::
 
 :::note
-After setting `changelog.enable` as `true`, the retention of changelog records 
are only best effort: the asynchronous compaction task will merge the changelog 
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can 
be read after compaction. The solution is to reserve some buffer time for the 
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and 
[`compaction.delta_seconds`](#compaction).
+After setting `changelog.enabled` to `true`, the retention of changelog records
is best‑effort only: the asynchronous compaction task will merge the changelog 
records into one record, so if the
+stream source does not consume in a timely manner, only the merged record for 
each key can be read after compaction. The solution is to reserve buffer time 
for the reader by adjusting the compaction strategy, such as
+the compaction options `compaction.delta_commits` and 
`compaction.delta_seconds`.
 :::
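+For example (hypothetical table), changelog retention can be enabled on an MOR table and consumed with the streaming reader:
+
+```sql
+CREATE TABLE hudi_changelog (
+  id BIGINT,
+  cnt BIGINT,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/hudi_changelog',
+  'table.type' = 'MERGE_ON_READ',
+  'changelog.enabled' = 'true',       -- keep intermediate changes (I / -U / +U / D)
+  'read.streaming.enabled' = 'true'   -- consume the changelog with the streaming reader
+);
+```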
 
+## Append Mode
+
+For `INSERT` mode write operations, new Parquet files are written directly, 
and the [auto‑file sizing](file_sizing.md) is not enabled.
+
+### In-Memory Buffer Sort
 
-### Append Mode
+For append-only workloads, Hudi supports in-memory buffer sorting to improve 
Parquet compression ratio. When enabled, data is sorted within the write buffer 
before being flushed to disk. This improves columnar file compression 
efficiency by grouping similar values together.
 
-For `INSERT` mode write operation, the current work flow is:
+| Option Name                 | Required | Default | Remarks |
+|-----------------------------|----------|---------|---------|
+| `write.buffer.sort.enabled` | `false`  | `false` | Whether to enable buffer sort within the append write function. Improves Parquet compression ratio by sorting data before writing |
+| `write.buffer.sort.keys`    | `false`  | `N/A`   | Sort keys concatenated by comma (e.g., `col1,col2`). Required when `write.buffer.sort.enabled` is `true` |
+| `write.buffer.size`         | `false`  | `1000`  | Buffer size in number of records. When the buffer reaches this size, data is sorted and flushed to disk |
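+A minimal sketch (table and column names hypothetical) of enabling buffer sort for an append-only write:
+
+```sql
+CREATE TABLE events_append (
+  event_id BIGINT,
+  event_type STRING,
+  ts BIGINT
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/events_append',
+  'write.operation' = 'insert',             -- append mode
+  'write.buffer.sort.enabled' = 'true',
+  'write.buffer.sort.keys' = 'event_type,event_id',
+  'write.buffer.size' = '10000'             -- sort and flush every 10k records
+);
+```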
 
-- For Merge_On_Read table, the small file strategies are by default applied: 
tries to append to the small avro log files first
-- For Copy_On_Write table, write new parquet files directly, no small file 
strategies are applied
+### Disable Meta Fields
+
+For append-only workloads where Hudi metadata fields (e.g., 
`_hoodie_commit_time`, `_hoodie_record_key`) are not needed, you can disable 
them to reduce storage overhead. This is useful when integrating with external 
systems that don't require Hudi-specific metadata.
+
+| Option Name                   | Required | Default | Remarks |
+|-------------------------------|----------|---------|---------|
+| `hoodie.populate.meta.fields` | `false`  | `true`  | Whether to populate Hudi meta fields. Set to `false` for append-only workloads to reduce storage overhead. Note: some Hudi features may not work when disabled |
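+For instance (hypothetical table), meta fields can be disabled for a plain append pipeline:
+
+```sql
+CREATE TABLE raw_logs (
+  log_line STRING,
+  ts BIGINT
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/raw_logs',
+  'write.operation' = 'insert',
+  'hoodie.populate.meta.fields' = 'false'  -- skip _hoodie_* columns to save storage
+);
+```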
 
 Hudi supports rich clustering strategies to optimize the file layout for `INSERT` mode:
 
-#### Inline Clustering
+### Inline Clustering
 
 :::note
-Only Copy_On_Write table is supported.
+Only Copy‑on‑Write tables are supported.
 :::
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files 
while ingesting, for COW table, open the option to enable the small file 
merging strategy(no deduplication for keys but the throughput will be affected) 
|
+| Option Name            | Required | Default | Remarks |
+|------------------------|----------|---------|---------|
+| `write.insert.cluster` | `false`  | `false` | Whether to merge small files while ingesting. For COW tables, enable this option to use the small‑file merging strategy (no deduplication for keys, but throughput will be affected) |
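+A sketch (hypothetical table) of enabling inline clustering on a COW table:
+
+```sql
+CREATE TABLE cow_inserts (
+  id BIGINT,
+  val STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/cow_inserts',
+  'table.type' = 'COPY_ON_WRITE',
+  'write.operation' = 'insert',
+  'write.insert.cluster' = 'true'  -- merge small files while ingesting
+);
+```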
 
-#### Async Clustering
+### Async Clustering
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule 
clustering plan during write process, by default false |
-| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the 
clustering plan, only valid when `clustering.schedule.enabled` is true |
-| `clustering.async.enabled` | `false` | `false` | Whether to execute 
clustering plan asynchronously, by default false |
-| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
-| `clustering.plan.strategy.target.file.max.bytes` | `false` | 
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
-| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file 
that has less size than the threshold (unit MB) are candidates for clustering |
-| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to 
sort by when clustering |
+| Option Name                                      | Required | Default          | Remarks |
+|--------------------------------------------------|----------|------------------|---------|
+| `clustering.schedule.enabled`                    | `false`  | `false`          | Whether to schedule the clustering plan during the write process; default is false |
+| `clustering.delta_commits`                       | `false`  | `4`              | Delta commits for scheduling the clustering plan; only valid when `clustering.schedule.enabled` is true |
+| `clustering.async.enabled`                       | `false`  | `false`          | Whether to execute the clustering plan asynchronously; default is false |
+| `clustering.tasks`                               | `false`  | `4`              | Parallelism of the clustering tasks |
+| `clustering.plan.strategy.target.file.max.bytes` | `false`  | `1024*1024*1024` | The target file size for the clustering group; default is 1 GB |
+| `clustering.plan.strategy.small.file.limit`      | `false`  | `600`            | Files smaller than the threshold (in MB) are candidates for clustering |
+| `clustering.plan.strategy.sort.columns`          | `false`  | `N/A`            | The columns to sort by when clustering |
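+As an illustrative sketch (hypothetical table), async clustering can be configured as:
+
+```sql
+CREATE TABLE events_clustered (
+  id BIGINT,
+  region STRING,
+  ts BIGINT,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/events_clustered',
+  'write.operation' = 'insert',
+  'clustering.schedule.enabled' = 'true',  -- schedule a plan during writes
+  'clustering.async.enabled' = 'true',     -- execute the plan asynchronously
+  'clustering.delta_commits' = '4',
+  'clustering.plan.strategy.sort.columns' = 'region'
+);
+```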
 
-#### Clustering Plan Strategy
+### Clustering Plan Strategy
 
 Custom clustering strategies are supported.
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options 
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent 
days; 3) `SELECTED_PARTITIONS`: specific partitions |
-| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` | 
Valid for `RECENT_DAYS` mode |
-| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
-| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid 
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
-| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The 
regex to filter the partitions |
-| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific 
partitions separated by comma `,` |
+| Option Name                                             | Required | Default | Remarks |
+|---------------------------------------------------------|----------|---------|---------|
+| `clustering.plan.partition.filter.mode`                 | `false`  | `NONE`  | Valid options: 1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false`  | `2`     | Number of partitions to look back; valid for `RECENT_DAYS` mode |
+| `clustering.plan.strategy.cluster.begin.partition`      | `false`  | `N/A`   | Valid for `SELECTED_PARTITIONS` mode; specify the partition to begin with (inclusive) |
+| `clustering.plan.strategy.cluster.end.partition`        | `false`  | `N/A`   | Valid for `SELECTED_PARTITIONS` mode; specify the partition to end with (inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern`      | `false`  | `N/A`   | The regex to filter the partitions |
+| `clustering.plan.strategy.partition.selected`           | `false`  | `N/A`   | Specific partitions, separated by commas |
+
+## Using Bucket Index
+
+Hudi Flink writer supports two types of writer indexes:
+
+- Flink state (default)
+- Bucket index (3 variants: simple, partition-level, consistent hashing)
+
+### Comparison
+
+| Feature                 | Bucket Index | Flink State Index |
+|-------------------------|--------------|-------------------|
+| How It Works            | Uses a deterministic hash algorithm to shuffle records into buckets | Uses the Flink state backend to store index data: mappings of record keys to their residing file group's file IDs |
+| Computing/Storage Cost  | No cost for state‑backend indexing | Has computing and storage cost for maintaining state; can become a bottleneck when working with large Hudi tables |
+| Performance             | Better performance due to no state overhead | Performance depends on state backend efficiency |
+| File Group Flexibility  | **Simple**: Fixed number of buckets (file groups) per partition, immutable once set<br/>**Partition-Level**: Different fixed buckets per partition via regex patterns (rescaling requires Spark procedure)<br/>**Consistent Hashing**: Auto-resizing buckets via clustering | Dynamically assigns records to file groups based on current table layout; no pre-configured limits on file group count |
+| Cross‑Partition Changes | Cannot handle changes among partitions (unless input is a CDC stream) | No limit on handling cross‑partition changes |
+
+:::note
+Bucket index supports only the `UPSERT` write operation and cannot be used 
with the [append mode](#append-mode) in Flink.
+:::
+
+### Bucket Index Examples
+
+#### Simple Bucket Index
+
+Fixed number of buckets across all partitions:
+
+```sql
+CREATE TABLE orders_simple_bucket (
+  order_id BIGINT,
+  customer_id BIGINT,
+  amount DOUBLE,
+  order_date STRING,
+  ts BIGINT,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/orders_simple',
+  'table.type' = 'MERGE_ON_READ',
+  
+  -- Bucket Index Configuration
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.engine' = 'SIMPLE',
+  'hoodie.bucket.index.hash.field' = 'order_id',
+  'hoodie.bucket.index.num.buckets' = '16'  -- Fixed 16 buckets for ALL 
partitions
+);
+
+-- Insert data
+INSERT INTO orders_simple_bucket VALUES
+  (1, 100, 99.99, '2024-01-15', 1000),
+  (2, 101, 49.99, '2024-02-20', 2000);
+```
+
+#### Partition-Level Bucket Index
+
+Different bucket counts for different partitions based on regex patterns:
+
+```sql
+CREATE TABLE orders_partition_bucket (
+  order_id BIGINT,
+  customer_id BIGINT,
+  amount DOUBLE,
+  order_date STRING,
+  ts BIGINT,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/orders_partition',
+  'table.type' = 'MERGE_ON_READ',
+  
+  -- Bucket Index Configuration
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.engine' = 'SIMPLE',
+  'hoodie.bucket.index.hash.field' = 'order_id',
+  
+  -- Partition-Level Configuration
+  'hoodie.bucket.index.num.buckets' = '8',  -- Default for non-matching 
partitions
+  'hoodie.bucket.index.partition.rule.type' = 'regex',
+  -- Black Friday (11-24), Cyber Monday (11-27), Christmas (12-25) get 128 
buckets
+  -- All other dates get 8 buckets (default)
+  'hoodie.bucket.index.partition.expressions' = '\\d{4}-(11-(24|27)|12-25),128'
+);
+
+-- Insert data - bucket count varies by partition
+INSERT INTO orders_partition_bucket VALUES
+  (1, 100, 999.99, '2024-11-24', 1000),  -- Black Friday: 128 buckets
+  (2, 101, 499.99, '2024-11-27', 2000),  -- Cyber Monday: 128 buckets
+  (3, 102, 299.99, '2024-12-25', 3000),  -- Christmas: 128 buckets
+  (4, 103, 49.99, '2024-01-15', 4000);   -- Regular day: 8 buckets
+```
+
+:::note
+For existing simple bucket index tables, use the Spark 
`partition_bucket_index_manager` procedure to upgrade to partition-level bucket 
index. After upgrade, Flink writers automatically load the expressions from 
table metadata.
+:::
+
+#### Consistent Hashing Bucket Index
+
+Auto-expanding buckets via clustering (requires Spark for execution):
+
+```sql
+CREATE TABLE orders_consistent_hashing (
+  order_id BIGINT,
+  customer_id BIGINT,
+  amount DOUBLE,
+  order_date STRING,
+  ts BIGINT,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/orders_consistent',
+  'table.type' = 'MERGE_ON_READ',
+  
+  -- Consistent Hashing Bucket Index
+  'index.type' = 'BUCKET',
+  'hoodie.bucket.index.engine' = 'CONSISTENT_HASHING',
+  'hoodie.bucket.index.hash.field' = 'order_id',
+  
+  -- Initial and boundary configuration
+  'hoodie.bucket.index.num.buckets' = '4',      -- Initial bucket count
+  'hoodie.bucket.index.min.num.buckets' = '2',  -- Minimum allowed
+  'hoodie.bucket.index.max.num.buckets' = '128', -- Maximum allowed
+  
+  -- Clustering configuration (required for auto-resizing)
+  'clustering.schedule.enabled' = 'true',
+  'clustering.delta_commits' = '5',  -- Schedule clustering every 5 commits
+  'clustering.plan.strategy.class' = 
'org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy',
+  
+  -- File size thresholds for bucket resizing
+  'hoodie.clustering.plan.strategy.target.file.max.bytes' = '1073741824',  -- 
1GB max
+  'hoodie.clustering.plan.strategy.small.file.limit' = '314572800'  -- 300MB 
min
+);
+
+-- Insert data - buckets auto-adjust based on file sizes
+INSERT INTO orders_consistent_hashing 
+SELECT * FROM source_stream;
+```
+
+:::note
+**Consistent hashing bucket index** automatically adjusts bucket counts via 
clustering. Flink can schedule clustering plans, but execution currently 
requires Spark. Start with `hoodie.bucket.index.num.buckets` and the system 
dynamically resizes based on file sizes within the min/max bounds.
+:::
+
+### Configuration Reference
+
+| Option                                      | Applies To         | Default       | Description |
+|---------------------------------------------|--------------------|---------------|-------------|
+| `index.type`                                | All                | `FLINK_STATE` | Set to `BUCKET` to enable bucket index |
+| `hoodie.bucket.index.engine`                | All                | `SIMPLE`      | Engine type: `SIMPLE` or `CONSISTENT_HASHING` |
+| `hoodie.bucket.index.hash.field`            | All                | Record key    | Fields to hash for bucketing; can be a subset of record key |
+| `hoodie.bucket.index.num.buckets`           | All                | `4`           | Default bucket count per partition (initial count for consistent hashing) |
+| `hoodie.bucket.index.partition.expressions` | Partition-Level    | N/A           | Regex patterns and bucket counts: `pattern1,count1;pattern2,count2` |
+| `hoodie.bucket.index.partition.rule.type`   | Partition-Level    | `regex`       | Rule parser type |
+| `hoodie.bucket.index.min.num.buckets`       | Consistent Hashing | N/A           | Minimum bucket count (prevents over-merging) |
+| `hoodie.bucket.index.max.num.buckets`       | Consistent Hashing | N/A           | Maximum bucket count (prevents unlimited expansion) |
+| `clustering.schedule.enabled`               | Consistent Hashing | `false`       | Must be `true` for auto-resizing |
+| `clustering.plan.strategy.class`            | Consistent Hashing | N/A           | Set to `FlinkConsistentBucketClusteringPlanStrategy` |
+
+## Rate Limiting
+
+Hudi provides rate limiting capabilities for both writes and streaming reads 
to control data flow and prevent performance degradation.
+
+### Write Rate Limiting
+
+In many scenarios, users publish both historical snapshot data and real‑time 
incremental updates to the same message queue, then consume from the earliest 
offset using Flink to ingest everything into Hudi. This backfill pattern can 
cause performance issues:
 
-### Bucket Index
+- **High burst throughput**: The entire historical dataset arrives at once, 
overwhelming the writer with a massive volume of records
+- **Scattered writes across table partitions**: Historical records arrive 
scattered across many different table partitions (e.g., records from many 
different dates if the table is partitioned by date). This forces the writer to 
constantly switch between partitions, keeping many file handles open 
simultaneously and causing memory pressure, which degrades write performance 
and causes throughput instability
 
-By default, flink uses the state-backend to keep the file index: the mapping 
from primary key to fileId. When the input data set is large,
-there is possibility the cost of the state be a bottleneck, the bucket index 
use deterministic hash algorithm for shuffling the records into
-buckets, thus can avoid the storage and query overhead of indexes.
+The `write.rate.limit` option helps smooth out the ingestion flow, preventing 
traffic jitter and improving stability during backfill operations.
 
-#### Options
+### Streaming Read Rate Limiting
 
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket 
index |
-| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset 
of the primary key |
-| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets 
per-partition, it is immutable once set up |
+For Flink streaming reads, rate limiting helps avoid backpressure when 
processing large workloads. The `read.splits.limit` option controls the maximum 
number of input splits allowed to be read in each check interval. This feature 
is particularly useful when:
 
-Comparing to state index:
+- Reading from tables with a large backlog of commits
+- Preventing downstream operators from being overwhelmed
+- Controlling resource consumption during catch-up scenarios
 
-- Bucket index has no computing and storage cost of state-backend index, thus 
has better performance
-- Bucket index can not expand the buckets dynamically, the state-backend index 
can expand the buckets dynamically based on current file layout
-- Bucket index can not handle changes among partitions(no limit if the input 
itself is CDC stream), state-backend index has no limit
+The average read rate can be calculated as: **`read.splits.limit` / 
`read.streaming.check-interval`** splits per second.
 
-### Rate Limit
-There are many use cases that user put the full history data set onto the 
message queue together with the realtime incremental data. Then they consume 
the data from the queue into the hudi from the earliest offset using flink. 
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random 
writing partitions). It will lead to degradation of writing performance and 
throughput glitches. For this case, the speed limit parameter can be turned on 
to ensure smooth writing of the flow.
+### Options
 
-#### Options
-|  Option Name  | Required | Default | Remarks |
-|  -----------  | -------  | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
+| Option Name                     | Required | Default             | Remarks |
+|---------------------------------|----------|---------------------|---------|
+| `write.rate.limit`              | `false`  | `0`                 | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
+| `read.splits.limit`             | `false`  | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.streaming.check-interval` | `false`  | `60`                | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
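+A minimal sketch (hypothetical table) combining write and streaming-read rate limits:
+
+```sql
+CREATE TABLE limited_orders (
+  id BIGINT,
+  ts BIGINT,
+  PRIMARY KEY (id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/limited_orders',
+  'write.rate.limit' = '20000',            -- at most 20k records/sec written
+  'read.streaming.enabled' = 'true',
+  'read.streaming.check-interval' = '30',  -- check for new commits every 30s
+  'read.splits.limit' = '60'               -- at most 60 splits per check, i.e. ~2 splits/sec
+);
+```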
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 16db53dcd0ef..e651c1a47a9d 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -487,7 +487,7 @@ This is done to ensure that the compaction and cleaning 
services are not execute
 
 ### Consistent hashing index (Experimental)
 
-We have introduced the Consistent Hashing Index since [0.13.0 
release](/releases/release-0.13.0#consistent-hashing-index). In comparison to 
the static hashing index ([Bucket 
Index](/releases/release-0.11.0#bucket-index)), the consistent hashing index 
offers dynamic scalability of data buckets for the writer. 
+We have introduced the Consistent Hashing Bucket Index since [0.13.0 
release](/releases/release-0.13.0#consistent-hashing-index). This is one of 
three [bucket index](indexes#additional-writer-side-indexes) variants available 
in Hudi. The consistent hashing bucket index offers dynamic scalability of data 
buckets for the writer. 
 You can find the 
[RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md) for the 
design of this feature.
 In the 0.13.X release, the Consistent Hashing Index is supported only for 
Spark engine. And since [release 
0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), the index 
is supported for Flink engine.
 

Reply via email to