cshuo commented on code in PR #18867:
URL: https://github.com/apache/hudi/pull/18867#discussion_r3315494812


##########
website/docs/ingestion_flink.md:
##########
@@ -349,10 +358,216 @@ For Flink streaming reads, rate limiting helps avoid 
backpressure when processin
 
 The average read rate can be calculated as: **`read.splits.limit` / 
`read.streaming.check-interval`** splits per second.
 
+Hudi 1.2.0 adds `read.commits.limit`, which complements `read.splits.limit` by 
capping the number of commits (instants) consumed per check interval. This is 
useful when tables have many small commits — limiting commits bounds the number 
of splits regardless of their individual size.
+
+### Options
+
+| Option Name                     | Required | Default             | Remarks |
+|---------------------------------|----------|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.rate.limit`              | `false`  | `0`                 | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
+| `read.splits.limit`             | `false`  | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.commits.limit`            | `false`  | `(none)`            | Maximum number of commits (instants) allowed to read in each check interval. Complements `read.splits.limit`. Average rate = `read.commits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.streaming.check-interval` | `false`  | `60`                | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+
+## Flink Source V2
+
+Hudi 1.2.0 introduces a new Flink source implementation 
([RFC-95](https://github.com/apache/hudi/blob/master/rfc/rfc-95/rfc-95.md)) 
based on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface),
 available as an opt-in feature via the `read.source-v2.enabled` flag.
+
+### Why Source V2?
+
+The legacy Hudi Flink source was built on Flink's `SourceFunction` API. The 
FLIP-27 rewrite brings:
+
+- **Resumable split assignment** — splits can be checkpointed independently, 
enabling finer-grained recovery
+- **Checkpoint alignment** — the new API participates in Flink's coordinated 
checkpoint protocol, improving end-to-end consistency
+- **Push-down support** — predicate push-down, partition pruning, and `LIMIT` 
push-down are supported through the new source interface, reducing data scanned 
at the source level
+
+### Enabling Source V2
+
+```sql
+CREATE TABLE t1 (
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  age INT,
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${path}',
+  'table.type' = 'MERGE_ON_READ',
+  'read.source-v2.enabled' = 'true'  -- enable the FLIP-27 source
+);
+```
+
+### Options
+
+| Option Name               | Required | Default | Remarks |
+|---------------------------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| `read.source-v2.enabled`  | `false`  | `false` | Whether to use the FLIP-27 new source (Source V2) to consume data files. Default is the legacy source |
+
+### Savepoint Incompatibility
+
+:::warning
+Savepoints taken with the **legacy source** (`read.source-v2.enabled=false`) 
are **not compatible** with the Source V2 source, and vice versa. When 
switching from the legacy source to Source V2, start a fresh job without 
restoring from a legacy savepoint. If you need to preserve read progress, 
record the last committed instant time and use `read.start-commit` to resume 
from that point.
+:::
+
+## Record-Level Index (RLI) Bucket Indexing for Flink
+
+As of Hudi 1.2.0, the Flink writer supports the Record-Level Index (RLI) 
backed by the metadata table, in addition to the existing `FLINK_STATE` and 
`BUCKET` index types. RLI is stored in the metadata table and avoids the 
state-backend overhead of `FLINK_STATE`, while supporting full global or 
partition-scoped uniqueness guarantees.
+
+Two RLI variants are available via `index.type`:
+
+- `RECORD_LEVEL_INDEX` — partitioned RLI; enforces uniqueness per (partition 
path, record key) pair
+- `GLOBAL_RECORD_LEVEL_INDEX` — global RLI; enforces uniqueness across all 
partitions
+
+### Bootstrap
+
+When enabling RLI on an existing table, the bootstrap process loads existing 
record locations into RocksDB before the first write. Bootstrap is triggered by 
setting `index.bootstrap.enabled=true`.
+
+```sql
+CREATE TABLE my_hudi_table (
+  id BIGINT,
+  name STRING,
+  ts BIGINT,
+  dt STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+)
+PARTITIONED BY (dt)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/my_hudi_table',
+  'table.type' = 'MERGE_ON_READ',
+  'index.type' = 'RECORD_LEVEL_INDEX',
+  'metadata.enabled' = 'true',
+  'index.bootstrap.enabled' = 'true',  -- enable bootstrap on first run
+  'index.bootstrap.rocksdb.path' = '/tmp/hudi-rli-rocksdb'
+);
+```
+
+After the first successful checkpoint (which completes bootstrap), restart the 
job with `index.bootstrap.enabled=false`.
+
+### In-Pipeline MDT Compaction
+
+For RLI workloads, the metadata table (MDT) accumulates log files that need 
periodic compaction. The option `metadata.compaction.async.enabled` (default 
`true`) runs MDT compaction inside the Flink pipeline after every 
`metadata.compaction.delta_commits` (default `10`) delta commits.
+
+### Options
+
+| Option Name                         | Required | Default  | Remarks |
+|-------------------------------------|----------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `index.type`                        | `false`  | `FLINK_STATE` | Set to `RECORD_LEVEL_INDEX` or `GLOBAL_RECORD_LEVEL_INDEX` to use the metadata-table-backed RLI |
+| `index.bootstrap.enabled`           | `false`  | `false`  | Bootstrap the index from the existing table on first run. Blocks checkpoints during bootstrap |
+| `index.bootstrap.rocksdb.path`      | `false`  | system temp dir | Local path for RocksDB storage during RLI bootstrap. Each task manager creates a unique subdirectory under this path |
+| `index.rli.cache.size`              | `false`  | `256`    | Maximum memory in MB for the RLI cache per bucket-assign task. Dynamically adjusted based on historical usage |
+| `index.rli.lookup.minibatch.size`   | `false`  | `1000`   | Maximum records buffered per mini-batch during RLI lookup. Mini-batching reduces individual index lookups. Minimum effective value is 1000 |
+| `metadata.compaction.async.enabled` | `false`  | `true`   | Whether to run MDT compaction asynchronously within the Flink pipeline. Recommended to keep enabled for RLI workloads |
+| `metadata.compaction.delta_commits` | `false`  | `10`     | Number of MDT delta commits that trigger in-pipeline compaction |
+
+:::note
+`GLOBAL_RECORD_LEVEL_INDEX` requires `metadata.enabled=true` and 
`index.global.enabled=true`. The Flink table factory validates these 
constraints automatically.
+:::
+
+## Lookup Join
+
+Hudi 1.2.0 adds a RocksDB-backed cache option for Flink lookup joins against 
Hudi dimension tables. This avoids JVM heap pressure when the dimension table 
is large.
+
 ### Options
 
-| Option Name                     | Required | Default             | Remarks |
-|---------------------------------|----------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `write.rate.limit`              | `false`  | `0`                 | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
-| `read.splits.limit`             | `false`  | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
-| `read.streaming.check-interval` | `false`  | `60`                | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+| Option Name                    | Required | Default                         | Remarks |
+|--------------------------------|----------|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| `lookup.join.cache.type`       | `false`  | `heap`                          | Storage backend for the lookup join cache. `heap` (default) stores rows in JVM heap; `rocksdb` stores rows off-heap in an embedded RocksDB instance |
+| `lookup.join.rocksdb.path`     | `false`  | `${java.io.tmpdir}/hudi-lookup-rocksdb` | Local directory for RocksDB data when `lookup.join.cache.type=rocksdb`. Cleaned up when the lookup function closes |
+| `lookup.async`                 | `false`  | `false`                         | Whether to enable async lookup join. Async join can improve throughput when the lookup function has high latency |
+| `lookup.async-thread-number`   | `false`  | `16`                            | Number of threads for async lookup join |
+
+### Example
+
+```sql
+-- Streaming fact table
+CREATE TABLE orders (
+  order_id BIGINT,
+  customer_id BIGINT,
+  amount DOUBLE,
+  ts TIMESTAMP(3),
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/orders',
+  'table.type' = 'MERGE_ON_READ',
+  'read.streaming.enabled' = 'true'
+);
+
+-- Hudi dimension table with RocksDB-backed lookup cache
+CREATE TABLE customers (
+  customer_id BIGINT,
+  name STRING,
+  city STRING,
+  PRIMARY KEY (customer_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/customers',
+  'lookup.join.cache.type' = 'rocksdb',
+  'lookup.join.rocksdb.path' = '/tmp/hudi-lookup-rocksdb'
+);
+
+-- Lookup join
+SELECT o.order_id, c.name, o.amount
+FROM orders AS o
+JOIN customers FOR SYSTEM_TIME AS OF o.ts AS c

Review Comment:
   The lookup join example should use a processing-time attribute. Hudi’s 
lookup join tests use `PROCTIME() AS proc_time` and `FOR SYSTEM_TIME AS OF 
o.proc_time`. The current example joins on the event-time column `o.ts`, so it 
may not exercise the lookup join path that users are trying to configure.
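
   A minimal sketch of how the example could look with a processing-time 
attribute, reusing the table definitions from the diff. The computed column 
name `proc_time` follows the naming mentioned for the tests; declaring it in 
the DDL (rather than in a view or subquery) and the `ON` join condition are 
assumptions, since the quoted hunk ends before the join condition. The 
watermark from the original `orders` table is omitted for brevity.

```sql
-- Streaming fact table, with an added processing-time computed column
CREATE TABLE orders (
  order_id BIGINT,
  customer_id BIGINT,
  amount DOUBLE,
  ts TIMESTAMP(3),
  proc_time AS PROCTIME(),  -- processing-time attribute (assumed placement in the DDL)
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/orders',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- Hudi dimension table with RocksDB-backed lookup cache (unchanged from the diff)
CREATE TABLE customers (
  customer_id BIGINT,
  name STRING,
  city STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/customers',
  'lookup.join.cache.type' = 'rocksdb',
  'lookup.join.rocksdb.path' = '/tmp/hudi-lookup-rocksdb'
);

-- Lookup join keyed on processing time instead of the event-time column o.ts
SELECT o.order_id, c.name, o.amount
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;  -- join condition assumed; not visible in the quoted hunk
```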


