This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 8bcb4a3fc624 docs: update flink related docs as of 1.1 (#14320)
8bcb4a3fc624 is described below
commit 8bcb4a3fc6246b8b3e746928dcce7c99ed640ca1
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Nov 23 00:49:18 2025 -0600
docs: update flink related docs as of 1.1 (#14320)
Update these pages:
- flink quick start
- ingestion flink
- indexes (content related to flink)
Mainly updated append mode, bucket index, and rate limiting section in
`ingestion_flink.md`. Other changes are language editing and markdown
formatting.
---
website/docs/flink-quick-start-guide.md | 119 +++++-----
website/docs/indexes.md | 23 +-
website/docs/ingestion_flink.md | 393 +++++++++++++++++++++++---------
website/docs/sql_dml.md | 2 +-
4 files changed, 357 insertions(+), 180 deletions(-)
diff --git a/website/docs/flink-quick-start-guide.md b/website/docs/flink-quick-start-guide.md
index 6c592fa537b2..f29f2ba1f7b3 100644
--- a/website/docs/flink-quick-start-guide.md
+++ b/website/docs/flink-quick-start-guide.md
@@ -1,36 +1,32 @@
---
title: "Flink Quick Start"
toc: true
-last_modified_at: 2023-08-16T12:53:57+08:00
+last_modified_at: 2025-11-22T14:30:00+08:00
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
-This page introduces Flink-Hudi integration. We can feel the unique charm of
how Flink brings in the power of streaming into Hudi.
+This page introduces Flink–Hudi integration and demonstrates how Flink brings
the power of streaming to Hudi.
## Setup
-
### Flink Support Matrix
-
| Hudi   | Supported Flink version                                                 |
-|:-------|:-----------------------------------------------------------------------|
+| :----- | :--------------------------------------------------------------------- |
+| 1.1.x  | 1.17.x, 1.18.x, 1.19.x, 1.20.x (default build), 2.0.x                  |
| 1.0.x  | 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x, 1.20.x (default build) |
| 0.15.x | 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x                                 |
| 0.14.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x                                 |
-| 0.13.x | 1.13.x, 1.14.x, 1.15.x, 1.16.x                                         |
-| 0.12.x | 1.13.x, 1.14.x, 1.15.x                                                 |
-| 0.11.x | 1.13.x, 1.14.x                                                         |
-
### Download Flink and Start Flink cluster
-Hudi works with Flink 1.13 (up to Hudi 0.14.x release), Flink 1.14, Flink
1.15, Flink 1.16, Flink 1.17, Flink 1.18, Flink 1.19 and Flink 1.20.
-You can follow the instructions [here](https://flink.apache.org/downloads) for
setting up Flink. Then, start a standalone Flink cluster
-within hadoop environment. In case we are trying on local setup, then we could
download hadoop binaries and set HADOOP_HOME.
+- You can follow the [instructions here](https://flink.apache.org/downloads)
for setting up Flink
+- Then start a standalone Flink cluster within a Hadoop environment
-```bash
+For local setup, you can download Hadoop binaries and set `HADOOP_HOME` as
follows:
+
+```shell
# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
@@ -40,7 +36,7 @@ export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
<div className="notice--info">
<h4>Please note the following: </h4>
<ul>
- <li>We suggest hadoop 2.9.x+ version because some of the object storage has
filesystem implementation only after that</li>
+ <li>We recommend Hadoop 2.9.x+ because some object storage systems have
filesystem implementations only from that version onward</li>
<li>The flink-parquet and flink-avro formats are already packaged into the
hudi-flink-bundle jar</li>
</ul>
</div>
@@ -54,21 +50,21 @@ values={[
>
<TabItem value="flinksql">
-We use the [Flink Sql
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
because it's a good
-quick start tool for SQL users.
+We use the [Flink SQL
Client](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/)
because it's a good
+quick-start tool for SQL users.
-### Start Flink SQL client
+### Start Flink SQL Client
-Hudi supports packaged bundle jar for Flink, which should be loaded in the
Flink SQL Client when it starts up.
+Hudi supports a packaged bundle jar for Flink, which should be loaded in the
Flink SQL Client when it starts up.
You can build the jar manually under path
`hudi-source-dir/packaging/hudi-flink-bundle` (see [Build Flink Bundle
Jar](syncing_metastore#install)), or download it from the
[Apache Official
Repository](https://repo.maven.apache.org/maven2/org/apache/hudi/).
Now start the SQL CLI:
```bash
-# For Flink versions: 1.13 - 1.18
-export FLINK_VERSION=1.17
-export HUDI_VERSION=1.0.1
+# For Flink versions: 1.17-1.20, 2.0
+export FLINK_VERSION=1.20
+export HUDI_VERSION=1.1.0
wget
https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_VERSION}-bundle/${HUDI_VERSION}/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar
-P /tmp/
./bin/sql-client.sh embedded -j
/tmp/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
```
@@ -79,14 +75,13 @@ The SQL CLI only executes the SQL line by line.
<TabItem value="dataStream">
-Hudi works with Flink 1.13 (up to Hudi 0.14.x release), Flink 1.14, Flink
1.15, Flink 1.16, Flink 1.17, Flink 1.18, Flink 1.19 and Flink 1.20.
Please add the desired dependency to your project:
```xml
-<!-- For Flink versions 1.13 - 1.18-->
+<!-- For Flink versions 1.17-1.20, 2.0 -->
<properties>
- <flink.version>1.17.0</flink.version>
- <flink.binary.version>1.17</flink.binary.version>
- <hudi.version>1.0.1</hudi.version>
+ <flink.version>1.20.0</flink.version>
+ <flink.binary.version>1.20</flink.binary.version>
+ <hudi.version>1.1.0</hudi.version>
</properties>
<dependency>
<groupId>org.apache.hudi</groupId>
@@ -114,7 +109,7 @@ values={[
<TabItem value="flinksql">
-Here is an example of creating a flink Hudi table.
+Here is an example of creating a Flink Hudi table.
```sql
-- sets up the result mode to tableau to show the results directly in the CLI
@@ -142,7 +137,7 @@ WITH (
```java
// Java
-// First commit will auto-initialize the table, if it did not exist in the
specified base path.
+// The first commit will auto-initialize the table if it does not exist in the
specified base path.
```
</TabItem>
@@ -298,7 +293,7 @@ values={[
<TabItem value="flinksql">
-Hudi tables can be updated by either inserting reocrds with same primary key
or using a standard UPDATE statement shown as below.
+Hudi tables can be updated by either inserting records with the same record
key or using a standard UPDATE statement as shown below.
```sql
-- Update Queries only works with batch execution mode
@@ -308,15 +303,15 @@ UPDATE hudi_table SET fare = 25.0 WHERE uuid =
'334e26e9-8355-45cc-97c6-c31daf0d
:::note
The `UPDATE` statement is supported since Flink 1.17, so only Hudi Flink
bundle compiled with Flink 1.17+ supplies this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
+Only **batch** queries on Hudi table with record key work correctly.
:::
</TabItem>
<TabItem value="dataStream">
-Add some streaming source to flink and load the data in hudi table using
DataStream API as [above](#insert-data).
-When new rows with the same primary key arrive in stream, then it will be be
updated.
-In the insert example incoming row with same record id will be updated.
+Add a streaming source to Flink and load the data into the Hudi table using
the DataStream API as [above](#insert-data).
+When new rows with the same record key arrive in the stream, they will be
updated.
+In the insert example, incoming rows with the same record ID will be updated.
Refer to the Update Example
[here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
@@ -325,7 +320,7 @@ Refer Update Example
[here](https://github.com/ad1happy2go/hudi-examples/blob/ma
</Tabs
>
-[Querying](#query-data) the data again will now show updated records. Each
write operation generates a new [commit](concepts)
+[Querying](#query-data) the data again will now show updated records. Each
write operation generates a new [commit](concepts)
denoted by the timestamp.
@@ -340,10 +335,10 @@ values={[
<TabItem value="flinksql">
-### Row-level Delete
+### Row‑Level Delete
-When consuming data in streaming query, Hudi Flink source can also accept the
change logs from the upstream data source if the `RowKind` is set up per-row,
-it can then apply the UPDATE and DELETE in row level. You can then sync a
NEAR-REAL-TIME snapshot on Hudi for all kinds
+When consuming data in a streaming query, the Hudi Flink source can also
accept change logs from the upstream data source if the `RowKind` is set up per
row;
+it can then apply UPDATE and DELETE at the row level. You can then sync a
near‑real‑time snapshot on Hudi for all kinds
of RDBMS.
### Batch Delete
@@ -357,15 +352,15 @@ DELETE FROM t1 WHERE age > 23;
:::note
The `DELETE` statement is supported since Flink 1.17, so only Hudi Flink
bundle compiled with Flink 1.17+ supplies this functionality.
-Only **batch** queries on Hudi table with primary key work correctly.
+Only **batch** queries on Hudi table with record key work correctly.
:::
</TabItem>
<TabItem value="dataStream">
-Creates a Flink Hudi table first and insert data into the Hudi table using
DataStream API as below.
-When new rows with the same primary key and Row Kind as Delete arrive in
stream, then it will be be deleted.
+First, create a Flink Hudi table and insert data into the Hudi table using the
DataStream API as below.
+When new rows with the same record key and Row Kind as DELETE arrive in the
stream, they will be deleted.
Refer to the Delete Example
[here](https://github.com/ad1happy2go/hudi-examples/blob/main/flink/src/main/java/com/hudi/flink/quickstart/HudiDataStreamWriter.java)
@@ -377,7 +372,7 @@ Refer Delete Example
[here](https://github.com/ad1happy2go/hudi-examples/blob/ma
## Streaming Query
-Hudi Flink also provides capability to obtain a stream of records that changed
since given commit timestamp.
+Hudi Flink also provides the capability to obtain a stream of records that
changed since a given commit timestamp.
This can be achieved using Hudi's streaming querying and providing a start
time from which changes need to be streamed.
We do not need to specify endTime if we want all changes after the given
commit (as is the common case).
@@ -396,7 +391,7 @@ WITH (
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enable the streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit
instant time
- 'read.streaming.check-interval' = '4' -- specifies the check interval for
finding new source commits, default 60s.
+ 'read.streaming.check-interval' = '4' -- specifies the check interval for
finding new source commits; default is 60s.
);
-- Then query the table in stream mode
@@ -405,8 +400,8 @@ select * from t1;
## Change Data Capture Query
-Hudi Flink also provides capability to obtain a stream of records with Change
Data Capture.
-CDC queries are useful for applications that need to obtain all the changes,
along with before/after images of records.
+Hudi Flink also provides the capability to obtain a stream of records with
Change Data Capture.
+CDC queries are useful for applications that need to obtain all changes, along
with before/after images of records.
```sql
set sql-client.execution.result-mode = tableau;
@@ -424,7 +419,7 @@ WITH (
'connector' = 'hudi',
'path' = 'file:///tmp/hudi_table',
'table.type' = 'COPY_ON_WRITE',
- 'cdc.enabled' = 'true' -- this option enable the cdc log enabled
+ 'cdc.enabled' = 'true' -- this option enables CDC logging
);
-- insert data using values
INSERT INTO hudi_table
@@ -445,30 +440,32 @@ select * from hudi_table/*+
OPTIONS('read.streaming.enabled'='true')*/;
```
This will give all changes that happened after the `read.start-commit` commit.
The unique thing about this
-feature is that it now lets you author streaming pipelines on streaming or
batch data source.
+feature is that it lets you author streaming pipelines on streaming or batch
data sources.
## Where To Go From Here?
-- **Quick Start** : Read [Quick Start](#quick-start) to get started quickly
Flink sql client to write to(read from) Hudi.
-- **Configuration** : For [Global
Configuration](flink_tuning#global-configurations), sets up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per job configuration, sets up through
[Table Option](flink_tuning#table-options).
+
+- **Quick Start**: Read the Quick Start section above to get started quickly
with the Flink SQL client to write to (and read from) Hudi.
+- **Configuration**: For [Global
Configuration](flink_tuning#global-configurations), set up through
`$FLINK_HOME/conf/flink-conf.yaml`. For per-job configuration, set up through
[Table Option](flink_tuning#table-options).
- **Writing Data** : Flink supports different modes for writing, such as [CDC
Ingestion](ingestion_flink#cdc-ingestion), [Bulk
Insert](ingestion_flink#bulk-insert), [Index
Bootstrap](ingestion_flink#index-bootstrap), [Changelog
Mode](ingestion_flink#changelog-mode) and [Append
Mode](ingestion_flink#append-mode). Flink also supports multiple streaming
writers with [non-blocking concurrency
control](sql_dml#non-blocking-concurrency-control-experimental).
- **Reading Data** : Flink supports different modes for reading, such as
[Streaming Query](sql_queries#streaming-query) and [Incremental
Query](sql_queries#incremental-query).
-- **Tuning** : For write/read tasks, this guide gives some tuning suggestions,
such as [Memory Optimization](flink_tuning#memory-optimization) and [Write Rate
Limit](flink_tuning#write-rate-limit).
-- **Optimization**: Offline compaction is supported [Offline
Compaction](compaction#flink-offline-compaction).
+- **Tuning**: For write/read tasks, this guide provides some tuning
suggestions, such as [Memory Optimization](flink_tuning#memory-optimization)
and [Write Rate Limit](flink_tuning#write-rate-limit).
+- **Optimization**: Offline compaction is supported: [Offline
Compaction](compaction#flink-offline-compaction).
- **Query Engines**: Besides Flink, many other engines are integrated: [Hive
Query](syncing_metastore#flink-setup), [Presto Query](sql_queries#presto).
-- **Catalog**: A Hudi specific catalog is supported: [Hudi
Catalog](sql_ddl/#create-catalog).
+- **Catalog**: A Hudi‑specific catalog is supported: [Hudi
Catalog](sql_ddl/#create-catalog).
If you are relatively new to Apache Hudi, it is important to be familiar with
a few core concepts:
- - [Hudi Timeline](timeline) – How Hudi manages transactions and other table
services
- - [Hudi Storage Layout](storage_layouts) - How the files are laid out on
storage
- - [Hudi Table Types](table_types) – `COPY_ON_WRITE` and `MERGE_ON_READ`
- - [Hudi Query Types](table_types#query-types) – Snapshot Queries,
Incremental Queries, Read-Optimized Queries
-See more in the "Concepts" section of the docs.
+- [Hudi Timeline](timeline) – How Hudi manages transactions and other table
services
+- [Hudi Storage Layout](storage_layouts) – How files are laid out on storage
+- [Hudi Table Types](table_types) – `COPY_ON_WRITE` and `MERGE_ON_READ`
+- [Hudi Query Types](table_types#query-types) – Snapshot Queries, Incremental
Queries, Read‑Optimized Queries
+
+See more in the [concepts](concepts) docs page.
Take a look at recent [blog posts](/blog) that go in depth on certain topics
or use cases.
-Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto
and much more. We have put together a
-[demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that show cases all
of this on a docker based setup with all
-dependent systems running locally. We recommend you replicate the same setup
and run the demo yourself, by following
-steps [here](docker_demo) to get a taste for it. Also, if you are looking for
ways to migrate your existing data
-to Hudi, refer to [migration guide](migration_guide).
+Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto,
and much more. We have put together a
+[demo video](https://www.youtube.com/watch?v=VhNgUsxdrD0) that showcases all
of this on a Docker‑based setup with all
+dependent systems running locally. We recommend you replicate the same setup
and run the demo yourself by following
+the steps in the [docker demo](docker_demo) to get a taste for it. Also, if
you are looking for ways to migrate your existing data
+to Hudi, refer to the [migration guide](migration_guide).
diff --git a/website/docs/indexes.md b/website/docs/indexes.md
index 43a6bab9c631..fc17a7b26c92 100644
--- a/website/docs/indexes.md
+++ b/website/docs/indexes.md
@@ -124,7 +124,7 @@ files themselves (e.g. bloom filters stored in parquet file
footers) or intellig
Currently, Hudi supports the following index types. Default is SIMPLE on Spark
engine, and INMEMORY on Flink and Java
engines. Writers can pick one of these options using `hoodie.index.type`
config option.
-- **SIMPLE (default for Spark engines)**: This is the standard index type for
the Spark engine. It executes an efficient join of incoming records with keys
retrieved from the table stored on disk. It requires keys to be partition-level
unique so it can function correctly.
+- **SIMPLE (default for Spark & Java engines)**: This is the standard index
type for the Spark and Java engines. It executes an efficient join of incoming
records with keys retrieved from the table stored on disk. It requires keys to
be partition-level unique so it can function correctly.
- **RECORD_INDEX** : Use the record index from section above as the writer
side index.
@@ -136,12 +136,17 @@ engines. Writers can pick one of these options using
`hoodie.index.type` config
- **HBASE**: Manages the index mapping through an external table in Apache
HBase.
-- **INMEMORY (default for Flink and Java)**: Uses in-memory hashmap in Spark
and Java engine and Flink in-memory state in Flink for indexing.
+- **INMEMORY**: Uses in-memory hashmap in Spark and Java engine and Flink
in-memory state in Flink for indexing. Note that this is an alias for
`FLINK_STATE` when used for Flink writers.
-- **BUCKET**: Utilizes bucket hashing to identify the file group that houses
the records, which proves to be particularly advantageous on a large scale. To
select the type of bucket engine—that is, the method by which buckets are
created—use the `hoodie.index.bucket.engine` configuration option.
- - **SIMPLE(default)**: This index employs a fixed number of buckets for file
groups within each partition, which do not have the capacity to decrease or
increase in size. It is applicable to both COW and MOR tables. Due to the
unchangeable number of buckets and the design principle of mapping each bucket
to a single file group, this indexing method may not be ideal for partitions
with significant data skew.
+- **FLINK_STATE (default for Flink)**: Uses the Flink state backend to store
the index data: mappings of record keys to their residing file group's file IDs.
- - **CONSISTENT_HASHING**: This index accommodates a dynamic number of
buckets, with the capability for bucket resizing to ensure each bucket is sized
appropriately. This addresses the issue of data skew in partitions with a high
volume of data by allowing these partitions to be dynamically resized. As a
result, partitions can have multiple reasonably sized buckets, unlike the fixed
bucket count per partition seen in the SIMPLE bucket engine type. This feature
is exclusively compatible [...]
+- **BUCKET**: Utilizes bucket hashing to identify the file group that houses
the records, which proves to be particularly advantageous on a large scale. The
bucket index has three variants based on how buckets are configured and managed:
+
+ - **Simple Bucket Index (default)**: Employs a fixed number of buckets
across all partitions. The bucket count is immutable once set and cannot
increase or decrease. Applicable to both COW and MOR tables. Set via
`hoodie.index.bucket.engine=SIMPLE` and `hoodie.bucket.index.num.buckets`. Due
to the uniform bucket count across all partitions, this may not be ideal for
tables with varying partition sizes or data skew.
+
+ - **Partition-Level Bucket Index**: Allows different fixed bucket counts for
different partitions based on regex pattern matching. Existing simple bucket
index tables can be upgraded to partition-level using the Spark
`partition_bucket_index_manager` procedure, which rescales affected partitions.
After upgrade, writers (Flink/Spark) automatically load partition-specific
bucket configurations from table metadata. This addresses the limitation of
uniform bucket counts while maintaining i [...]
+
+ - **Consistent Hashing Bucket Index**: Accommodates a dynamic number of
buckets with automatic resizing capability via clustering. Starts with an
initial bucket count and can grow or shrink within configured min/max bounds
based on file sizes. This addresses data skew in high-volume partitions by
allowing dynamic resizing. Flink can schedule clustering plans, but execution
currently requires Spark. Exclusively compatible with MOR tables. Configure via
`hoodie.index.bucket.engine=CONSIS [...]
- **Bring your own implementation:** You can extend this [public
API](https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java)
and supply a subclass of `SparkHoodieIndex` (for Apache Spark writers) using
`hoodie.index.class` to implement custom indexing.
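
For the simple bucket index variant above, a hedged Flink SQL sketch (the table name, path, schema, and bucket count are illustrative, not from the source) could look like:

```sql
CREATE TABLE hudi_bucketed (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  `partition` STRING
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_bucketed',
  -- BUCKET index with the SIMPLE engine: a fixed bucket count per partition
  'index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'SIMPLE',
  'hoodie.bucket.index.num.buckets' = '4'
);
```

Because each bucket maps to one file group, the bucket count should be sized for the expected partition data volume up front.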
@@ -183,18 +188,14 @@ for more details. All these, support the index types
mentioned [above](#index-ty
#### Flink based configs
-For Flink DataStream and Flink SQL only support Bucket Index and internal
Flink state store backed in memory index.
-Following are the basic configs that control the indexing behavior. Please
refer
[here](https://hudi.apache.org/docs/next/configurations#Flink-Options-advanced-configs)
-for advanced configs.
+For Flink DataStream and Flink SQL, Bucket index and Flink state index are
supported.
+Following are the basic configs that control the indexing behavior. Please
refer to [the configurations here](configurations#Flink-Options-advanced-configs)
for advanced configs.
| Config Name                | Default                | Description |
|----------------------------|------------------------|-------------|
| index.type                 | FLINK_STATE (Optional) | Index type of Flink write job, default is using state backed index. Possible values:<br /> <ul><li>FLINK_STATE</li><li>BUCKET</li></ul><br /> `Config Param: INDEX_TYPE` |
| hoodie.index.bucket.engine | SIMPLE (Optional)      | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when `hoodie.index.type` is set to `BUCKET`. Possible Values: <br /> <ul><li>SIMPLE</li><li>CONSISTENT_HASHING</li></ul> |
-
-
-
### Picking Indexing Strategies
Since data comes in at different volumes, velocity and has different access
patterns, different indexes could be used for different workload types.
diff --git a/website/docs/ingestion_flink.md b/website/docs/ingestion_flink.md
index 8176ff1e5ac6..3d62e07f239c 100644
--- a/website/docs/ingestion_flink.md
+++ b/website/docs/ingestion_flink.md
@@ -1,179 +1,358 @@
---
title: Using Flink
keywords: [hudi, flink, streamer, ingestion]
+last_modified_at: 2025-11-22T12:53:57+08:00
---
-### CDC Ingestion
-CDC(change data capture) keep track of the data changes evolving in a source
system so a downstream process or system can action that change.
+## CDC Ingestion
+
+CDC (change data capture) keeps track of data changes evolving in a source
system so a downstream process or system can act on those changes.
We recommend two ways for syncing CDC data into Hudi:

-1. Using the Ververica
[flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)
directly connect to DB Server to sync the binlog data into Hudi.
- The advantage is that it does not rely on message queues, but the
disadvantage is that it puts pressure on the db server;
-2. Consume data from a message queue (for e.g, the Kafka) using the flink cdc
format, the advantage is that it is highly scalable,
+1. Use [Apache Flink-CDC](https://github.com/apache/flink-cdc) to directly
connect to the database server and sync binlog data into Hudi.
+ The advantage is that it does not rely on message queues, but the
disadvantage is that it puts pressure on the database server.
+2. Consume data from a message queue (e.g., Kafka) using the Flink CDC format.
The advantage is that it is highly scalable,
but the disadvantage is that it relies on message queues.
:::note
-- If the upstream data cannot guarantee the order, you need to specify option
`write.precombine.field` explicitly;
+If the upstream data cannot guarantee ordering, you need to explicitly specify
the `ordering.fields` option.
:::
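
As a hedged sketch of the second approach (the Kafka topic, bootstrap servers, schema, and table names below are hypothetical), a Flink SQL pipeline can consume Debezium-format change logs from Kafka and apply them to a Hudi table:

```sql
-- Hypothetical source: a Kafka topic carrying Debezium-format change logs
CREATE TABLE users_cdc (
  id BIGINT,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'users_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Hudi sink table; the changelog is applied as upserts/deletes by record key
CREATE TABLE users_hudi (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/users_hudi',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO users_hudi SELECT * FROM users_cdc;
```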
-### Bulk Insert
+## Bulk Insert
-For the demand of snapshot data import. If the snapshot data comes from other
data sources, use the `bulk_insert` mode to quickly
+For snapshot data import requirements, if the snapshot data comes from other
data sources, use the `bulk_insert` mode to quickly
import the snapshot data into Hudi.
-
:::note
-`bulk_insert` eliminates the serialization and data merging. The data
deduplication is skipped, so the user need to guarantee the uniqueness of the
data.
+`bulk_insert` eliminates serialization and data merging. Data deduplication is
skipped, so the user needs to guarantee data uniqueness.
:::
:::note
-`bulk_insert` is more efficient in the `batch execution mode`. By default, the
`batch execution mode` sorts the input records
-by the partition path and writes these records to Hudi, which can avoid write
performance degradation caused by
-frequent `file handle` switching.
+`bulk_insert` is more efficient in `batch execution mode`. By default, `batch
execution mode` sorts the input records
+by partition path and writes these records to Hudi, which can avoid
write‑performance degradation caused by
+frequent file‑handle switching.
:::
-:::note
-The parallelism of `bulk_insert` is specified by `write.tasks`. The
parallelism will affect the number of small files.
-In theory, the parallelism of `bulk_insert` is the number of `bucket`s (In
particular, when each bucket writes to maximum file size, it
-will rollover to the new file handle. Finally, `the number of files` >=
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
+:::note
+The parallelism of `bulk_insert` is specified by `write.tasks`. The
parallelism affects the number of small files.
+In theory, the parallelism of `bulk_insert` equals the number of buckets. (In
particular, when each bucket writes to the maximum file size, it
+rolls over to a new file handle.) The final number of files is greater than or
equal to [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks).
:::
-#### Options
+### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.operation` | `true` | `upsert` | Setting as `bulk_insert` to open
this function |
-| `write.tasks` | `false` | `4` | The parallelism of `bulk_insert`, `the
number of files` >=
[`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
-| `write.bulk_insert.shuffle_input` | `false` | `true` | Whether to shuffle
data according to the input field before writing. Enabling this option will
reduce the number of small files, but there may be a risk of data skew |
-| `write.bulk_insert.sort_input` | `false` | `true` | Whether to sort data
according to the input field before writing. Enabling this option will reduce
the number of small files when a write task writes multiple partitions |
-| `write.sort.memory` | `false` | `128` | Available managed memory of sort
operator. default `128` MB |
+| Option Name                       | Required | Default  | Remarks |
+|-----------------------------------|----------|----------|---------|
+| `write.operation`                 | `true`   | `upsert` | Set to `bulk_insert` to enable this function |
+| `write.tasks`                     | `false`  | `4`      | The parallelism of `bulk_insert`; the number of files ≥ [`write.bucket_assign.tasks`](configurations#writebucket_assigntasks) |
+| `write.bulk_insert.shuffle_input` | `false`  | `true`   | Whether to shuffle data by the input field before writing. Enabling this option reduces the number of small files but may introduce data‑skew risk |
+| `write.bulk_insert.sort_input`    | `false`  | `true`   | Whether to sort data by the input field before writing. Enabling this option reduces the number of small files when a write task writes to multiple partitions |
+| `write.sort.memory`               | `false`  | `128`    | Available managed memory for the sort operator; default is 128 MB |
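
Putting these options together, a minimal bulk-insert sketch (the table name, path, schema, and source table are hypothetical) might look like:

```sql
-- Run in batch execution mode so input records are sorted by partition path
SET 'execution.runtime-mode' = 'batch';

CREATE TABLE hudi_snapshot (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  `partition` STRING
) PARTITIONED BY (`partition`) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_snapshot',
  -- enable bulk insert and set its parallelism
  'write.operation' = 'bulk_insert',
  'write.tasks' = '4'
);

-- Hypothetical existing source table holding the snapshot data
INSERT INTO hudi_snapshot
SELECT uuid, name, `partition` FROM source_snapshot;
```

Since deduplication is skipped in this mode, the source query must itself produce unique records.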
-### Index Bootstrap
+## Index Bootstrap
-For the demand of `snapshot data` + `incremental data` import. If the
`snapshot data` already insert into Hudi by [bulk insert](#bulk-insert).
-User can insert `incremental data` in real time and ensure the data is not
repeated by using the index bootstrap function.
+For importing both snapshot data and incremental data: if the snapshot data
has already been inserted into Hudi via [bulk insert](#bulk-insert),
+users can insert incremental data in real time and ensure the data is not
duplicated by using the index bootstrap function.
:::note
-If you think this process is very time-consuming, you can add resources to
write in streaming mode while writing `snapshot data`,
-and then reduce the resources to write `incremental data` (or open the rate
limit function).
+If you find this process very time‑consuming, you can add resources to write
in streaming mode while writing snapshot data,
+then reduce the resources when writing incremental data (or enable the
rate‑limit function).
:::
-#### Options
+### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.bootstrap.enabled` | `true` | `false` | When index bootstrap is
enabled, the remain records in Hudi table will be loaded into the Flink state
at one time |
-| `index.partition.regex` | `false` | `*` | Optimize option. Setting
regular expressions to filter partitions. By default, all partitions are loaded
into flink state |
+| Option Name               | Required | Default | Remarks |
+|---------------------------|----------|---------|---------|
+| `index.bootstrap.enabled` | `true`   | `false` | When index bootstrap is enabled, the remaining records in the Hudi table are loaded into the Flink state at once |
+| `index.partition.regex`   | `false`  | `*`     | Optimization option. Set a regular expression to filter partitions. By default, all partitions are loaded into Flink state |
-#### How To Use
+### How to use
-1. `CREATE TABLE` creates a statement corresponding to the Hudi table. Note
that the `table.type` must be correct.
-2. Setting `index.bootstrap.enabled` = `true` to enable the index bootstrap
function.
-3. Setting Flink checkpoint failure tolerance in `flink-conf.yaml` :
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink
checkpoint scheduling times).
-4. Waiting until the first checkpoint succeeds, indicating that the index
bootstrap completed.
-5. After the index bootstrap completed, user can exit and save the savepoint
(or directly use the externalized checkpoint).
-6. Restart the job, setting `index.bootstrap.enable` as `false`.
+1. Use a `CREATE TABLE` statement to define the Hudi table. Note that
`table.type` must be set correctly.
+2. Set `index.bootstrap.enabled` = `true` to enable the index bootstrap
function.
+3. Set the Flink checkpoint failure tolerance in `flink-conf.yaml`:
`execution.checkpointing.tolerable-failed-checkpoints = n` (depending on Flink
checkpoint scheduling times).
+4. Wait until the first checkpoint succeeds, indicating that the index
bootstrap has completed.
+5. After the index bootstrap completes, users can exit and save the savepoint
(or directly use the externalized checkpoint).
+6. Restart the job, setting `index.bootstrap.enabled` to `false`.
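+
+Steps 1–3 above can be sketched as follows (the table name, schema, and path
are illustrative; match them to your existing Hudi table):
+
+```sql
+-- Illustrative schema and path; adjust to your environment
+CREATE TABLE hudi_bootstrap_demo (
+  uuid STRING,
+  name STRING,
+  ts BIGINT,
+  `partition` STRING,
+  PRIMARY KEY (uuid) NOT ENFORCED
+) PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/hudi_bootstrap_demo',
+  'table.type' = 'MERGE_ON_READ',     -- must match the existing table's type
+  'index.bootstrap.enabled' = 'true', -- load the existing index into Flink state
+  'index.partition.regex' = '2024-.*' -- optional: only bootstrap matching partitions
+);
+```
+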
:::note
-1. Index bootstrap is blocking, so checkpoint cannot be completed during index
bootstrap.
-2. Index bootstrap triggers by the input data. User need to ensure that there
is at least one record in each partition.
-3. Index bootstrap executes concurrently. User can search in log by `finish
loading the index under partition` and `Load record form file` to observe the
progress of index bootstrap.
-4. The first successful checkpoint indicates that the index bootstrap
completed. There is no need to load the index again when recovering from the
checkpoint.
+
+1. Index bootstrap is blocking, so checkpoints cannot complete during index
bootstrap.
+2. Index bootstrap is triggered by the input data. Users need to ensure that
there is at least one record in each partition.
+3. Index bootstrap executes concurrently. Users can search logs for `finish
loading the index under partition` and `Load record from file` to observe the
index‑bootstrap progress.
+4. The first successful checkpoint indicates that the index bootstrap has
completed. There is no need to load the index again when recovering from the
checkpoint.
+
:::
-### Changelog Mode
-Hudi can keep all the intermediate changes (I / -U / U / D) of messages, then
consumes through stateful computing of flink to have a near-real-time
-data warehouse ETL pipeline (Incremental computing). Hudi MOR table stores
messages in the forms of rows, which supports the retention of all change logs
(Integration at the format level).
-All changelog records can be consumed with Flink streaming reader.
+## Changelog Mode
+
+Hudi can keep all the intermediate changes (+I / -U / +U / -D) of messages, then
consume them through stateful computing in Flink to build a near‑real‑time
+data‑warehouse ETL pipeline (incremental computing). Hudi MOR tables store
messages as rows, which supports the retention of all change logs (integration
at the format level).
+All changelog records can be consumed with the Flink streaming reader.
-#### Options
+### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `changelog.enabled` | `false` | `false` | It is turned off by default, to
have the `upsert` semantics, only the merged messages are ensured to be kept,
intermediate changes may be merged. Setting to true to support consumption of
all changes |
+| Option Name         | Required | Default | Remarks |
+|---------------------|----------|---------|---------|
+| `changelog.enabled` | `false`  | `false` | Disabled by default to keep the
`upsert` semantics: only the merged messages are guaranteed to be kept, and
intermediate changes may be merged. Set to `true` to support consumption of all
changes |
:::note
-Batch (Snapshot) read still merge all the intermediate changes, regardless of
whether the format has stored the intermediate changelog messages.
+Batch (snapshot) reads still merge all the intermediate changes, regardless of
whether the format has stored the intermediate changelog messages.
:::
:::note
-After setting `changelog.enable` as `true`, the retention of changelog records
are only best effort: the asynchronous compaction task will merge the changelog
records into one record, so if the
-stream source does not consume timely, only the merged record for each key can
be read after compaction. The solution is to reserve some buffer time for the
reader by adjusting the compaction strategy, such as
-the compaction options: [`compaction.delta_commits`](#compaction) and
[`compaction.delta_seconds`](#compaction).
+After setting `changelog.enabled` to `true`, the retention of changelog records
is best‑effort only: the asynchronous compaction task will merge the changelog
records into one record, so if the
+stream source does not consume in a timely manner, only the merged record for
each key can be read after compaction. The solution is to reserve buffer time
for the reader by adjusting the compaction strategy, such as
+the compaction options `compaction.delta_commits` and
`compaction.delta_seconds`.
:::
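+
+As a sketch, changelog mode can be enabled on a MOR table like so (the table
name and path are illustrative; the compaction settings simply enlarge the
reader's buffer window):
+
+```sql
+CREATE TABLE user_events_changelog (
+  user_id BIGINT,
+  event STRING,
+  ts BIGINT,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/user_events_changelog',
+  'table.type' = 'MERGE_ON_READ',     -- changelog mode requires a MOR table
+  'changelog.enabled' = 'true',
+  'compaction.delta_commits' = '10',  -- delay compaction to give readers
+  'compaction.delta_seconds' = '3600' -- more time to consume changelogs
+);
+```
+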
+## Append Mode
+
+For `INSERT` mode write operations, new Parquet files are written directly,
and [auto‑file sizing](file_sizing.md) is not applied.
+
+### In-Memory Buffer Sort
-### Append Mode
+For append-only workloads, Hudi supports in-memory buffer sorting to improve
Parquet compression ratio. When enabled, data is sorted within the write buffer
before being flushed to disk. This improves columnar file compression
efficiency by grouping similar values together.
-For `INSERT` mode write operation, the current work flow is:
+| Option Name | Required | Default | Remarks
|
+|-----------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------|
+| `write.buffer.sort.enabled` | `false` | `false` | Whether to enable buffer
sort within append write function. Improves Parquet compression ratio by
sorting data before writing |
+| `write.buffer.sort.keys` | `false` | `N/A` | Sort keys concatenated by
comma (e.g., `col1,col2`). Required when `write.buffer.sort.enabled` is `true`
|
+| `write.buffer.size` | `false` | `1000` | Buffer size in number of
records. When buffer reaches this size, data is sorted and flushed to disk
|
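+
+For example, buffer sort could be enabled for an append-only pipeline as
follows (the table name, columns, and path are illustrative):
+
+```sql
+CREATE TABLE events_append_sorted (
+  customer_id BIGINT,
+  order_id BIGINT,
+  amount DOUBLE,
+  ts BIGINT
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/events_append_sorted',
+  'table.type' = 'COPY_ON_WRITE',
+  'write.operation' = 'insert',                      -- append mode
+  'write.buffer.sort.enabled' = 'true',
+  'write.buffer.sort.keys' = 'customer_id,order_id', -- required when sort is enabled
+  'write.buffer.size' = '1000'                       -- sort and flush every 1000 records
+);
+```
+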
-- For Merge_On_Read table, the small file strategies are by default applied:
tries to append to the small avro log files first
-- For Copy_On_Write table, write new parquet files directly, no small file
strategies are applied
+### Disable Meta Fields
+
+For append-only workloads where Hudi metadata fields (e.g.,
`_hoodie_commit_time`, `_hoodie_record_key`) are not needed, you can disable
them to reduce storage overhead. This is useful when integrating with external
systems that don't require Hudi-specific metadata.
+
+| Option Name | Required | Default | Remarks
|
+|-------------------------------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `hoodie.populate.meta.fields` | `false` | `true` | Whether to populate
Hudi meta fields. Set to `false` for append-only workloads to reduce storage
overhead. Note: Some Hudi features may not work when disabled |
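+
+As an illustrative sketch, meta fields can be disabled together with
append-mode writes:
+
+```sql
+-- Illustrative fragment of a table's WITH clause
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/raw_logs_append',
+  'write.operation' = 'insert',           -- append-only workload
+  'hoodie.populate.meta.fields' = 'false' -- skip _hoodie_* columns to save storage
+)
+```
+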
Hudi supports rich clustering strategies to optimize the files layout for
`INSERT` mode:
-#### Inline Clustering
+### Inline Clustering
:::note
-Only Copy_On_Write table is supported.
+Only Copy‑on‑Write tables are supported.
:::
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting, for COW table, open the option to enable the small file
merging strategy(no deduplication for keys but the throughput will be affected)
|
+| Option Name | Required | Default | Remarks
|
+|------------------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.insert.cluster` | `false` | `false` | Whether to merge small files
while ingesting. For COW tables, enable this option to use the small‑file
merging strategy (no deduplication for keys, but throughput will be affected) |
-#### Async Clustering
+### Async Clustering
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `clustering.schedule.enabled` | `false` | `false` | Whether to schedule
clustering plan during write process, by default false |
-| `clustering.delta_commits` | `false` | `4` | Delta commits to schedule the
clustering plan, only valid when `clustering.schedule.enabled` is true |
-| `clustering.async.enabled` | `false` | `false` | Whether to execute
clustering plan asynchronously, by default false |
-| `clustering.tasks` | `false` | `4` | Parallelism of the clustering tasks |
-| `clustering.plan.strategy.target.file.max.bytes` | `false` |
`1024*1024*1024` | The target file size for clustering group, by default 1GB |
-| `clustering.plan.strategy.small.file.limit` | `false` | `600` | The file
that has less size than the threshold (unit MB) are candidates for clustering |
-| `clustering.plan.strategy.sort.columns` | `false` | `N/A` | The columns to
sort by when clustering |
+| Option Name | Required | Default
| Remarks
|
+|--------------------------------------------------|----------|------------------|---------------------------------------------------------------------------------------------------------|
+| `clustering.schedule.enabled` | `false` | `false`
| Whether to schedule the clustering plan during the write process; default
is false |
+| `clustering.delta_commits` | `false` | `4`
| Delta commits for scheduling the clustering plan; only valid when
`clustering.schedule.enabled` is true |
+| `clustering.async.enabled` | `false` | `false`
| Whether to execute the clustering plan asynchronously; default is false
|
+| `clustering.tasks` | `false` | `4`
| Parallelism of the clustering tasks
|
+| `clustering.plan.strategy.target.file.max.bytes` | `false` |
`1024*1024*1024` | The target file size for the clustering group; default is 1
GB |
+| `clustering.plan.strategy.small.file.limit` | `false` | `600`
| Files smaller than the threshold (in MB) are candidates for clustering
|
+| `clustering.plan.strategy.sort.columns` | `false` | `N/A`
| The columns to sort by when clustering
|
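+
+As an illustrative sketch, async clustering can be scheduled and executed from
the writer with:
+
+```sql
+-- Illustrative fragment of a table's WITH clause for INSERT-mode writes
+WITH (
+  'connector' = 'hudi',
+  'write.operation' = 'insert',
+  'clustering.schedule.enabled' = 'true', -- schedule a plan while writing
+  'clustering.delta_commits' = '4',       -- every 4 delta commits
+  'clustering.async.enabled' = 'true',    -- execute the plan asynchronously
+  'clustering.tasks' = '4',
+  'clustering.plan.strategy.sort.columns' = 'customer_id'
+)
+```
+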
-#### Clustering Plan Strategy
+### Clustering Plan Strategy
Custom clustering strategy is supported.
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `clustering.plan.partition.filter.mode` | `false` | `NONE` | Valid options
1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that represent recent
days; 3) `SELECTED_PARTITIONS`: specific partitions |
-| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2` |
Valid for `RECENT_DAYS` mode |
-| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to begin with(inclusive) |
-| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A` | Valid
for `SELECTED_PARTITIONS` mode, specify the partition to end with(inclusive) |
-| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A` | The
regex to filter the partitions |
-| `clustering.plan.strategy.partition.selected` | `false` | `N/A` | Specific
partitions separated by comma `,` |
+| Option Name | Required | Default
| Remarks
|
+|---------------------------------------------------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------|
+| `clustering.plan.partition.filter.mode` | `false` | `NONE`
| Valid options 1) `NONE`: no limit; 2) `RECENT_DAYS`: choose partitions that
represent recent days; 3) `SELECTED_PARTITIONS`: specific partitions |
+| `clustering.plan.strategy.daybased.lookback.partitions` | `false` | `2`
| Number of partitions to look back; valid for `RECENT_DAYS` mode
|
+| `clustering.plan.strategy.cluster.begin.partition` | `false` | `N/A`
| Valid for `SELECTED_PARTITIONS` mode; specify the partition to begin with
(inclusive) |
+| `clustering.plan.strategy.cluster.end.partition` | `false` | `N/A`
| Valid for `SELECTED_PARTITIONS` mode; specify the partition to end with
(inclusive) |
+| `clustering.plan.strategy.partition.regex.pattern` | `false` | `N/A`
| The regex to filter the partitions
|
+| `clustering.plan.strategy.partition.selected` | `false` | `N/A`
| Specific partitions, separated by commas
|
+
+## Using Bucket Index
+
+Hudi Flink writer supports two types of writer indexes:
+
+- Flink state (default)
+- Bucket index (3 variants: simple, partition-level, consistent hashing)
+
+### Comparison
+
+| Feature | Bucket Index
| Flink State Index
|
+|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------|
+| How It Works | Uses a deterministic hash algorithm to shuffle
records into buckets
| Uses the Flink
state backend to store index data: mappings of record keys to their residing
file group's file IDs |
+| Computing/Storage Cost | No cost for state‑backend indexing
| Has computing and
storage cost for maintaining state; can become a bottleneck when working with
large Hudi tables |
+| Performance | Better performance due to no state overhead
| Performance depends
on state backend efficiency
|
+| File Group Flexibility | **Simple**: Fixed number of buckets (file groups)
per partition, immutable once set<br/>**Partition-Level**: Different fixed
buckets per partition via regex patterns (rescaling requires Spark
procedure)<br/>**Consistent Hashing**: Auto-resizing buckets via clustering |
Dynamically assigns records to file groups based on current table layout; no
pre-configured limits on file group count |
+| Cross‑Partition Changes | Cannot handle changes among partitions (unless
input is a CDC stream)
| No limit on
handling cross‑partition changes
|
+
+:::note
+Bucket index supports only the `UPSERT` write operation and cannot be used
with the [append mode](#append-mode) in Flink.
+:::
+
+### Bucket Index Examples
+
+#### Simple Bucket Index
+
+Fixed number of buckets across all partitions:
+
+```sql
+CREATE TABLE orders_simple_bucket (
+ order_id BIGINT,
+ customer_id BIGINT,
+ amount DOUBLE,
+ order_date STRING,
+ ts BIGINT,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/orders_simple',
+ 'table.type' = 'MERGE_ON_READ',
+
+ -- Bucket Index Configuration
+ 'index.type' = 'BUCKET',
+ 'hoodie.bucket.index.engine' = 'SIMPLE',
+ 'hoodie.bucket.index.hash.field' = 'order_id',
+ 'hoodie.bucket.index.num.buckets' = '16' -- Fixed 16 buckets for ALL
partitions
+);
+
+-- Insert data
+INSERT INTO orders_simple_bucket VALUES
+ (1, 100, 99.99, '2024-01-15', 1000),
+ (2, 101, 49.99, '2024-02-20', 2000);
+```
+
+#### Partition-Level Bucket Index
+
+Different bucket counts for different partitions based on regex patterns:
+
+```sql
+CREATE TABLE orders_partition_bucket (
+ order_id BIGINT,
+ customer_id BIGINT,
+ amount DOUBLE,
+ order_date STRING,
+ ts BIGINT,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/orders_partition',
+ 'table.type' = 'MERGE_ON_READ',
+
+ -- Bucket Index Configuration
+ 'index.type' = 'BUCKET',
+ 'hoodie.bucket.index.engine' = 'SIMPLE',
+ 'hoodie.bucket.index.hash.field' = 'order_id',
+
+ -- Partition-Level Configuration
+ 'hoodie.bucket.index.num.buckets' = '8', -- Default for non-matching
partitions
+ 'hoodie.bucket.index.partition.rule.type' = 'regex',
+ -- Black Friday (11-24), Cyber Monday (11-27), Christmas (12-25) get 128
buckets
+ -- All other dates get 8 buckets (default)
+ 'hoodie.bucket.index.partition.expressions' = '\\d{4}-(11-(24|27)|12-25),128'
+);
+
+-- Insert data - bucket count varies by partition
+INSERT INTO orders_partition_bucket VALUES
+ (1, 100, 999.99, '2024-11-24', 1000), -- Black Friday: 128 buckets
+ (2, 101, 499.99, '2024-11-27', 2000), -- Cyber Monday: 128 buckets
+ (3, 102, 299.99, '2024-12-25', 3000), -- Christmas: 128 buckets
+ (4, 103, 49.99, '2024-01-15', 4000); -- Regular day: 8 buckets
+```
+
+:::note
+For existing simple bucket index tables, use the Spark
`partition_bucket_index_manager` procedure to upgrade to partition-level bucket
index. After upgrade, Flink writers automatically load the expressions from
table metadata.
+:::
+
+#### Consistent Hashing Bucket Index
+
+Auto-expanding buckets via clustering (requires Spark for execution):
+
+```sql
+CREATE TABLE orders_consistent_hashing (
+ order_id BIGINT,
+ customer_id BIGINT,
+ amount DOUBLE,
+ order_date STRING,
+ ts BIGINT,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) PARTITIONED BY (order_date)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/orders_consistent',
+ 'table.type' = 'MERGE_ON_READ',
+
+ -- Consistent Hashing Bucket Index
+ 'index.type' = 'BUCKET',
+ 'hoodie.bucket.index.engine' = 'CONSISTENT_HASHING',
+ 'hoodie.bucket.index.hash.field' = 'order_id',
+
+ -- Initial and boundary configuration
+ 'hoodie.bucket.index.num.buckets' = '4', -- Initial bucket count
+ 'hoodie.bucket.index.min.num.buckets' = '2', -- Minimum allowed
+ 'hoodie.bucket.index.max.num.buckets' = '128', -- Maximum allowed
+
+ -- Clustering configuration (required for auto-resizing)
+ 'clustering.schedule.enabled' = 'true',
+ 'clustering.delta_commits' = '5', -- Schedule clustering every 5 commits
+ 'clustering.plan.strategy.class' =
'org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy',
+
+ -- File size thresholds for bucket resizing
+ 'hoodie.clustering.plan.strategy.target.file.max.bytes' = '1073741824', --
1GB max
+ 'hoodie.clustering.plan.strategy.small.file.limit' = '314572800' -- 300MB
min
+);
+
+-- Insert data - buckets auto-adjust based on file sizes
+INSERT INTO orders_consistent_hashing
+SELECT * FROM source_stream;
+```
+
+:::note
+**Consistent hashing bucket index** automatically adjusts bucket counts via
clustering. Flink can schedule clustering plans, but execution currently
requires Spark. Start from the initial `hoodie.bucket.index.num.buckets`; the
system dynamically resizes buckets based on file sizes within the min/max
bounds.
+:::
+
+### Configuration Reference
+
+| Option | Applies To | Default
| Description
|
+|---------------------------------------------|--------------------|---------------|---------------------------------------------------------------------------|
+| `index.type` | All |
`FLINK_STATE` | Set to `BUCKET` to enable bucket index
|
+| `hoodie.bucket.index.engine` | All | `SIMPLE`
| Engine type: `SIMPLE` or `CONSISTENT_HASHING`
|
+| `hoodie.bucket.index.hash.field` | All | Record
key | Fields to hash for bucketing; can be a subset of record key
|
+| `hoodie.bucket.index.num.buckets` | All | `4`
| Default bucket count per partition (initial count for consistent hashing)
|
+| `hoodie.bucket.index.partition.expressions` | Partition-Level | N/A
| Regex patterns and bucket counts: `pattern1,count1;pattern2,count2`
|
+| `hoodie.bucket.index.partition.rule.type` | Partition-Level | `regex`
| Rule parser type
|
+| `hoodie.bucket.index.min.num.buckets` | Consistent Hashing | N/A
| Minimum bucket count (prevents over-merging)
|
+| `hoodie.bucket.index.max.num.buckets` | Consistent Hashing | N/A
| Maximum bucket count (prevents unlimited expansion)
|
+| `clustering.schedule.enabled` | Consistent Hashing | `false`
| Must be `true` for auto-resizing
|
+| `clustering.plan.strategy.class` | Consistent Hashing | N/A
| Set to `FlinkConsistentBucketClusteringPlanStrategy`
|
+
+## Rate Limiting
+
+Hudi provides rate limiting capabilities for both writes and streaming reads
to control data flow and prevent performance degradation.
+
+### Write Rate Limiting
+
+In many scenarios, users publish both historical snapshot data and real‑time
incremental updates to the same message queue, then consume from the earliest
offset using Flink to ingest everything into Hudi. This backfill pattern can
cause performance issues:
-### Bucket Index
+- **High burst throughput**: The entire historical dataset arrives at once,
overwhelming the writer with a massive volume of records
+- **Scattered writes across table partitions**: Historical records arrive
scattered across many different table partitions (e.g., records from many
different dates if the table is partitioned by date). This forces the writer to
constantly switch between partitions, keeping many file handles open
simultaneously and causing memory pressure, which degrades write performance
and causes throughput instability
-By default, flink uses the state-backend to keep the file index: the mapping
from primary key to fileId. When the input data set is large,
-there is possibility the cost of the state be a bottleneck, the bucket index
use deterministic hash algorithm for shuffling the records into
-buckets, thus can avoid the storage and query overhead of indexes.
+The `write.rate.limit` option helps smooth out the ingestion flow, preventing
traffic jitter and improving stability during backfill operations.
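+
+For example, a backfill job could cap the write throughput like so (the limit
value is illustrative):
+
+```sql
+-- Illustrative fragment of a table's WITH clause
+WITH (
+  'connector' = 'hudi',
+  'write.rate.limit' = '50000' -- at most 50,000 records written per second
+)
+```
+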
-#### Options
+### Streaming Read Rate Limiting
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `index.type` | `false` | `FLINK_STATE` | Set up as `BUCKET` to use bucket
index |
-| `hoodie.bucket.index.hash.field` | `false` | Primary key | Can be a subset
of the primary key |
-| `hoodie.bucket.index.num.buckets` | `false` | `4` | The number of buckets
per-partition, it is immutable once set up |
+For Flink streaming reads, rate limiting helps avoid backpressure when
processing large workloads. The `read.splits.limit` option controls the maximum
number of input splits allowed to be read in each check interval. This feature
is particularly useful when:
-Comparing to state index:
+- Reading from tables with a large backlog of commits
+- Preventing downstream operators from being overwhelmed
+- Controlling resource consumption during catch-up scenarios
-- Bucket index has no computing and storage cost of state-backend index, thus
has better performance
-- Bucket index can not expand the buckets dynamically, the state-backend index
can expand the buckets dynamically based on current file layout
-- Bucket index can not handle changes among partitions(no limit if the input
itself is CDC stream), state-backend index has no limit
+The average read rate can be calculated as: **`read.splits.limit` /
`read.streaming.check-interval`** splits per second.
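+
+For example, with the following (illustrative) settings, the reader consumes
at most 120 splits per 60-second check, i.e. an average of 120 / 60 = 2 splits
per second:
+
+```sql
+-- Illustrative fragment of a streaming-read table's WITH clause
+WITH (
+  'connector' = 'hudi',
+  'read.streaming.enabled' = 'true',
+  'read.streaming.check-interval' = '60', -- check for new commits every 60s
+  'read.splits.limit' = '120'             -- at most 120 splits per check
+)
+```
+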
-### Rate Limit
-There are many use cases that user put the full history data set onto the
message queue together with the realtime incremental data. Then they consume
the data from the queue into the hudi from the earliest offset using flink.
Consuming history data set has these characteristics:
-1). The instant throughput is huge 2). It has serious disorder (with random
writing partitions). It will lead to degradation of writing performance and
throughput glitches. For this case, the speed limit parameter can be turned on
to ensure smooth writing of the flow.
+### Options
-#### Options
-| Option Name | Required | Default | Remarks |
-| ----------- | ------- | ------- | ------- |
-| `write.rate.limit` | `false` | `0` | Default disable the rate limit |
+| Option Name | Required | Default | Remarks
|
+|---------------------------------|----------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.rate.limit` | `false` | `0` | Write
record rate limit per second to prevent traffic jitter and improve stability.
Default is 0 (no limit)
|
+| `read.splits.limit` | `false` | `Integer.MAX_VALUE` | Maximum
number of splits allowed to read in each instant check for streaming reads.
Average read rate = `read.splits.limit`/`read.streaming.check-interval`.
Default is no limit |
+| `read.streaming.check-interval` | `false` | `60` | Check
interval in seconds for streaming reads. Default is 60 seconds (1 minute)
|
diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 16db53dcd0ef..e651c1a47a9d 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -487,7 +487,7 @@ This is done to ensure that the compaction and cleaning
services are not execute
### Consistent hashing index (Experimental)
-We have introduced the Consistent Hashing Index since [0.13.0
release](/releases/release-0.13.0#consistent-hashing-index). In comparison to
the static hashing index ([Bucket
Index](/releases/release-0.11.0#bucket-index)), the consistent hashing index
offers dynamic scalability of data buckets for the writer.
+The Consistent Hashing Bucket Index was introduced in the [0.13.0
release](/releases/release-0.13.0#consistent-hashing-index). This is one of
three [bucket index](indexes#additional-writer-side-indexes) variants available
in Hudi. The consistent hashing bucket index offers dynamic scalability of data
buckets for the writer.
You can find the
[RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md) for the
design of this feature.
In the 0.13.X release, the Consistent Hashing Index is supported only for
Spark engine. And since [release
0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), the index
is supported for Flink engine.