hudi-agent commented on code in PR #19046:
URL: https://github.com/apache/hudi/pull/19046#discussion_r3457456092


##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced 
in RFC-106 for large tables. The lookup order is:

Review Comment:
   🤖 The lookup order here is RocksDB → MDT, but both hold only committed 
state, whereas RFC-106's in-memory cache also tracks uncommitted records from 
the in-flight checkpoint. Does the RocksDB cache replace that uncommitted 
buffer or sit alongside it? If a record updates a key that was inserted earlier 
in the same not-yet-committed checkpoint, how do we avoid misclassifying it as 
an insert and creating a duplicate?
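
   To make the concern concrete, here is a minimal sketch of one possible lookup order in which an uncommitted buffer sits in front of the committed cache. All names (`TieredIndexLookup`, `recordInsert`, `onCheckpointCommitted`) are hypothetical, and plain maps stand in for the RocksDB cache and the MDT; this is an illustration under those assumptions, not the RFC's or RFC-106's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical three-tier lookup: uncommitted buffer, committed cache, then MDT. */
final class TieredIndexLookup {
    // Locations assigned during the in-flight checkpoint (not yet visible in the MDT).
    private final Map<String, String> uncommitted = new HashMap<>();
    // Stand-in for the RocksDB cache of committed index state.
    private final Map<String, String> committedCache = new HashMap<>();
    // Stand-in for a remote MDT RLI lookup; returns empty when the key is unknown.
    private final Function<String, Optional<String>> mdtLookup;

    TieredIndexLookup(Function<String, Optional<String>> mdtLookup) {
        this.mdtLookup = mdtLookup;
    }

    /** Returns the existing location (record is an update) or empty (record is an insert). */
    Optional<String> lookup(String recordKey) {
        String location = uncommitted.get(recordKey);
        if (location != null) {
            return Optional.of(location);
        }
        location = committedCache.get(recordKey);
        if (location != null) {
            return Optional.of(location);
        }
        // Cache miss: fall back to the MDT and remember a hit for future lookups.
        Optional<String> remote = mdtLookup.apply(recordKey);
        remote.ifPresent(found -> committedCache.put(recordKey, found));
        return remote;
    }

    /** Remembers the location chosen for a new key until the checkpoint commits. */
    void recordInsert(String recordKey, String fileGroupId) {
        uncommitted.put(recordKey, fileGroupId);
    }

    /** Promotes buffered entries once the checkpoint's commit has succeeded. */
    void onCheckpointCommitted() {
        committedCache.putAll(uncommitted);
        uncommitted.clear();
    }
}
```

   With only the last two tiers, a second occurrence of the same key inside one checkpoint would miss both and be classified as an insert, which is the duplicate scenario described above.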
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced 
in RFC-106 for large tables. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+On a cache miss, the cache reads the missing entries from the MDT RLI and 
stores them in RocksDB for future lookups. The following sections describe 
this mechanism in detail.
+
+## Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"                    (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost 
is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column 
family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as 
bootstrapped partitions (see [TTL-Based Partition 
Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) 
ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is 
bounded and proportional to recent write volume, not total table size. For a 
daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day 
bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older 
than X days (e.g., backfills, corrections, late-arriving events) trigger 
on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache 
holds at most `bootstrap_days + on-demand loaded` partitions, with cold 
partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ─────◄──── Bootstrap window (7 days) ────►│ Today
+                            │                                     │
+ dt=2025-01-10  dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21  dt=2025-01-22
+       │              │              │                    │
+   Not loaded     Not loaded     Bootstrapped at       Bootstrapped
+   (load on       (load on       job start             at job start
+    demand if      demand if
+    needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for older partitions, `RLIBootstrapOperator` 
must be revised to share the RocksDB instance with the `BucketAssigner` 
operator. After `RLIBootstrapOperator` completes bootstrap, the RocksDB cache 
is maintained incrementally during normal write operations:

Review Comment:
   🤖 @danny0405 sharing a single RocksDB instance between RLIBootstrapOperator 
and BucketAssign implies the two operators are co-located in the same JVM/task 
slot. If there's a keyBy/shuffle between them they'd run in separate tasks 
(possibly separate TMs) and couldn't share the instance. Could you clarify how 
co-location is guaranteed (operator chaining?), and how concurrent access from 
the two operators to the shared instance is made thread-safe?
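
   On the thread-safety half of the question, one option (an assumption on the reviewer's side, not something the RFC states) is to guard on-demand partition loads so that each partition is materialized at most once, even when several threads hit the same miss. A sketch with hypothetical names: a `ConcurrentHashMap` stands in for the column-family registry and `loader` stands in for the MDT read.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical registry that loads each partition's index entries at most once. */
final class PartitionCacheRegistry {
    // partition path -> (record key -> location); stand-in for RocksDB column families.
    private final ConcurrentHashMap<String, Map<String, String>> partitions =
        new ConcurrentHashMap<>();
    // Stand-in for reading one partition's record index entries from the MDT.
    private final Function<String, Map<String, String>> loader;
    // Counts loader invocations so the once-only behaviour can be observed.
    private final AtomicInteger loadCount = new AtomicInteger();

    PartitionCacheRegistry(Function<String, Map<String, String>> loader) {
        this.loader = loader;
    }

    /** Returns the partition's entries, loading them on first access only. */
    Map<String, String> partition(String partitionPath) {
        // computeIfAbsent is atomic: concurrent callers for the same key run the loader once.
        return partitions.computeIfAbsent(partitionPath, path -> {
            loadCount.incrementAndGet();
            return new ConcurrentHashMap<>(loader.apply(path));
        });
    }

    /** Returns the cached location for the key, or null when the key is unknown. */
    String lookup(String partitionPath, String recordKey) {
        return partition(partitionPath).get(recordKey);
    }

    /** Drops a partition; the next access reloads it. */
    void evict(String partitionPath) {
        partitions.remove(partitionPath);
    }

    int loadCount() {
        return loadCount.get();
    }
}
```

   Note that `computeIfAbsent` can block other updates to the map while the loader runs, so a slow MDT read inside it would stall concurrent callers. And none of this helps unless both operators really do run in the same JVM, which is the co-location question above.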
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.

Review Comment:
   🤖 Since Flink already runs RocksDB as its state backend, have you considered 
using Flink managed keyed state (e.g. MapState with state TTL) instead of a 
separately-managed RocksDB instance? That would provide incremental 
checkpointing and recovery for free, surviving restarts without the full 
re-bootstrap from MDT, and would sidestep manual lifecycle/discard handling. 
It'd be worth an Alternatives Considered section laying out why a standalone 
instance was chosen over managed state.
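
   For a sense of what re-bootstrap costs on each restart, the RFC's own figures can be turned into a quick estimate. A small sketch follows; the class and method names are made up, the inputs (50–70 bytes per entry, 500 MB/s, 10M records/day) come from the RFC, and it counts only transfer time, ignoring MDT scan and SST ingestion overhead.

```java
/** Hypothetical helper for back-of-the-envelope bootstrap transfer estimates. */
final class BootstrapEstimate {
    // 500 MB/s network throughput from the RFC, in decimal units (assumption).
    static final long BYTES_PER_SECOND = 500_000_000L;

    /** Total index size in bytes for the given record count and entry size. */
    static long indexBytes(long records, long bytesPerEntry) {
        return records * bytesPerEntry;
    }

    /** Seconds needed to transfer the given number of bytes at BYTES_PER_SECOND. */
    static double transferSeconds(long bytes) {
        return (double) bytes / BYTES_PER_SECOND;
    }
}
```

   Under these assumptions, `transferSeconds(indexBytes(1_000_000_000L, 50))` gives 100 seconds and the 70-byte case gives 140 seconds, which matches the RFC's "~2 minutes" for the global bootstrap. The 7-day partitioned case, `transferSeconds(indexBytes(7 * 10_000_000L, 60))`, comes to about 8.4 seconds.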
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced 
in RFC-106 for large tables. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+On a cache miss, the cache reads the missing entries from the MDT RLI and 
stores them in RocksDB for future lookups. The following sections describe 
this mechanism in detail.
+
+## Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"                    (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost 
is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column 
family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as 
bootstrapped partitions (see [TTL-Based Partition 
Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) 
ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is 
bounded and proportional to recent write volume, not total table size. For a 
daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day 
bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older 
than X days (e.g., backfills, corrections, late-arriving events) trigger 
on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache 
holds at most `bootstrap_days + on-demand loaded` partitions, with cold 
partitions automatically evicted.
+
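The sizing figures quoted for the two bootstrap modes can be checked with a few lines of arithmetic. This is a back-of-the-envelope sketch, assuming decimal units (1 MB = 10^6 bytes) and that network transfer is the only bottleneck; `transfer_seconds` is an illustrative helper, not part of Hudi.

```python
def transfer_seconds(num_records: int, bytes_per_entry: int, throughput_mb_s: float) -> float:
    """Estimated time to transfer num_records index entries at the given throughput."""
    total_bytes = num_records * bytes_per_entry
    return total_bytes / (throughput_mb_s * 1_000_000)


# Global RLI bootstrap: 1 billion records at 50 to 70 bytes per entry, 500 MB/s.
global_low = transfer_seconds(1_000_000_000, 50, 500)
global_high = transfer_seconds(1_000_000_000, 70, 500)

# Partitioned bootstrap: 7 days x 10M records/day at ~60 bytes per entry.
weekly_bytes = 7 * 10_000_000 * 60
weekly = transfer_seconds(7 * 10_000_000, 60, 500)

print(global_low, global_high, weekly_bytes, weekly)
```

Under these assumptions the global bootstrap takes roughly 100 to 140 seconds, while the 7-day partitioned bootstrap moves about 4.2 GB in under 10 seconds.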
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ─────◄──── Bootstrap window (7 days) ────►│ Today
+                            │                                     │
+ dt=2025-01-10  dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21  dt=2025-01-22
+       │              │              │                    │
+   Not loaded     Not loaded     Bootstrapped at       Bootstrapped
+   (load on       (load on       job start             at job start
+    demand if      demand if
+    needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
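The on-demand path described above can be modeled in a few lines. This is a toy sketch, not Hudi or RocksDB code: plain dictionaries stand in for column families, and the hypothetical `load_partition_index` callable stands in for reading one partition's Partitioned Record Index from MDT.

```python
from typing import Callable, Dict, Iterable, Optional

# Hypothetical stand-in: one "column family" maps record_key -> file group id.
ColumnFamily = Dict[str, str]


class PartitionedCache:
    def __init__(self, load_partition_index: Callable[[str], ColumnFamily]):
        # load_partition_index models a partition-scoped read from MDT.
        self._load = load_partition_index
        self._families: Dict[str, ColumnFamily] = {}
        self.on_demand_loads = 0

    def bootstrap(self, partitions: Iterable[str]) -> None:
        """Pre-load every partition inside the bootstrap window."""
        for partition in partitions:
            self._families[partition] = dict(self._load(partition))

    def lookup(self, partition: str, record_key: str) -> Optional[str]:
        family = self._families.get(partition)
        if family is None:
            # Column family missing: load the whole partition once, then look up.
            family = dict(self._load(partition))
            self._families[partition] = family
            self.on_demand_loads += 1
        return family.get(record_key)


# Toy MDT contents: one old partition and one inside the bootstrap window.
mdt = {"dt=2025-01-10": {"k1": "fg-1"}, "dt=2025-01-21": {"k2": "fg-7"}}
cache = PartitionedCache(lambda partition: mdt.get(partition, {}))
cache.bootstrap(["dt=2025-01-21"])

recent = cache.lookup("dt=2025-01-21", "k2")   # served from the bootstrapped family
loads_after_recent = cache.on_demand_loads
old = cache.lookup("dt=2025-01-10", "k1")      # triggers one on-demand load
missing = cache.lookup("dt=2025-01-10", "k9")  # same family, no second load
```

Loading at partition granularity means a lookup that finds no key in an already-loaded family can be treated as an insert without another MDT read, which is the property the design relies on.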
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for older partitions, the 
`RLIBootstrapOperator` must be revised so that it shares the RocksDB instance 
with the `BucketAssigner` operator. After the `RLIBootstrapOperator` completes 
bootstrap, the RocksDB cache is maintained incrementally during normal write 
operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+    1. In RLIBootstrapOperator, look up the column family for r.partitionPath
+       in the shared RocksDB instance
+       → If the column family does not exist:
+        a. Load Partitioned Record Index for r.partitionPath from MDT (on-demand)
+        b. Create column family and bulk-load entries
+       → In both cases: forward r to the next operator, BucketAssign
+    2. In BucketAssign operator, check RocksDB cache for r.key in column
+       family r.partitionPath
+       → If found in RocksDB: use cached location (committed record)
+       → If not found: this is an INSERT, assign new file group
+    3. Update RocksDB cache with r.key → assigned location
+```
+
+In the process above, the pre-bucket-assign stage (`RLIBootstrapOperator`) is 
responsible for checking whether a record belongs to an older partition that 
was not bootstrapped. If so, it loads that partition's index from MDT on demand 
and then forwards the `HoodieFlinkInternalRow` to the `BucketAssigner` operator.
+
+#### On Index Write
+
+In the `IndexWrite` operator (from RFC-106), index records are written to MDT. 
The RocksDB cache in the `BucketAssigner` is updated in-line as records flow 
through the pipeline, ensuring the cache stays ahead of MDT commits.
+
+### TTL-Based Partition Eviction
+
+For partitioned RLI, the cache implements automatic eviction of cold 
partitions:
+
+- Each column family tracks the **last access timestamp** (last time a record 
was written to or looked up in that partition).
+- A background thread periodically scans column family metadata and drops 
column families whose last access exceeds the configured TTL.
+- The TTL should be set based on the workload's partition access pattern. For 
daily-partitioned event data, a TTL of 3–7 days is typical.
+
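The eviction pass described in the bullets above can be sketched as follows. This is a minimal model, assuming last-access timestamps are kept in a plain dictionary (the RFC keeps them in the `default` column family) and that removing a dictionary entry stands in for dropping a column family; `evict_cold_partitions` is a hypothetical name.

```python
from typing import Dict, List

HOUR_MS = 3_600_000


def evict_cold_partitions(last_access_ms: Dict[str, int], now_ms: int, ttl_hours: int) -> List[str]:
    """Drop partitions idle for longer than the TTL and return their names."""
    ttl_ms = ttl_hours * HOUR_MS
    cold = [p for p, ts in last_access_ms.items() if now_ms - ts > ttl_ms]
    for partition in cold:
        # Models dropping the partition's column family in one operation.
        del last_access_ms[partition]
    return sorted(cold)


# One partition last touched 240 hours ago, one touched 40 hours ago.
access = {"dt=2025-01-10": 0, "dt=2025-01-20": 200 * HOUR_MS}
dropped = evict_cold_partitions(access, now_ms=240 * HOUR_MS, ttl_hours=168)
print(dropped, list(access))
```

With the default TTL of 168 hours only the partition idle for 240 hours is dropped; a later record for it would go through the on-demand load path again.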
+Configuration:
+
+| Property | Default | Description |
+|---|---|---|
+| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based partitioned cache |
+| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local directory for RocksDB data |
+| `hoodie.index.cache.rocksdb.bootstrap.days` | `7` | Number of days of Partitioned Record Index to load during bootstrap. Only partitions within this window are pre-loaded; older partitions are loaded on demand when updates are observed. |
+| `hoodie.index.cache.rocksdb.partition.ttl.hours` | `168` (7 days) | TTL for partition column families |
+| `hoodie.index.cache.rocksdb.block.cache.mb` | `256` | RocksDB block cache size (off-heap) |
+| `hoodie.index.cache.rocksdb.compaction.style` | `LEVEL` | RocksDB compaction style |
+
+### Storage Overhead
+
+RocksDB occupies approximately **2x the storage** compared to native HFile 
format in MDT, due to:
+
+1. **Compression codec difference**: RocksDB uses Snappy compression by 
default, while Hudi MDT uses gzip, which achieves higher compression ratios.
+2. **Uncompacted SST files**: During active writes, RocksDB maintains multiple 
levels of SST files before compaction merges them.
+3. **WAL disabled**: This factor offsets the overhead rather than adding to 
it. The write-ahead log is disabled since the cache can be rebuilt from MDT on 
failure, which avoids WAL disk usage and reduces write amplification.
+
+For a partition with 10 million records at ~60 bytes per entry (~600 MB of 
raw index data), the ~2x overhead puts the RocksDB footprint at approximately 
1.2 GB on disk.
+
+### Consistency and Failure Handling
+
+The RocksDB cache is a **derived, disposable replica**; MDT remains the 
single source of truth. This simplifies consistency handling:
+
+#### Task Failover
+
+1. The RocksDB cache on the failed task's container is discarded (ephemeral 
local storage).
+2. The recovered task bootstraps a fresh RocksDB instance from MDT.
+3. The coordinator recommits any pending Hudi instants (as described in 
RFC-106).
+
+#### Job Restart
+
+1. All RocksDB state is discarded.
+2. Full bootstrap from MDT is triggered.
+3. The coordinator handles recommitting of pending instants per RFC-106's 
recovery protocol.
+
+#### Concurrent Writers (OCC)
+
+When multiple Flink jobs write to the same Hudi table under Optimistic 
Concurrency Control:
+
+1. Each writer maintains its own independent RocksDB cache.
+2. Caches may become stale relative to other writers' commits.
+3. Conflict detection occurs at commit time via Hudi's OCC protocol.
+4. On conflict detection, the losing writer's RocksDB cache must be refreshed 
by re-reading the conflicting partitions from MDT.
+
+**Risk**: A stale cache may misclassify an update as an insert, potentially 
creating duplicate records. This is mitigated by:

Review Comment:
   🤖 @nsivabalan could you sanity-check the multi-writer safety here? With each 
writer holding an independent, possibly-stale RocksDB cache, a concurrent 
insert of the same key on two writers could be classified as a new insert on 
both and assigned different file groups. Would OCC's file-level conflict 
detection actually catch that, or could it slip through and violate RLI key 
uniqueness? The 'freshness validation' mitigation (checking latest cached 
instant vs. latest committed instant) would benefit from more detail on how it 
closes this window.
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
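
The scenario raised in this comment can be reproduced with a toy model. This sketch assumes each writer snapshots the committed index at bootstrap and afterwards consults only its own cache; the `Writer` class and the file group naming are hypothetical and do not reflect Hudi's actual OCC implementation.

```python
from typing import Dict, Tuple


class Writer:
    """Toy writer with an independent cache seeded from the committed index."""

    def __init__(self, name: str, committed_index: Dict[str, str]):
        self.name = name
        self.cache = dict(committed_index)  # snapshot taken at bootstrap
        self._next_id = 0

    def assign(self, key: str) -> Tuple[str, str]:
        if key in self.cache:
            return ("UPDATE", self.cache[key])
        # Key unknown locally: classify as insert and pick a new file group.
        file_group = f"fg-{self.name}-{self._next_id}"
        self._next_id += 1
        self.cache[key] = file_group
        return ("INSERT", file_group)


committed: Dict[str, str] = {}  # the key is not yet committed anywhere
writer_a = Writer("A", committed)
writer_b = Writer("B", committed)

result_a = writer_a.assign("order-42")
result_b = writer_b.assign("order-42")
print(result_a, result_b)
```

Both writers classify `order-42` as an insert and choose different file groups. Whether commit-time conflict detection rejects one of them is the open question; the model only shows that the local caches cannot prevent it.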



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).

Review Comment:
   🤖 The time-bounded bootstrap assumes partition paths are date-based and can 
be parsed into a 'last X days' window. How would this work for non-date 
partitioning (e.g. region/category) or multi-level/composite partitions where 
there's no temporal ordering? It might be worth defining the bootstrap behavior 
(and the `bootstrap.days` semantics) when partitions can't be ordered by time.
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
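
One way to make the question concrete is to sketch how the window could be computed. The code below is an assumption-laden illustration, not the RFC's behavior: it assumes a `dt=YYYY-MM-DD` field somewhere in the partition path, takes the window's last day as an explicit `end` argument, and simply leaves partitions without a parseable date out of the bootstrap set so they fall back to on-demand loading.

```python
import re
from datetime import date, timedelta
from typing import Iterable, List, Optional

# Assumed convention: a dt=YYYY-MM-DD segment at any level of the path.
_DATE_SEGMENT = re.compile(r"(?:^|/)dt=(\d{4})-(\d{2})-(\d{2})(?:/|$)")


def partition_date(path: str) -> Optional[date]:
    """Return the date encoded in the path, or None if there is none."""
    match = _DATE_SEGMENT.search(path)
    if match is None:
        return None
    try:
        return date(int(match.group(1)), int(match.group(2)), int(match.group(3)))
    except ValueError:
        return None  # e.g. dt=2025-13-40


def bootstrap_window(partitions: Iterable[str], end: date, days: int) -> List[str]:
    """Partitions dated within the `days` days ending at `end`, inclusive."""
    start = end - timedelta(days=days - 1)
    selected = []
    for path in partitions:
        parsed = partition_date(path)
        if parsed is not None and start <= parsed <= end:
            selected.append(path)
    return selected


paths = [
    "dt=2025-01-10",
    "dt=2025-01-15",
    "dt=2025-01-21",
    "region=us/dt=2025-01-20",
    "region=eu",
]
window = bootstrap_window(paths, end=date(2025, 1, 21), days=7)
print(window)
```

In this sketch `region=eu` is never bootstrapped, which keeps startup bounded but shifts its cost to the first record that touches it; the RFC would need to state whether that is the intended semantics of `bootstrap.days`.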



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative cache of the in-memory uncommitted-records cache introduced 
in RFC-106 for large tables. The lookup order is:
+
+1**RocksDB cache** β€” materialized replica of committed index state
+2**MDT RLI** β€” authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+A fundamental design is needed to handle cache misses by reading from the MDT 
RLI and storing the loaded index in RocksDB for future lookups. It will be 
discussed in following sections.
+
+### Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+β”œβ”€β”€ CF: "default"                    (metadata: partition registry, timestamps)
+β”œβ”€β”€ CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+β”œβ”€β”€ CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost 
is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column 
family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as 
bootstrapped partitions (see [TTL-Based Partition 
Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach β€” time-bounded bootstrap plus on-demand loading β€” 
ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is 
bounded and proportional to recent write volume, not total table size. For a 
daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day 
bootstrap loads ~4.2 GB β€” completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older 
than X days (e.g., backfills, corrections, late-arriving events) trigger 
on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache 
holds at most `bootstrap_days + on-demand loaded` partitions, with cold 
partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ─────◄──── Bootstrap window (7 days) ────►│ Today
+                            β”‚                                     β”‚
+ dt=2025-01-10  dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21  dt=2025-01-22
+       β”‚              β”‚              β”‚                    β”‚
+   Not loaded     Not loaded     Bootstrapped at       Bootstrapped
+   (load on       (load on       job start             at job start
+    demand if      demand if
+    needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To achieve the on demand RLI load for an older partition, RLIBootstrapOperator 
needs to be revised to access the shared RocksDB instance with BucketAssign 
operator. After bootstrap of RLIBootstrapOperator, the RocksDB cache is 
maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+    1. In RLIBootstrapOperator, look up through shared RocksDB for 
r.partitionPath
+       β†’ If column family does not exist:
+        a. Load Partitioned Record Index for r.partitionPath from MDT 
(on-demand)
+        b. Create column family and bulk-load entries
+       β†’ If found: forward to next operator BucketAssign
+    2. In BucketAssign operator, check RocksDB cache for r.key in column 
family r.partitionPath
+       β†’ If found in RocksDB: use cached location (committed record)
+       β†’ If not found: this is an INSERT, assign new file group
+    3. Update RocksDB cache with r.key β†’ assigned location
+```
+
+In this process above, the PreBucketAssign is responsible for check whether a 
record is from older partitions that is not boostrap. If yes, it will on-demand 
load from MDT and forward HoodieFlinkInternalRow to the BucketAssign operator.
+
+#### On Index Write
+
+In the `IndexWrite` operator (from RFC-106), index records are written to MDT. 
The RocksDB cache in the `BucketAssigner` is updated in-line as records flow 
through the pipeline, ensuring the cache stays ahead of MDT commits.
+
+### TTL-Based Partition Eviction
+
+For partitioned RLI, the cache implements automatic eviction of cold 
partitions:
+
+- Each column family tracks the **last access timestamp** (last time a record 
was written to or looked up in that partition).
+- A background thread periodically scans column family metadata and drops 
column families whose last access exceeds the configured TTL.
+- The TTL should be set based on the workload's partition access pattern. For 
daily-partitioned event data, a TTL of 3–7 days is typical.
+
+Configuration:
+
+| Property | Default | Description |
+|---|---|---|
+| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based 
partitioned cache |
+| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local 
directory for RocksDB data |
+| `hoodie.index.cache.rocksdb.bootstrap.days` | `7` | Number of days of 
Partitioned Record Index to load during bootstrap. Only partitions within this 
window are pre-loaded; older partitions are loaded on demand when updates are 
observed. |
+| `hoodie.index.cache.rocksdb.partition.ttl.hours` | `168` (7 days) | TTL for 
partition column families |
+| `hoodie.index.cache.rocksdb.block.cache.mb` | `256` | RocksDB block cache 
size (off-heap) |
+| `hoodie.index.cache.rocksdb.compaction.style` | `LEVEL` | RocksDB compaction 
style |
+
+### Storage Overhead
+
+RocksDB occupies approximately **2x the storage** compared to native HFile 
format in MDT, due to:
+
+1. **Compression codec difference**: RocksDB uses Snappy compression by 
default, while Hudi MDT uses gzip, which achieves higher compression ratios.
+2. **Uncompacted SST files**: During active writes, RocksDB maintains multiple 
levels of SST files before compaction merges them.
+
+The write-ahead log (WAL) is disabled because the cache can be rebuilt from 
MDT on failure. This reduces write amplification and means the WAL does not 
contribute to the overhead above.
+
+For a partition with 10 million records at ~60 bytes per entry, the RocksDB 
footprint is approximately 1.2 GB on disk.
+
+### Consistency and Failure Handling
+
+The RocksDB cache is a **derived, disposable replica**; MDT remains the 
single source of truth. This simplifies consistency handling:
+
+#### Task Failover
+
+1. The RocksDB cache on the failed task's container is discarded (ephemeral 
local storage).
+2. The recovered task bootstraps a fresh RocksDB instance from MDT.
+3. The coordinator recommits any pending Hudi instants (as described in 
RFC-106).
+
+#### Job Restart
+
+1. All RocksDB state is discarded.
+2. Full bootstrap from MDT is triggered.
+3. The coordinator handles recommitting of pending instants per RFC-106's 
recovery protocol.
+
+#### Concurrent Writers (OCC)
+
+When multiple Flink jobs write to the same Hudi table under Optimistic 
Concurrency Control:
+
+1. Each writer maintains its own independent RocksDB cache.
+2. Caches may become stale relative to other writers' commits.
+3. Conflict detection occurs at commit time via Hudi's OCC protocol.
+4. On conflict detection, the losing writer's RocksDB cache must be refreshed 
by re-reading the conflicting partitions from MDT.
+
+**Risk**: A stale cache may misclassify an update as an insert, potentially 
creating duplicate records. This is mitigated by:
+- Early conflict detection during checkpoint lifecycle (best-effort).
+- Index freshness validation: before writing, check that the cache's latest 
instant matches the table's latest committed instant.
+- Full cache refresh on detected staleness.
+
+### Integration with RFC-106 Pipeline
+
+TBD

Review Comment:
   🤖 This section is essentially the crux of how the new operators 
(PreBucketAssign, shared-RocksDB RLIBootstrapOperator) wire into the existing 
RFC-106 topology, but it's currently TBD (the Approvers field is also TBD, and 
the status is In Progress). Since these changes touch the operator graph and 
shared state, it'd help to flesh this section out before a deeper review, as 
feasibility is hard to fully assess without it.
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced in 
RFC-106, intended for large tables. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+Cache misses are handled by reading from the MDT RLI and storing the loaded 
index entries in RocksDB for future lookups. The following sections describe 
this mechanism.
+
+## Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"                    (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost 
is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column 
family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as 
bootstrapped partitions (see [TTL-Based Partition 
Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) 
ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is 
bounded and proportional to recent write volume, not total table size. For a 
daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day 
bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older 
than X days (e.g., backfills, corrections, late-arriving events) trigger 
on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache 
holds at most `bootstrap_days + on-demand loaded` partitions, with cold 
partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ─────◄──── Bootstrap window (7 days) ────►│ Today
+                            │                                     │
+ dt=2025-01-10  dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21  dt=2025-01-22
+       β”‚              β”‚              β”‚                    β”‚
+   Not loaded     Not loaded     Bootstrapped at       Bootstrapped
+   (load on       (load on       job start             at job start
+    demand if      demand if
+    needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for older partitions, RLIBootstrapOperator 
must be revised to share the RocksDB instance with the BucketAssign operator. 
After RLIBootstrapOperator completes its bootstrap, the RocksDB cache is 
maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+    1. In RLIBootstrapOperator, look up through shared RocksDB for 
r.partitionPath

Review Comment:
   🤖 The on-demand partition load appears synchronous on the record-processing 
path: a record for an un-cached older partition blocks while that partition's 
entire index is scanned from MDT and bulk-loaded. For a backfill or correction 
touching many cold partitions, could this cause repeated multi-second stalls 
and backpressure across the pipeline? Have you considered async loading, or 
buffering the affected records while the load proceeds?
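   
   As a rough illustration of the buffering idea (a sketch only, not the RFC's 
design): the class below parks records for a cold partition while a loader runs 
on a separate executor, then replays them in arrival order. All names 
(`AsyncPartitionLoadSketch`, `partitionLoader`, `downstream`) are hypothetical, 
records are reduced to string keys, and load-failure handling is omitted.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.function.Consumer;

   /** Hypothetical sketch: park records for a cold partition while its index loads. */
   final class AsyncPartitionLoadSketch {
     private final Set<String> loaded = new HashSet<>();
     private final Map<String, List<String>> pending = new HashMap<>();
     private final Executor loaderPool;
     private final Consumer<String> partitionLoader; // stand-in for MDT scan + bulk load
     private final Consumer<String> downstream;      // stand-in for lookup + bucket assignment

     AsyncPartitionLoadSketch(Executor loaderPool,
                              Consumer<String> partitionLoader,
                              Consumer<String> downstream) {
       this.loaderPool = loaderPool;
       this.partitionLoader = partitionLoader;
       this.downstream = downstream;
     }

     /** Emits the record immediately if its partition is cached, otherwise buffers it. */
     synchronized void process(String partition, String recordKey) {
       if (loaded.contains(partition)) {
         downstream.accept(recordKey);
         return;
       }
       List<String> buffer = pending.get(partition);
       boolean firstMiss = buffer == null;
       if (firstMiss) {
         buffer = new ArrayList<>();
         pending.put(partition, buffer);
       }
       buffer.add(recordKey); // buffer before starting the load so the record cannot be missed
       if (firstMiss) {
         // Only the first miss triggers a load; later records just join the buffer.
         CompletableFuture
             .runAsync(() -> partitionLoader.accept(partition), loaderPool)
             .thenRun(() -> drain(partition));
       }
     }

     /** Marks the partition as cached and replays its buffered records in arrival order. */
     private synchronized void drain(String partition) {
       loaded.add(partition);
       for (String recordKey : pending.remove(partition)) {
         downstream.accept(recordKey);
       }
     }
   }
   ```
   
   In a real Flink operator the replay would presumably have to be handed back 
to the task thread rather than run on the loader thread, and the buffer would 
need a size bound; the sketch ignores both.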
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced in 
RFC-106, intended for large tables. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+Cache misses are handled by reading from the MDT RLI and storing the loaded 
index entries in RocksDB for future lookups. The following sections describe 
this mechanism.
+
+## Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"                    (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost 
is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.

Review Comment:
   🤖 RFC-106 assigns work by hash(record_key) % num_shards, so a given subtask 
owns keys spread across all data partitions. When on-demand loading 'the 
Partitioned Record Index for partition dt=X', does each subtask load only the 
keys in that partition belonging to its shard, or the whole partition (with 
most subtasks loading entries they'll never look up)? And for partitioned RLI, 
what's the keyBy: by partition path (which risks skew when one partition is 
hot) or by key? Clarifying the task↔partition↔shard mapping would help.
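   
   To make the first option concrete, here is a minimal sketch of a 
shard-scoped load, assuming keys are routed by `hash(record_key) % num_shards`. 
The class and method names are hypothetical, and `String.hashCode()` only 
stands in for whatever hash RFC-106 actually uses.
   
   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   /** Hypothetical sketch of shard-scoped filtering during an on-demand partition load. */
   final class ShardScopedLoadSketch {
     /** Assumed shard function; RFC-106's actual hash may differ. */
     static int shardOf(String recordKey, int numShards) {
       // floorMod keeps the result non-negative even for negative hash codes.
       return Math.floorMod(recordKey.hashCode(), numShards);
     }

     /** Keeps only the record keys of one data partition that the given subtask owns. */
     static List<String> keysOwnedBy(int subtaskIndex, int numShards, List<String> partitionKeys) {
       return partitionKeys.stream()
           .filter(key -> shardOf(key, numShards) == subtaskIndex)
           .collect(Collectors.toList());
     }
   }
   ```
   
   Note that filtering like this only limits what each subtask stores. Unless 
the MDT read itself can be scoped to a shard, every subtask would still scan 
the whole partition, which is the cost worth clarifying in the RFC.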
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support 
for Flink streaming writes, including a simple in-memory cache for index 
lookups in the `BucketAssigner` operator. While the in-memory cache works well 
for small to moderate workloads, it faces scalability challenges for large 
tables with billions of records: the cache either consumes excessive JVM heap 
memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, 
OCI Object Storage, and Amazon S3, data is typically transitioned to lower 
storage tiers over time to optimize storage costs. However, using an in-memory 
cache to accelerate index lookups may result in increased data processing 
overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that 
serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming 
writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling 
efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively 
written to, keeping storage proportional to the working set rather than total 
table size
+- **Incremental maintenance** through in-line index updates during the write 
path, with MDT as the authoritative source of truth for bootstrap and 
cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine 
whether each incoming record is an insert or an update by looking up its record 
key in the index. RFC-106 introduces an in-memory cache to accelerate these 
lookups, but for large-scale streaming workloads, this approach has fundamental 
limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of 
memory. For a table containing 1 billion records, caching the entire index 
would consume 50–70 GB of JVM heap. In addition, a record buffer is required to 
improve RLI lookup efficiency. For CDC workloads with high event throughput 
(QPS) and large record sizes, maintaining a two-minute buffer can further 
increase memory consumption significantly. As a result, the compute cost of 
upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries 
aggressively. For workloads that access records across many partitions, this 
leads to frequent cache misses and fallback to MDT queries (10+ ms per record), 
severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory 
cache starts empty. Warming the cache through individual MDT lookups creates a 
prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a 
configurable block cache in off-heap memory, avoiding GC pressure on the JVM 
heap.
+- **Column families**: RocksDB supports column families, which provide logical 
separation of data within a single database instance. Each partition's index 
entries can be stored in a dedicated column family, enabling efficient bulk 
operations (e.g., dropping an entire partition's cache) without affecting other 
partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by 
default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary 
state backend, so operational expertise and deployment patterns are 
well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica 
as an alternative to the in-memory uncommitted-records cache introduced in 
RFC-106, intended for large tables. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column 
families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than 
`O(total_table_size)`
+
+Cache misses are handled by reading from the MDT RLI and storing the loaded 
index entries in RocksDB for future lookups. The following sections describe 
this mechanism.
+
+## Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data 
partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering 
value
+
+Using column families provides two critical advantages over a single flat 
keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) 
metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL 
configuration, enabling automatic eviction of partitions that haven't been 
written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"                    (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"             (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"             (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"             (RLI entries for partition dt=2025-01-17)
+```
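
The layout above can be modelled in a few lines. This is a minimal sketch, assuming plain Python dictionaries as a stand-in for RocksDB column families; `PartitionedIndexCache` and `RecordLocation` are hypothetical names used only for illustration, not types from Hudi or RocksDB.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RecordLocation:
    """Hypothetical value type: record location plus ordering value."""
    file_group_id: str
    file_slice: str
    ordering_value: int


@dataclass
class PartitionedIndexCache:
    """Dictionary stand-in for one RocksDB instance with a column family per partition."""
    column_families: dict = field(default_factory=lambda: {"default": {}})

    def put(self, partition_path, record_key, location):
        # Creating the inner dict mirrors creating a column family on first use.
        self.column_families.setdefault(partition_path, {})[record_key] = location

    def get(self, partition_path, record_key):
        cf = self.column_families.get(partition_path)
        return None if cf is None else cf.get(record_key)

    def has_partition(self, partition_path):
        return partition_path in self.column_families

    def drop_partition(self, partition_path):
        # A single operation removes the whole partition; no per-key deletes.
        if partition_path == "default":
            raise ValueError("the metadata column family cannot be dropped")
        return self.column_families.pop(partition_path, None) is not None
```

The point of the sketch is the shape of the keyspace: a lookup is scoped to one partition, and eviction acts on the partition as a unit.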
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the 
MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record 
keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is 
ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on 
`hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SSTFileWriter` for optimal ingestion 
performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per 
entry, scanning and loading the full index requires approximately 50–70 GB of 
data transfer. At 500 MB/s network throughput, this takes roughly 100–140 
seconds (about 2 minutes). This cost is incurred on every job restart.
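
The estimate can be checked with a short calculation. This is a rough sketch, assuming decimal units (1 GB = 10^9 bytes, 1 MB = 10^6 bytes) and that network throughput is the only bottleneck; `bootstrap_estimate` is a hypothetical helper, not part of Hudi.

```python
def bootstrap_estimate(num_records, bytes_per_entry, throughput_mb_per_s):
    """Return (gigabytes_transferred, seconds) for a full index scan."""
    total_bytes = num_records * bytes_per_entry
    gigabytes = total_bytes / 1e9
    seconds = total_bytes / (throughput_mb_per_s * 1e6)
    return gigabytes, seconds


# 1 billion records at the low and high end of the per-entry size range.
low_gb, low_s = bootstrap_estimate(1_000_000_000, 50, 500)    # 50 GB, 100 s
high_gb, high_s = bootstrap_estimate(1_000_000_000, 70, 500)  # 70 GB, 140 s
```

RocksDB write amplification, compression, and MDT read overhead are ignored here, so the real figure may differ.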
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record 
Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which 
organizes the MDT record-level index by partition path. Unlike the Global 
Record Index, the Partitioned Record Index guarantees uniqueness within each 
`(partition_path, record_key)` pair and supports partition-scoped lookups, 
making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads 
only the most recent **X days** of partitioned record index entries 
(configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds 
bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose 
partition path falls within the last X days (e.g., `dt=2025-01-15` through 
`dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT 
Partitioned Record Index entries and bulk-load them into a dedicated RocksDB 
column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in 
the `default` column family metadata.
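
Step 2 can be illustrated as follows. This is a minimal sketch, assuming daily partitions whose paths encode an ISO date under a `dt=` prefix, as in the example; `bootstrap_window` is a hypothetical helper, and tables with other partition schemes would need a different mapping.

```python
from datetime import date, timedelta


def bootstrap_window(latest_partition_day, bootstrap_days, prefix="dt="):
    """List partition paths for the window ending at latest_partition_day, oldest first."""
    if bootstrap_days < 1:
        raise ValueError("bootstrap_days must be at least 1")
    start = latest_partition_day - timedelta(days=bootstrap_days - 1)
    return [
        f"{prefix}{(start + timedelta(days=offset)).isoformat()}"
        for offset in range(bootstrap_days)
    ]


window = bootstrap_window(date(2025, 1, 21), 7)
boundary = window[0]  # oldest partition loaded, recorded in step 4
```

With a 7-day window ending on 2025-01-21, the list runs from `dt=2025-01-15` to `dt=2025-01-21`, matching the example in step 2.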
+
+**On-demand loading for older partitions**: During ingestion, when the 
`BucketAssigner` encounters an incoming record whose partition path is **older 
than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist 
in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index 
entries for that specific partition from MDT and materialize them into a new 
RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column 
family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as 
bootstrapped partitions (see [TTL-Based Partition 
Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) 
ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is 
bounded and proportional to recent write volume, not total table size. For a 
daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day 
bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older 
than X days (e.g., backfills, corrections, late-arriving events) trigger 
on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache 
holds at most `bootstrap_days + on-demand loaded` partitions, with cold 
partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ─────◄──── Bootstrap window (7 days) ────►│ Today
+                            β”‚                                     β”‚
+ dt=2025-01-10  dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21  dt=2025-01-22
+       β”‚              β”‚              β”‚                    β”‚
+   Not loaded     Not loaded     Bootstrapped at       Bootstrapped
+   (load on       (load on       job start             at job start
+    demand if      demand if
+    needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for an older partition, RLIBootstrapOperator 
must be revised to share the RocksDB instance with the BucketAssign 
operator. After RLIBootstrapOperator completes the bootstrap, the RocksDB 
cache is maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+    1. In RLIBootstrapOperator, look up through shared RocksDB for 
r.partitionPath
+       → If column family does not exist:
+        a. Load Partitioned Record Index for r.partitionPath from MDT 
(on-demand)
+        b. Create column family and bulk-load entries
+       → If found: forward to next operator BucketAssign
+    2. In BucketAssign operator, check RocksDB cache for r.key in column 
family r.partitionPath
+       → If found in RocksDB: use cached location (committed record)
+       → If not found: this is an INSERT, assign new file group
+    3. Update RocksDB cache with r.key → assigned location
+```
+
+In the process above, the PreBucketAssign is responsible for checking whether 
a record belongs to an older partition that was not bootstrapped. If so, it 
loads that partition on demand from MDT and forwards the 
HoodieFlinkInternalRow to the BucketAssign operator.
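
The per-record flow above can be sketched in a single class. This is a simplified illustration that collapses the two operators into one object and uses dictionaries in place of RocksDB; `OnDemandIndexCache`, `load_partition_from_mdt`, and `new_file_group` are hypothetical names, and the MDT reader is passed in as a plain callable.

```python
class OnDemandIndexCache:
    """Loads a partition's index entries on first touch, then serves lookups locally."""

    def __init__(self, load_partition_from_mdt):
        # Hypothetical callable: partition_path -> {record_key: location}
        self._load = load_partition_from_mdt
        self._partitions = {}
        self.loads = 0  # number of on-demand loads performed

    def ensure_partition(self, partition_path):
        # Step 1: a missing partition triggers an on-demand load from MDT.
        if partition_path not in self._partitions:
            self._partitions[partition_path] = dict(self._load(partition_path))
            self.loads += 1
        return self._partitions[partition_path]

    def assign(self, partition_path, record_key, new_file_group):
        entries = self.ensure_partition(partition_path)
        # Step 2: a hit means the record already exists at a known location.
        location = entries.get(record_key)
        if location is not None:
            return "UPDATE", location
        # Step 2 (miss) and step 3: treat as an insert and remember the location.
        location = new_file_group(partition_path)
        entries[record_key] = location
        return "INSERT", location
```

For an update the cached location is unchanged, so the sketch only writes to the cache on inserts. Concurrency between the two operators is not modelled.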
+
+#### On Index Write
+
+In the `IndexWrite` operator (from RFC-106), index records are written to MDT. 
The RocksDB cache in the `BucketAssigner` is updated in-line as records flow 
through the pipeline, ensuring the cache stays ahead of MDT commits.
+
+### TTL-Based Partition Eviction
+
+For partitioned RLI, the cache implements automatic eviction of cold 
partitions:
+
+- Each column family tracks the **last access timestamp** (last time a record 
was written to or looked up in that partition).
+- A background thread periodically scans column family metadata and drops 
column families whose last access exceeds the configured TTL.
+- The TTL should be set based on the workload's partition access pattern. For 
daily-partitioned event data, a TTL of 3–7 days is typical.
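
The eviction rule in the bullets above could look like the sketch below. It assumes last-access timestamps are held in memory and that the caller supplies the function that actually drops a column family; `PartitionTtlTracker` is a hypothetical name, and the injectable clock exists only to make the behaviour easy to test.

```python
import time


class PartitionTtlTracker:
    """Tracks last access per partition and reports partitions past their TTL."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._last_access = {}

    def touch(self, partition_path):
        # Call on every write to or lookup in the partition.
        self._last_access[partition_path] = self._clock()

    def evict_expired(self, drop_partition):
        # Intended to run periodically, e.g. from a background thread.
        now = self._clock()
        expired = sorted(
            path for path, ts in self._last_access.items() if now - ts > self._ttl
        )
        for path in expired:
            drop_partition(path)
            del self._last_access[path]
        return expired
```

A production version would also need to coordinate with in-flight lookups so that a partition is not dropped while a record for it is being processed.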
+
+Configuration:
+
+| Property | Default | Description |
+|---|---|---|
+| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based 
partitioned cache |
+| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local 
directory for RocksDB data |

Review Comment:
   🤖 RocksDB here needs substantial local disk (up to ~50–70 GB for a 
global-RLI bootstrap of a 1B-row table, per the document's own sizing), which 
many containerized/k8s TaskManager setups don't provision by default, and the 
`/tmp` default is risky in production (eviction, noexec, sharing with the OS). 
Could you add guidance on local-disk sizing requirements and pick a more 
production-appropriate default path (or derive it from Flink's configured 
local/tmp dirs)?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
