cshuo commented on code in PR #18867:
URL: https://github.com/apache/hudi/pull/18867#discussion_r3315494812
##########
website/docs/ingestion_flink.md:
##########
@@ -349,10 +358,216 @@ For Flink streaming reads, rate limiting helps avoid backpressure when processin
The average read rate can be calculated as: **`read.splits.limit` / `read.streaming.check-interval`** splits per second.
+Hudi 1.2.0 adds `read.commits.limit`, which complements `read.splits.limit` by capping the number of commits (instants) consumed per check interval. This is useful when tables have many small commits — limiting commits bounds the number of splits regardless of their individual size.
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|---------------------------------|----------|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.rate.limit` | `false` | `0` | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
+| `read.splits.limit` | `false` | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.commits.limit` | `false` | `(none)` | Maximum number of commits (instants) allowed to read in each check interval. Complements `read.splits.limit`. Average rate = `read.commits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.streaming.check-interval` | `false` | `60` | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+
+## Flink Source V2
+
+Hudi 1.2.0 introduces a new Flink source implementation ([RFC-95](https://github.com/apache/hudi/blob/master/rfc/rfc-95/rfc-95.md)) based on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), available as an opt-in feature via the `read.source-v2.enabled` flag.
+
+### Why Source V2?
+
+The legacy Hudi Flink source was built on Flink's `SourceFunction` API. The FLIP-27 rewrite brings:
+
+- **Resumable split assignment** — splits can be checkpointed independently, enabling finer-grained recovery
+- **Checkpoint alignment** — the new API participates in Flink's coordinated checkpoint protocol, improving end-to-end consistency
+- **Push-down support** — predicate push-down, partition pruning, and `LIMIT` push-down are supported through the new source interface, reducing data scanned at the source level
+
+### Enabling Source V2
+
+```sql
+CREATE TABLE t1 (
+ uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = '${path}',
+ 'table.type' = 'MERGE_ON_READ',
+ 'read.source-v2.enabled' = 'true' -- enable the FLIP-27 source
+);
+```
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|---------------------------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| `read.source-v2.enabled` | `false` | `false` | Whether to use the FLIP-27 new source (Source V2) to consume data files. Default is the legacy source |
+
+### Savepoint Incompatibility
+
+:::warning
+Savepoints taken with the **legacy source** (`read.source-v2.enabled=false`) are **not compatible** with the Source V2 source, and vice versa. When switching from the legacy source to Source V2, start a fresh job without restoring from a legacy savepoint. If you need to preserve read progress, record the last committed instant time and use `read.start-commit` to resume from that point.
+:::
+
+## Record-Level Index (RLI) Bucket Indexing for Flink
+
+As of Hudi 1.2.0, the Flink writer supports the Record-Level Index (RLI) backed by the metadata table, in addition to the existing `FLINK_STATE` and `BUCKET` index types. RLI is stored in the metadata table and avoids the state-backend overhead of `FLINK_STATE`, while supporting full global or partition-scoped uniqueness guarantees.
+
+Two RLI variants are available via `index.type`:
+
+- `RECORD_LEVEL_INDEX` — partitioned RLI; enforces uniqueness per (partition path, record key) pair
+- `GLOBAL_RECORD_LEVEL_INDEX` — global RLI; enforces uniqueness across all partitions
+
+### Bootstrap
+
+When enabling RLI on an existing table, the bootstrap process loads existing record locations into RocksDB before the first write. Bootstrap is triggered by setting `index.bootstrap.enabled=true`.
+
+```sql
+CREATE TABLE my_hudi_table (
+ id BIGINT,
+ name STRING,
+ ts BIGINT,
+ dt STRING,
+ PRIMARY KEY (id) NOT ENFORCED
+)
+PARTITIONED BY (dt)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/my_hudi_table',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'RECORD_LEVEL_INDEX',
+ 'metadata.enabled' = 'true',
+ 'index.bootstrap.enabled' = 'true', -- enable bootstrap on first run
+ 'index.bootstrap.rocksdb.path' = '/tmp/hudi-rli-rocksdb'
+);
+```
+
+After the first successful checkpoint (which completes bootstrap), restart the job with `index.bootstrap.enabled=false`.
+
+### In-Pipeline MDT Compaction
+
+For RLI workloads, the metadata table (MDT) accumulates log files that need periodic compaction. The option `metadata.compaction.async.enabled` (default `true`) runs MDT compaction inside the Flink pipeline after every `metadata.compaction.delta_commits` (default `10`) delta commits.
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|-------------------------------------|----------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `index.type` | `false` | `FLINK_STATE` | Set to `RECORD_LEVEL_INDEX` or `GLOBAL_RECORD_LEVEL_INDEX` to use the metadata-table-backed RLI |
+| `index.bootstrap.enabled` | `false` | `false` | Bootstrap the index from the existing table on first run. Blocks checkpoints during bootstrap |
+| `index.bootstrap.rocksdb.path` | `false` | system temp dir | Local path for RocksDB storage during RLI bootstrap. Each task manager creates a unique subdirectory under this path |
+| `index.rli.cache.size` | `false` | `256` | Maximum memory in MB for the RLI cache per bucket-assign task. Dynamically adjusted based on historical usage |
+| `index.rli.lookup.minibatch.size` | `false` | `1000` | Maximum records buffered per mini-batch during RLI lookup. Mini-batching reduces individual index lookups. Minimum effective value is 1000 |
+| `metadata.compaction.async.enabled` | `false` | `true` | Whether to run MDT compaction asynchronously within the Flink pipeline. Recommended to keep enabled for RLI workloads |
+| `metadata.compaction.delta_commits` | `false` | `10` | Number of MDT delta commits that trigger in-pipeline compaction |
+
+:::note
+`GLOBAL_RECORD_LEVEL_INDEX` requires `metadata.enabled=true` and `index.global.enabled=true`. The Flink table factory validates these constraints automatically.
+:::
+
+## Lookup Join
+
+Hudi 1.2.0 adds a RocksDB-backed cache option for Flink lookup joins against Hudi dimension tables. This avoids JVM heap pressure when the dimension table is large.
+
### Options
-| Option Name | Required | Default | Remarks |
-|---------------------------------|----------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `write.rate.limit` | `false` | `0` | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
-| `read.splits.limit` | `false` | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
-| `read.streaming.check-interval` | `false` | `60` | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+| Option Name | Required | Default | Remarks |
+|--------------------------------|----------|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| `lookup.join.cache.type` | `false` | `heap` | Storage backend for the lookup join cache. `heap` (default) stores rows in JVM heap; `rocksdb` stores rows off-heap in an embedded RocksDB instance |
+| `lookup.join.rocksdb.path` | `false` | `${java.io.tmpdir}/hudi-lookup-rocksdb` | Local directory for RocksDB data when `lookup.join.cache.type=rocksdb`. Cleaned up when the lookup function closes |
+| `lookup.async` | `false` | `false` | Whether to enable async lookup join. Async join can improve throughput when the lookup function has high latency |
+| `lookup.async-thread-number` | `false` | `16` | Number of threads for async lookup join |
+
+### Example
+
+```sql
+-- Streaming fact table
+CREATE TABLE orders (
+ order_id BIGINT,
+ customer_id BIGINT,
+ amount DOUBLE,
+ ts TIMESTAMP(3),
+ WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/orders',
+ 'table.type' = 'MERGE_ON_READ',
+ 'read.streaming.enabled' = 'true'
+);
+
+-- Hudi dimension table with RocksDB-backed lookup cache
+CREATE TABLE customers (
+ customer_id BIGINT,
+ name STRING,
+ city STRING,
+ PRIMARY KEY (customer_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/customers',
+ 'lookup.join.cache.type' = 'rocksdb',
+ 'lookup.join.rocksdb.path' = '/tmp/hudi-lookup-rocksdb'
+);
+
+-- Lookup join
+SELECT o.order_id, c.name, o.amount
+FROM orders AS o
+JOIN customers FOR SYSTEM_TIME AS OF o.ts AS c
Review Comment:
The lookup join example should use a processing-time attribute. Hudi’s lookup join tests use `PROCTIME() AS proc_time` and `FOR SYSTEM_TIME AS OF o.proc_time`; the current example uses event time `o.ts`, which may not exercise the lookup join path users are trying to configure.