hudi-agent commented on code in PR #19046: URL: https://github.com/apache/hudi/pull/19046#discussion_r3457456092
########## rfc/rfc-107/rfc-107.md: ########## @@ -0,0 +1,306 @@ + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert + +## Proposers + +- @zhenqiu-huang + +## Approvers + - TBD + +## Status + - In Progress + +## Abstract + +[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator. While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance. +In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead. + +This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. 
The cache provides: + +- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O +- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions +- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size +- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility + +## Background + +### The Index Lookup Bottleneck + +In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations: + +1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency. For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially. +2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput. +3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty. Warming the cache through individual MDT lookups creates a prolonged period of degraded performance.
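For reference, the heap figure quoted in the "Unbounded Cost" item can be reproduced with a few lines of arithmetic. This is only a back-of-the-envelope sketch: the function name `rli_heap_estimate_gb` is hypothetical, and it assumes decimal gigabytes and ignores JVM object overhead, which the RFC text does not break down.

```python
def rli_heap_estimate_gb(num_records: int, bytes_per_entry: int) -> float:
    """Approximate heap needed to hold every RLI entry, in decimal GB.

    Hypothetical helper for illustration; not part of Hudi.
    """
    return num_records * bytes_per_entry / 1e9


# Figures taken from the RFC text: 1 billion records, 50 to 70 bytes per entry.
low_gb = rli_heap_estimate_gb(1_000_000_000, 50)
high_gb = rli_heap_estimate_gb(1_000_000_000, 70)
print(f"estimated heap: {low_gb:.0f} to {high_gb:.0f} GB")
```

The estimate scales linearly with record count, which is why a cache sized to the whole table stops being practical well before the billion-record mark.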
+ +### Why RocksDB + +RocksDB is a proven embedded key-value store that addresses these limitations: + +- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap. +- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions. +- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable. +- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established. + +## High Level Design + +The Dynamic Partitioned Cache introduces a RocksDB-based local index replica as an alternative cache of the in-memory uncommitted-records cache introduced in RFC-106 for large tables. The lookup order is:
Review Comment: 🤖 The lookup order here is RocksDB → MDT, but both hold only committed state, whereas RFC-106's in-memory cache also tracks uncommitted records from the in-flight checkpoint. Does the RocksDB cache replace that uncommitted buffer or sit alongside it? If a record updates a key that was inserted earlier in the same not-yet-committed checkpoint, how do we avoid misclassifying it as an insert and creating a duplicate? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
########## rfc/rfc-107/rfc-107.md: ########## @@ -0,0 +1,306 @@ + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert + +## Proposers + +- @zhenqiu-huang + +## Approvers + - TBD + +## Status + - In Progress + +## Abstract + +[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator. While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance. +In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead. + +This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. 
The cache provides: + +- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O +- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions +- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size +- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility + +## Background + +### The Index Lookup Bottleneck + +In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations: + +1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency. For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially. +2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput. +3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty. Warming the cache through individual MDT lookups creates a prolonged period of degraded performance.
+ +### Why RocksDB + +RocksDB is a proven embedded key-value store that addresses these limitations: + +- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap. +- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions. +- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable. +- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established. + +## High Level Design + +The Dynamic Partitioned Cache introduces a RocksDB-based local index replica as an alternative cache of the in-memory uncommitted-records cache introduced in RFC-106 for large tables. The lookup order is: + +1. **RocksDB cache**: materialized replica of committed index state +2. **MDT RLI**: authoritative remote index (fallback for cache misses) + +The RocksDB cache is partitioned by Hudi data partition path using column families. This partition-aware design enables: + +- Bootstrapping only the partitions that the writer actively touches +- Evicting cold partitions via TTL without scanning individual keys +- Bounding cache size to `O(active_partitions)` rather than `O(total_table_size)` + +The design must also handle cache misses by reading from the MDT RLI and storing the loaded index in RocksDB for future lookups. This is discussed in the following sections. + +### Detailed Design + +### Cache Structure and Partitioning + +The RocksDB instance is organized using **column families**, one per Hudi data partition path.
Each column family stores key-value pairs where: + +- **Key**: Record key (byte-serialized) +- **Value**: Record location (file group ID + file slice info) and ordering value + +Using column families provides two critical advantages over a single flat keyspace: + +1. **Efficient partition eviction**: Dropping a column family is an O(1) metadata operation in RocksDB, compared to O(N) individual deletes. +2. **Partition-level TTL**: Each column family can have its own TTL configuration, enabling automatic eviction of partitions that haven't been written to recently. + +``` +RocksDB Instance +├── CF: "default" (metadata: partition registry, timestamps) +├── CF: "dt=2025-01-15" (RLI entries for partition dt=2025-01-15) +├── CF: "dt=2025-01-16" (RLI entries for partition dt=2025-01-16) +└── CF: "dt=2025-01-17" (RLI entries for partition dt=2025-01-17) +``` + +### Bootstrap Strategy + +On job start or task failover, the RocksDB cache must be populated from the MDT RLI. The bootstrap strategy differs based on the index scope: + +#### Global RLI Bootstrap + +For global RLI (cross-partition upsert), the cache must contain all record keys across the entire table: + +1. Close and discard any existing RocksDB state (container-local storage is ephemeral). +2. Scan the full MDT RLI partition assigned to this task (based on `hash(record_key) % num_index_shards` assignment from RFC-106). +3. Bulk-load entries into RocksDB using `SstFileWriter` for optimal ingestion performance. +4. Open the RocksDB instance for read-write access. + +**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per entry, scanning and loading the full index requires approximately 50–70 GB of data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost is incurred on every job restart.
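The "~2 minutes" bootstrap latency above can be checked with a short calculation. The sketch below is illustrative only: `bootstrap_seconds` is a hypothetical helper, it assumes decimal megabytes and a sustained transfer rate, and it ignores SST ingestion time, which the RFC text does not quantify.

```python
def bootstrap_seconds(num_records: int, bytes_per_entry: int,
                      throughput_mb_per_s: float) -> float:
    """Seconds needed to transfer the full index at a sustained throughput.

    Hypothetical helper; uses decimal MB (1 MB = 1_000_000 bytes).
    """
    total_bytes = num_records * bytes_per_entry
    return total_bytes / (throughput_mb_per_s * 1_000_000)


# Figures from the RFC text: 1 billion records, 50 to 70 bytes each, 500 MB/s.
fastest = bootstrap_seconds(1_000_000_000, 50, 500)
slowest = bootstrap_seconds(1_000_000_000, 70, 500)
midpoint = bootstrap_seconds(1_000_000_000, 60, 500)
print(f"{fastest:.0f} to {slowest:.0f} s, midpoint {midpoint / 60:.1f} min")
```

At the midpoint of 60 bytes per entry the transfer takes 120 seconds, which matches the quoted figure; the full range is roughly 100 to 140 seconds per restart.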
+ +#### Partitioned RLI Bootstrap (Recommended) + +This strategy leverages Hudi's [Partitioned Record Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which organizes the MDT record-level index by partition path. Unlike the Global Record Index, the Partitioned Record Index guarantees uniqueness within each `(partition_path, record_key)` pair and supports partition-scoped lookups, making it possible to load only a subset of the index during bootstrap. + +**Time-bounded bootstrap**: On job start or task failover, the cache loads only the most recent **X days** of partitioned record index entries (configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds bootstrap time and resource consumption to a predictable window: + +1. Close and discard any existing RocksDB state. +2. Determine the bootstrap window: compute the set of partitions whose partition path falls within the last X days (e.g., `dt=2025-01-15` through `dt=2025-01-21` for a 7-day window). +3. For each partition in the bootstrap window, scan only its corresponding MDT Partitioned Record Index entries and bulk-load them into a dedicated RocksDB column family. +4. Record the bootstrap boundary timestamp (the oldest partition loaded) in the `default` column family metadata. + +**On-demand loading for older partitions**: During ingestion, when the `BucketAssigner` encounters an incoming record whose partition path is **older than the bootstrap window** (i.e., not yet cached in RocksDB): + +1. Detect the cache miss: the target partition's column family does not exist in RocksDB. +2. Trigger an **on-demand partition load**: read the Partitioned Record Index entries for that specific partition from MDT and materialize them into a new RocksDB column family. +3. Once loaded, perform the record lookup against the newly populated column family and continue normal processing. +4. 
The on-demand loaded partition is subject to the same TTL-based eviction as bootstrapped partitions (see [TTL-Based Partition Eviction](#ttl-based-partition-eviction)). + +This two-tier approach (time-bounded bootstrap plus on-demand loading) ensures that: + +- **Bootstrap is fast and predictable**: loading X days of index data is bounded and proportional to recent write volume, not total table size. For a daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day bootstrap loads ~4.2 GB, completing in seconds rather than minutes. +- **Late-arriving data is handled correctly**: updates to partitions older than X days (e.g., backfills, corrections, late-arriving events) trigger on-demand loading of only the affected partition, avoiding a full re-bootstrap. +- **Cache growth remains bounded**: combined with TTL eviction, the cache holds at most `bootstrap_days + on-demand loaded` partitions, with cold partitions automatically evicted. + +``` +Bootstrap Timeline (X = 7 days) + +◄──── Older partitions ────┤──── Bootstrap window (7 days) ────►│ Today + │ │ + dt=2025-01-10 dt=2025-01-13 dt=2025-01-15 ... dt=2025-01-21 dt=2025-01-22 + │ │ │ │ + Not loaded Not loaded Bootstrapped at Bootstrapped + (load on (load on job start at job start + demand if demand if + needed) needed) + +When a record arrives for dt=2025-01-10: + 1. Column family "dt=2025-01-10" not found in RocksDB + 2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT + 3. Create column family, bulk-load entries + 4. Perform lookup and continue +``` + +### Incremental Cache Maintenance + +To support on-demand RLI loading for an older partition, RLIBootstrapOperator needs to be revised to share a RocksDB instance with the BucketAssign operator.
After RLIBootstrapOperator completes its bootstrap, the RocksDB cache is maintained incrementally during normal write operations:
Review Comment: 🤖 @danny0405 sharing a single RocksDB instance between RLIBootstrapOperator and BucketAssign implies the two operators are co-located in the same JVM/task slot. If there's a keyBy/shuffle between them they'd run in separate tasks (possibly separate TMs) and couldn't share the instance. Could you clarify how co-location is guaranteed (operator chaining?), and how concurrent access from the two operators to the shared instance is made thread-safe? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
########## rfc/rfc-107/rfc-107.md: ########## @@ -0,0 +1,306 @@ + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert + +## Proposers + +- @zhenqiu-huang + +## Approvers + - TBD + +## Status + - In Progress + +## Abstract + +[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator.
While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance. +In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead. + +This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. The cache provides: + +- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O +- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions +- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size +- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility + +## Background + +### The Index Lookup Bottleneck + +In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations: + +1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency.
For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially. +2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput. +3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty. Warming the cache through individual MDT lookups creates a prolonged period of degraded performance. + +### Why RocksDB + +RocksDB is a proven embedded key-value store that addresses these limitations: + +- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap. +- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions. +- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable. +- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established.
Review Comment: 🤖 Since Flink already runs RocksDB as its state backend, have you considered using Flink managed keyed state (e.g. MapState with state TTL) instead of a separately-managed RocksDB instance? That would provide incremental checkpointing and recovery for free, surviving restarts without the full re-bootstrap from MDT, and would sidestep manual lifecycle/discard handling.
It'd be worth an Alternatives Considered section laying out why a standalone instance was chosen over managed state. <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
########## rfc/rfc-107/rfc-107.md: ########## @@ -0,0 +1,306 @@ + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert + +## Proposers + +- @zhenqiu-huang + +## Approvers + - TBD + +## Status + - In Progress + +## Abstract + +[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator. While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance. +In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead.
+ +This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. The cache provides: + +- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O +- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions +- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size +- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility + +## Background + +### The Index Lookup Bottleneck + +In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations: + +1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency. For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially. +2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput. +3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty.
Warming the cache through individual MDT lookups creates a prolonged period of degraded performance. + +### Why RocksDB + +RocksDB is a proven embedded key-value store that addresses these limitations: + +- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap. +- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions. +- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable. +- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established. + +## High Level Design + +The Dynamic Partitioned Cache introduces a RocksDB-based local index replica as an alternative cache of the in-memory uncommitted-records cache introduced in RFC-106 for large tables. The lookup order is: + +1. **RocksDB cache**: materialized replica of committed index state +2. **MDT RLI**: authoritative remote index (fallback for cache misses) + +The RocksDB cache is partitioned by Hudi data partition path using column families. This partition-aware design enables: + +- Bootstrapping only the partitions that the writer actively touches +- Evicting cold partitions via TTL without scanning individual keys +- Bounding cache size to `O(active_partitions)` rather than `O(total_table_size)` + +The design must also handle cache misses by reading from the MDT RLI and storing the loaded index in RocksDB for future lookups. This is discussed in the following sections.
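The lookup order in the High Level Design (local cache first, MDT RLI on a miss, then store the result locally) is a read-through cache. A minimal sketch of that pattern follows, with plain dictionaries standing in for RocksDB and the MDT. The class `ReadThroughIndexCache`, the `remote_lookup` callable, and the string locations are all assumptions made for illustration, and the sketch loads one key per miss, whereas the RFC loads a whole partition.

```python
from typing import Callable, Dict, Optional


class ReadThroughIndexCache:
    """Local cache that falls back to a remote index on a miss.

    Illustrative stand-in: a dict per partition plays the role of a
    RocksDB column family; `remote_lookup` plays the role of the MDT RLI.
    """

    def __init__(self, remote_lookup: Callable[[str, str], Optional[str]]):
        self._remote_lookup = remote_lookup
        self._partitions: Dict[str, Dict[str, str]] = {}
        self.remote_calls = 0  # counts fallbacks to the remote index

    def lookup(self, partition_path: str, record_key: str) -> Optional[str]:
        entries = self._partitions.setdefault(partition_path, {})
        if record_key in entries:
            return entries[record_key]  # served locally
        self.remote_calls += 1
        location = self._remote_lookup(partition_path, record_key)
        if location is not None:
            entries[record_key] = location  # remember for future lookups
        return location


# Hypothetical remote index holding a single committed record.
mdt = {("dt=2025-01-15", "k1"): "fg-0"}
cache = ReadThroughIndexCache(lambda p, k: mdt.get((p, k)))
first = cache.lookup("dt=2025-01-15", "k1")    # miss, fetched remotely
second = cache.lookup("dt=2025-01-15", "k1")   # hit, no remote call
missing = cache.lookup("dt=2025-01-15", "k2")  # unknown key, treated as insert
```

Note that the sketch only models committed state; it says nothing about records written in the in-flight checkpoint, which is the gap the first review comment raises.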
+ +### Detailed Design + +### Cache Structure and Partitioning + +The RocksDB instance is organized using **column families**, one per Hudi data partition path. Each column family stores key-value pairs where: + +- **Key**: Record key (byte-serialized) +- **Value**: Record location (file group ID + file slice info) and ordering value + +Using column families provides two critical advantages over a single flat keyspace: + +1. **Efficient partition eviction**: Dropping a column family is an O(1) metadata operation in RocksDB, compared to O(N) individual deletes. +2. **Partition-level TTL**: Each column family can have its own TTL configuration, enabling automatic eviction of partitions that haven't been written to recently. + +``` +RocksDB Instance +├── CF: "default" (metadata: partition registry, timestamps) +├── CF: "dt=2025-01-15" (RLI entries for partition dt=2025-01-15) +├── CF: "dt=2025-01-16" (RLI entries for partition dt=2025-01-16) +└── CF: "dt=2025-01-17" (RLI entries for partition dt=2025-01-17) +``` + +### Bootstrap Strategy + +On job start or task failover, the RocksDB cache must be populated from the MDT RLI. The bootstrap strategy differs based on the index scope: + +#### Global RLI Bootstrap + +For global RLI (cross-partition upsert), the cache must contain all record keys across the entire table: + +1. Close and discard any existing RocksDB state (container-local storage is ephemeral). +2. Scan the full MDT RLI partition assigned to this task (based on `hash(record_key) % num_index_shards` assignment from RFC-106). +3. Bulk-load entries into RocksDB using `SstFileWriter` for optimal ingestion performance. +4. Open the RocksDB instance for read-write access. + +**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per entry, scanning and loading the full index requires approximately 50–70 GB of data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost is incurred on every job restart.
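The two advantages claimed for column families under Cache Structure and Partitioning (dropping a partition in one step, and partition-level TTL) can be modelled without RocksDB. In the sketch below a dict per partition stands in for a column family; the class `PartitionedIndexCache`, its method names, and the explicit `now` arguments are hypothetical and chosen only so the example is deterministic.

```python
from typing import Dict, List, Optional


class PartitionedIndexCache:
    """One key-value map per partition, with last-access tracking.

    Illustrative model only: removing a partition's dict mirrors dropping
    a column family, instead of deleting its keys one by one.
    """

    def __init__(self, ttl_seconds: float):
        self._ttl = ttl_seconds
        self._families: Dict[str, Dict[str, str]] = {}
        self._last_access: Dict[str, float] = {}

    def put(self, partition: str, key: str, location: str, now: float) -> None:
        self._families.setdefault(partition, {})[key] = location
        self._last_access[partition] = now

    def get(self, partition: str, key: str, now: float) -> Optional[str]:
        family = self._families.get(partition)
        if family is None:
            return None  # partition not cached (or already evicted)
        self._last_access[partition] = now
        return family.get(key)

    def evict_expired(self, now: float) -> List[str]:
        """Drop every partition idle for longer than the TTL."""
        expired = sorted(p for p, t in self._last_access.items()
                         if now - t > self._ttl)
        for partition in expired:
            del self._families[partition]
            del self._last_access[partition]
        return expired


DAY = 86_400
ttl_cache = PartitionedIndexCache(ttl_seconds=3 * DAY)
ttl_cache.put("dt=2025-01-15", "k1", "fg-0", now=0)
ttl_cache.put("dt=2025-01-17", "k2", "fg-1", now=2 * DAY)
evicted = ttl_cache.evict_expired(now=4 * DAY)
```

With a 3-day TTL, the partition last touched at day 0 is dropped at day 4 while the one touched at day 2 survives, regardless of how many keys either partition holds.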
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which organizes the MDT record-level index by partition path. Unlike the Global Record Index, the Partitioned Record Index guarantees uniqueness within each `(partition_path, record_key)` pair and supports partition-scoped lookups, making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads only the most recent **X days** of partitioned record index entries (configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose partition path falls within the last X days (e.g., `dt=2025-01-15` through `dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT Partitioned Record Index entries and bulk-load them into a dedicated RocksDB column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the `BucketAssigner` encounters an incoming record whose partition path is **older than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index entries for that specific partition from MDT and materialize them into a new RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as bootstrapped partitions (see [TTL-Based Partition Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is bounded and proportional to recent write volume, not total table size. For a daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older than X days (e.g., backfills, corrections, late-arriving events) trigger on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache holds at most `bootstrap_days + on-demand loaded` partitions, with cold partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ────┤───── Bootstrap window (7 days) ────►│ Today
+                           │                                      │
+dt=2025-01-10  dt=2025-01-13  dt=2025-01-15  ...  dt=2025-01-21  dt=2025-01-22
+     │              │              │                   │
+ Not loaded     Not loaded    Bootstrapped at     Bootstrapped
+ (load on       (load on      job start           at job start
+ demand if      demand if
+ needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for an older partition, RLIBootstrapOperator needs to be revised to share the RocksDB instance with the BucketAssign operator. After RLIBootstrapOperator completes its bootstrap, the RocksDB cache is maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+  1. In RLIBootstrapOperator, look up through shared RocksDB for r.partitionPath
+     → If column family does not exist:
+         a. Load Partitioned Record Index for r.partitionPath from MDT (on-demand)
+         b. Create column family and bulk-load entries
+     → If found: forward to next operator BucketAssign
+  2. In BucketAssign operator, check RocksDB cache for r.key in column family r.partitionPath
+     → If found in RocksDB: use cached location (committed record)
+     → If not found: this is an INSERT, assign new file group
+  3. Update RocksDB cache with r.key → assigned location
+```
+
+In the process above, PreBucketAssign is responsible for checking whether a record belongs to an older partition that was not bootstrapped. If so, it loads that partition on demand from MDT and forwards the HoodieFlinkInternalRow to the BucketAssign operator.
+
+#### On Index Write
+
+In the `IndexWrite` operator (from RFC-106), index records are written to MDT. The RocksDB cache in the `BucketAssigner` is updated in-line as records flow through the pipeline, ensuring the cache stays ahead of MDT commits.
+
+### TTL-Based Partition Eviction
+
+For partitioned RLI, the cache implements automatic eviction of cold partitions:
+
+- Each column family tracks the **last access timestamp** (last time a record was written to or looked up in that partition).
+- A background thread periodically scans column family metadata and drops column families whose last access exceeds the configured TTL.
+- The TTL should be set based on the workload's partition access pattern. For daily-partitioned event data, a TTL of 3–7 days is typical.
+
+Configuration:
+
+| Property | Default | Description |
+|---|---|---|
+| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based partitioned cache |
+| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local directory for RocksDB data |
+| `hoodie.index.cache.rocksdb.bootstrap.days` | `7` | Number of days of Partitioned Record Index to load during bootstrap. Only partitions within this window are pre-loaded; older partitions are loaded on demand when updates are observed. |
+| `hoodie.index.cache.rocksdb.partition.ttl.hours` | `168` (7 days) | TTL for partition column families |
+| `hoodie.index.cache.rocksdb.block.cache.mb` | `256` | RocksDB block cache size (off-heap) |
+| `hoodie.index.cache.rocksdb.compaction.style` | `LEVEL` | RocksDB compaction style |
+
+### Storage Overhead
+
+RocksDB occupies approximately **2x the storage** compared to native HFile format in MDT, due to:
+
+1. **Compression codec difference**: RocksDB uses Snappy compression by default, while Hudi MDT uses gzip, which achieves higher compression ratios.
+2. **Uncompacted SST files**: During active writes, RocksDB maintains multiple levels of SST files before compaction merges them.
+3. **WAL disabled**: Write-ahead log is disabled since the cache can be rebuilt from MDT on failure, reducing write amplification.
+
+For a partition with 10 million records at ~60 bytes per entry, the RocksDB footprint is approximately 1.2 GB on disk.
+
+### Consistency and Failure Handling
+
+The RocksDB cache is a **derived, disposable replica**; MDT remains the single source of truth. This simplifies consistency handling:
+
+#### Task Failover
+
+1. The RocksDB cache on the failed task's container is discarded (ephemeral local storage).
+2. The recovered task bootstraps a fresh RocksDB instance from MDT.
+3. The coordinator recommits any pending Hudi instants (as described in RFC-106).
+
+#### Job Restart
+
+1. All RocksDB state is discarded.
+2. Full bootstrap from MDT is triggered.
+3. The coordinator handles recommitting of pending instants per RFC-106's recovery protocol.
+
+#### Concurrent Writers (OCC)
+
+When multiple Flink jobs write to the same Hudi table under Optimistic Concurrency Control:
+
+1. Each writer maintains its own independent RocksDB cache.
+2. Caches may become stale relative to other writers' commits.
+3. Conflict detection occurs at commit time via Hudi's OCC protocol.
+4. On conflict detection, the losing writer's RocksDB cache must be refreshed by re-reading the conflicting partitions from MDT.
+
+**Risk**: A stale cache may misclassify an update as an insert, potentially creating duplicate records. This is mitigated by:

Review Comment:
🤖 @nsivabalan could you sanity-check the multi-writer safety here? With each writer holding an independent, possibly-stale RocksDB cache, a concurrent insert of the same key on two writers could be classified as a new insert on both and assigned different file groups. Would OCC's file-level conflict detection actually catch that, or could it slip through and violate RLI key uniqueness? The 'freshness validation' mitigation (checking latest cached instant vs. latest committed instant) would benefit from more detail on how it closes this window.

<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>

##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator. While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size
+- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency. For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty. Warming the cache through individual MDT lookups creates a prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap.
+- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica as an alternative, for large tables, to the in-memory uncommitted-records cache introduced in RFC-106. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than `O(total_table_size)`
+
+Cache misses are handled by reading from the MDT RLI and storing the loaded index in RocksDB for future lookups. The following sections describe this design.
+
+### Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering value
+
+Using column families provides two critical advantages over a single flat keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL configuration, enabling automatic eviction of partitions that haven't been written to recently.
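The two advantages listed above can be illustrated with a toy model in which a plain dictionary per partition stands in for a RocksDB column family. This is only a sketch under stated assumptions: the class `PartitionedCache` and its methods are hypothetical, no RocksDB API is used, and eviction is driven by last-access time with an injectable clock.

```python
import time
from typing import Callable, Dict, List, Optional


class PartitionedCache:
    """Toy stand-in for a RocksDB instance with one column family per partition."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._families: Dict[str, Dict[str, str]] = {}  # partition -> {record key -> location}
        self._last_access: Dict[str, float] = {}        # partition -> last read/write time

    def put(self, partition: str, key: str, location: str) -> None:
        self._families.setdefault(partition, {})[key] = location
        self._last_access[partition] = self._clock()

    def get(self, partition: str, key: str) -> Optional[str]:
        family = self._families.get(partition)
        if family is None:
            return None  # partition not cached at all
        self._last_access[partition] = self._clock()
        return family.get(key)

    def drop_partition(self, partition: str) -> None:
        # One operation removes every key of the partition, mirroring a column family drop.
        self._families.pop(partition, None)
        self._last_access.pop(partition, None)

    def evict_expired(self) -> List[str]:
        """Drop every partition whose last access is older than the TTL."""
        now = self._clock()
        expired = [p for p, t in self._last_access.items() if now - t > self._ttl]
        for partition in expired:
            self.drop_partition(partition)
        return expired
```

Eviction here never iterates over individual record keys; it only inspects one timestamp per partition, which is the property the column family layout is meant to provide.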
+
+```
+RocksDB Instance
+├── CF: "default"         (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"   (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"   (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"   (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on `hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SstFileWriter` for optimal ingestion performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per entry, scanning and loading the full index requires approximately 50–70 GB of data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which organizes the MDT record-level index by partition path. Unlike the Global Record Index, the Partitioned Record Index guarantees uniqueness within each `(partition_path, record_key)` pair and supports partition-scoped lookups, making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads only the most recent **X days** of partitioned record index entries (configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose partition path falls within the last X days (e.g., `dt=2025-01-15` through `dt=2025-01-21` for a 7-day window).

Review Comment:
🤖 The time-bounded bootstrap assumes partition paths are date-based and can be parsed into a 'last X days' window. How would this work for non-date partitioning (e.g. region/category) or multi-level/composite partitions where there's no temporal ordering? It might be worth defining the bootstrap behavior (and the `bootstrap.days` semantics) when partitions can't be ordered by time.

<sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>

##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-107: Dynamic Partitioned Cache for Flink Hudi Upsert
+
+## Proposers
+
+- @zhenqiu-huang
+
+## Approvers
+ - TBD
+
+## Status
+ - In Progress
+
+## Abstract
+
+[RFC-106](../rfc-106/rfc-106.md) introduces Record Level Index (RLI) support for Flink streaming writes, including a simple in-memory cache for index lookups in the `BucketAssigner` operator. While the in-memory cache works well for small to moderate workloads, it faces scalability challenges for large tables with billions of records: the cache either consumes excessive JVM heap memory or suffers from high eviction rates that degrade lookup performance.
+In modern CloudLake systems that rely on object storage platforms such as GCS, OCI Object Storage, and Amazon S3, data is typically transitioned to lower storage tiers over time to optimize storage costs. However, using an in-memory cache to accelerate index lookups may result in increased data processing overhead.
+
+This RFC proposes a **Dynamic Partitioned Cache** backed by RocksDB that serves as a local materialized replica of the MDT RLI. The cache provides:
+
+- **O(1) local lookups** for record location resolution during streaming writes, eliminating per-record MDT I/O
+- **Partition-aware storage** using RocksDB column families, enabling efficient TTL-based eviction of stale partitions
+- **Bounded resource consumption** by caching only the partitions actively written to, keeping storage proportional to the working set rather than total table size
+- **Incremental maintenance** through in-line index updates during the write path, with MDT as the authoritative source of truth for bootstrap and cross-engine compatibility
+
+## Background
+
+### The Index Lookup Bottleneck
+
+In Hudi's Flink upsert pipeline, the `BucketAssigner` operator must determine whether each incoming record is an insert or an update by looking up its record key in the index. RFC-106 introduces an in-memory cache to accelerate these lookups, but for large-scale streaming workloads, this approach has fundamental limitations:
+
+1. **Unbounded Cost**: Each RLI entry requires approximately 50–70 bytes of memory. For a table containing 1 billion records, caching the entire index would consume 50–70 GB of JVM heap. In addition, a record buffer is required to improve RLI lookup efficiency. For CDC workloads with high event throughput (QPS) and large record sizes, maintaining a two-minute buffer can further increase memory consumption significantly. As a result, the compute cost of upsert ingestion workloads can rise substantially.
+2. **Cache thrashing**: With bounded memory, the cache must evict entries aggressively. For workloads that access records across many partitions, this leads to frequent cache misses and fallback to MDT queries (10+ ms per record), severely degrading throughput.
+3. **Cold start latency**: On job restart or task failover, the in-memory cache starts empty. Warming the cache through individual MDT lookups creates a prolonged period of degraded performance.
+
+### Why RocksDB
+
+RocksDB is a proven embedded key-value store that addresses these limitations:
+
+- **Off-heap storage**: RocksDB stores data in SST files on local disk with a configurable block cache in off-heap memory, avoiding GC pressure on the JVM heap.
+- **Column families**: RocksDB supports column families, which provide logical separation of data within a single database instance. Each partition's index entries can be stored in a dedicated column family, enabling efficient bulk operations (e.g., dropping an entire partition's cache) without affecting other partitions.
+- **Compression**: RocksDB applies block-level compression (Snappy by default), keeping the on-disk footprint manageable.
+- **Mature Flink integration**: Flink already uses RocksDB as its primary state backend, so operational expertise and deployment patterns are well-established.
+
+## High Level Design
+
+The Dynamic Partitioned Cache introduces a RocksDB-based local index replica as an alternative, for large tables, to the in-memory uncommitted-records cache introduced in RFC-106. The lookup order is:
+
+1. **RocksDB cache**: materialized replica of committed index state
+2. **MDT RLI**: authoritative remote index (fallback for cache misses)
+
+The RocksDB cache is partitioned by Hudi data partition path using column families. This partition-aware design enables:
+
+- Bootstrapping only the partitions that the writer actively touches
+- Evicting cold partitions via TTL without scanning individual keys
+- Bounding cache size to `O(active_partitions)` rather than `O(total_table_size)`
+
+Cache misses are handled by reading from the MDT RLI and storing the loaded index in RocksDB for future lookups. The following sections describe this design.
+
+### Detailed Design
+
+### Cache Structure and Partitioning
+
+The RocksDB instance is organized using **column families**, one per Hudi data partition path. Each column family stores key-value pairs where:
+
+- **Key**: Record key (byte-serialized)
+- **Value**: Record location (file group ID + file slice info) and ordering value
+
+Using column families provides two critical advantages over a single flat keyspace:
+
+1. **Efficient partition eviction**: Dropping a column family is an O(1) metadata operation in RocksDB, compared to O(N) individual deletes.
+2. **Partition-level TTL**: Each column family can have its own TTL configuration, enabling automatic eviction of partitions that haven't been written to recently.
+
+```
+RocksDB Instance
+├── CF: "default"         (metadata: partition registry, timestamps)
+├── CF: "dt=2025-01-15"   (RLI entries for partition dt=2025-01-15)
+├── CF: "dt=2025-01-16"   (RLI entries for partition dt=2025-01-16)
+└── CF: "dt=2025-01-17"   (RLI entries for partition dt=2025-01-17)
+```
+
+### Bootstrap Strategy
+
+On job start or task failover, the RocksDB cache must be populated from the MDT RLI. The bootstrap strategy differs based on the index scope:
+
+#### Global RLI Bootstrap
+
+For global RLI (cross-partition upsert), the cache must contain all record keys across the entire table:
+
+1. Close and discard any existing RocksDB state (container-local storage is ephemeral).
+2. Scan the full MDT RLI partition assigned to this task (based on `hash(record_key) % num_index_shards` assignment from RFC-106).
+3. Bulk-load entries into RocksDB using `SstFileWriter` for optimal ingestion performance.
+4. Open the RocksDB instance for read-write access.
+
+**Bootstrap latency**: For a table with 1 billion records and ~50–70 bytes per entry, scanning and loading the full index requires approximately 50–70 GB of data transfer. At 500 MB/s network throughput, this takes ~2 minutes. This cost is incurred on every job restart.
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which organizes the MDT record-level index by partition path. Unlike the Global Record Index, the Partitioned Record Index guarantees uniqueness within each `(partition_path, record_key)` pair and supports partition-scoped lookups, making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads only the most recent **X days** of partitioned record index entries (configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose partition path falls within the last X days (e.g., `dt=2025-01-15` through `dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT Partitioned Record Index entries and bulk-load them into a dedicated RocksDB column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the `BucketAssigner` encounters an incoming record whose partition path is **older than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index entries for that specific partition from MDT and materialize them into a new RocksDB column family.
+3. Once loaded, perform the record lookup against the newly populated column family and continue normal processing.
+4. The on-demand loaded partition is subject to the same TTL-based eviction as bootstrapped partitions (see [TTL-Based Partition Eviction](#ttl-based-partition-eviction)).
+
+This two-tier approach (time-bounded bootstrap plus on-demand loading) ensures that:
+
+- **Bootstrap is fast and predictable**: loading X days of index data is bounded and proportional to recent write volume, not total table size. For a daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day bootstrap loads ~4.2 GB, completing in seconds rather than minutes.
+- **Late-arriving data is handled correctly**: updates to partitions older than X days (e.g., backfills, corrections, late-arriving events) trigger on-demand loading of only the affected partition, avoiding a full re-bootstrap.
+- **Cache growth remains bounded**: combined with TTL eviction, the cache holds at most `bootstrap_days + on-demand loaded` partitions, with cold partitions automatically evicted.
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ────┤───── Bootstrap window (7 days) ────►│ Today
+                           │                                      │
+dt=2025-01-10  dt=2025-01-13  dt=2025-01-15  ...  dt=2025-01-21  dt=2025-01-22
+     │              │              │                   │
+ Not loaded     Not loaded    Bootstrapped at     Bootstrapped
+ (load on       (load on      job start           at job start
+ demand if      demand if
+ needed)        needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To support on-demand RLI loading for an older partition, RLIBootstrapOperator needs to be revised to share the RocksDB instance with the BucketAssign operator. After RLIBootstrapOperator completes its bootstrap, the RocksDB cache is maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+  1. In RLIBootstrapOperator, look up through shared RocksDB for r.partitionPath
+     → If column family does not exist:
+         a. Load Partitioned Record Index for r.partitionPath from MDT (on-demand)
+         b. Create column family and bulk-load entries
+     → If found: forward to next operator BucketAssign
+  2. In BucketAssign operator, check RocksDB cache for r.key in column family r.partitionPath
+     → If found in RocksDB: use cached location (committed record)
+     → If not found: this is an INSERT, assign new file group
+  3. Update RocksDB cache with r.key → assigned location
+```
+
+In the process above, PreBucketAssign is responsible for checking whether a record belongs to an older partition that was not bootstrapped. If so, it loads that partition on demand from MDT and forwards the HoodieFlinkInternalRow to the BucketAssign operator.
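The record-processing steps above can be sketched as a single function. This is a simplified, single-threaded illustration, not the proposed operator code: the cache is a dictionary of dictionaries rather than RocksDB, and `load_partition` and `new_file_group` are hypothetical callables standing in for the MDT read and the file group assignment.

```python
from typing import Callable, Dict, Mapping, Tuple

Cache = Dict[str, Dict[str, str]]  # partition path -> {record key -> location}


def process_record(
    cache: Cache,
    load_partition: Callable[[str], Mapping[str, str]],  # hypothetical MDT reader
    new_file_group: Callable[[], str],                   # hypothetical assigner
    partition: str,
    key: str,
) -> Tuple[str, str]:
    """Classify one record as UPDATE or INSERT and return its location."""
    # Step 1: on-demand load when the partition has no cached entries yet.
    if partition not in cache:
        cache[partition] = dict(load_partition(partition))

    family = cache[partition]

    # Step 2: a cached key means the record already exists (update).
    if key in family:
        return ("UPDATE", family[key])

    # Step 3: otherwise assign a new location and remember it for later records.
    location = new_file_group()
    family[key] = location
    return ("INSERT", location)
```

Once a partition has been loaded, later records for it skip the MDT read entirely, which is the behaviour the two-operator split is intended to achieve.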
+
+#### On Index Write
+
+In the `IndexWrite` operator (from RFC-106), index records are written to MDT. The RocksDB cache in the `BucketAssigner` is updated in-line as records flow through the pipeline, ensuring the cache stays ahead of MDT commits.
+
+### TTL-Based Partition Eviction
+
+For partitioned RLI, the cache implements automatic eviction of cold partitions:
+
+- Each column family tracks the **last access timestamp** (last time a record was written to or looked up in that partition).
+- A background thread periodically scans column family metadata and drops column families whose last access exceeds the configured TTL.
+- The TTL should be set based on the workload's partition access pattern. For daily-partitioned event data, a TTL of 3–7 days is typical.
+
+Configuration:
+
+| Property | Default | Description |
+|---|---|---|
+| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based partitioned cache |
+| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local directory for RocksDB data |
+| `hoodie.index.cache.rocksdb.bootstrap.days` | `7` | Number of days of Partitioned Record Index to load during bootstrap. Only partitions within this window are pre-loaded; older partitions are loaded on demand when updates are observed. |
+| `hoodie.index.cache.rocksdb.partition.ttl.hours` | `168` (7 days) | TTL for partition column families |
+| `hoodie.index.cache.rocksdb.block.cache.mb` | `256` | RocksDB block cache size (off-heap) |
+| `hoodie.index.cache.rocksdb.compaction.style` | `LEVEL` | RocksDB compaction style |
+
+### Storage Overhead
+
+RocksDB occupies approximately **2x the storage** compared to native HFile format in MDT, due to:
+
+1. **Compression codec difference**: RocksDB uses Snappy compression by default, while Hudi MDT uses gzip, which achieves higher compression ratios.
+2. **Uncompacted SST files**: During active writes, RocksDB maintains multiple levels of SST files before compaction merges them.
+3. **WAL disabled**: Write-ahead log is disabled since the cache can be rebuilt from MDT on failure, reducing write amplification.
+
+For a partition with 10 million records at ~60 bytes per entry, the RocksDB footprint is approximately 1.2 GB on disk.
+
+### Consistency and Failure Handling
+
+The RocksDB cache is a **derived, disposable replica**; MDT remains the single source of truth. This simplifies consistency handling:
+
+#### Task Failover
+
+1. The RocksDB cache on the failed task's container is discarded (ephemeral local storage).
+2. The recovered task bootstraps a fresh RocksDB instance from MDT.
+3. The coordinator recommits any pending Hudi instants (as described in RFC-106).
+
+#### Job Restart
+
+1. All RocksDB state is discarded.
+2. Full bootstrap from MDT is triggered.
+3. The coordinator handles recommitting of pending instants per RFC-106's recovery protocol.
+
+#### Concurrent Writers (OCC)
+
+When multiple Flink jobs write to the same Hudi table under Optimistic Concurrency Control:
+
+1. Each writer maintains its own independent RocksDB cache.
+2. Caches may become stale relative to other writers' commits.
+3. Conflict detection occurs at commit time via Hudi's OCC protocol.
+4. On conflict detection, the losing writer's RocksDB cache must be refreshed by re-reading the conflicting partitions from MDT.
+
+**Risk**: A stale cache may misclassify an update as an insert, potentially creating duplicate records. This is mitigated by:
+- Early conflict detection during checkpoint lifecycle (best-effort).
+- Index freshness validation: before writing, check that the cache's latest instant matches the table's latest committed instant.
+- Full cache refresh on detected staleness.
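The freshness validation in the list above amounts to comparing two instants before a write. A minimal sketch, assuming instant times are fixed-width timestamp strings (so string comparison orders them chronologically) and a hypothetical `refresh` callable that rebuilds the cache and returns the instant it now reflects:

```python
from typing import Callable


def ensure_fresh(
    cached_instant: str,
    latest_committed_instant: str,
    refresh: Callable[[], str],  # hypothetical: reloads the cache from MDT
) -> str:
    """Return the instant the cache reflects, refreshing first if it lags the table."""
    # Fixed-width timestamp strings compare chronologically (assumption).
    if cached_instant < latest_committed_instant:
        return refresh()
    return cached_instant
```

This check only narrows the staleness window: another writer can still commit between the check and this writer's own commit, so it complements commit-time conflict detection rather than replacing it.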
+
+### Integration with RFC-106 Pipeline
+
+TBD

Review Comment:
🤖 This section is essentially the crux of how the new operators (PreBucketAssign, shared-RocksDB RLIBootstrapOperator) wire into the existing RFC-106 topology, but it's currently TBD, as is the Approvers field and the In Progress status. Since these changes touch the operator graph and shared state, it'd help to flesh this section out before a deeper review, as feasibility is hard to fully assess without it. <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>

##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+
+```
+Bootstrap Timeline (X = 7 days)
+
+◄──── Older partitions ────┤───── Bootstrap window (7 days) ────►│ Today
+                           │                                     │
+dt=2025-01-10 dt=2025-01-13  dt=2025-01-15 ... dt=2025-01-21       dt=2025-01-22
+      │             │              │                 │
+  Not loaded    Not loaded   Bootstrapped at    Bootstrapped
+  (load on      (load on     job start          at job start
+   demand if     demand if
+   needed)       needed)
+
+When a record arrives for dt=2025-01-10:
+  1. Column family "dt=2025-01-10" not found in RocksDB
+  2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT
+  3. Create column family, bulk-load entries
+  4. Perform lookup and continue
+```
+
+### Incremental Cache Maintenance
+
+To achieve the on demand RLI load for an older partition, RLIBootstrapOperator needs to be revised to access the shared RocksDB instance with BucketAssign operator. After bootstrap of RLIBootstrapOperator, the RocksDB cache is maintained incrementally during normal write operations:
+
+#### On Record Processing
+
+```
+for each incoming record r:
+  1. In RLIBootstrapOperator, look up through shared RocksDB for r.partitionPath

Review Comment:
🤖 The on-demand partition load appears synchronous on the record-processing path: a record for an un-cached older partition blocks while that partition's entire index is scanned from MDT and bulk-loaded. For a backfill or correction touching many cold partitions, could this cause repeated multi-second stalls and backpressure across the pipeline? Have you considered async loading, or buffering the affected records while the load proceeds? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>

##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+
+#### Partitioned RLI Bootstrap (Recommended)
+
+This strategy leverages Hudi's [Partitioned Record Index](https://hudi.apache.org/docs/indexes/) (introduced in 1.1.0), which organizes the MDT record-level index by partition path. Unlike the Global Record Index, the Partitioned Record Index guarantees uniqueness within each `(partition_path, record_key)` pair and supports partition-scoped lookups, making it possible to load only a subset of the index during bootstrap.
+
+**Time-bounded bootstrap**: On job start or task failover, the cache loads only the most recent **X days** of partitioned record index entries (configurable via `hoodie.index.cache.rocksdb.bootstrap.days`). This bounds bootstrap time and resource consumption to a predictable window:
+
+1. Close and discard any existing RocksDB state.
+2. Determine the bootstrap window: compute the set of partitions whose partition path falls within the last X days (e.g., `dt=2025-01-15` through `dt=2025-01-21` for a 7-day window).
+3. For each partition in the bootstrap window, scan only its corresponding MDT Partitioned Record Index entries and bulk-load them into a dedicated RocksDB column family.
+4. Record the bootstrap boundary timestamp (the oldest partition loaded) in the `default` column family metadata.
+
+**On-demand loading for older partitions**: During ingestion, when the `BucketAssigner` encounters an incoming record whose partition path is **older than the bootstrap window** (i.e., not yet cached in RocksDB):
+
+1. Detect the cache miss: the target partition's column family does not exist in RocksDB.
+2. Trigger an **on-demand partition load**: read the Partitioned Record Index entries for that specific partition from MDT and materialize them into a new RocksDB column family.

Review Comment:
🤖 RFC-106 assigns work by hash(record_key) % num_shards, so a given subtask owns keys spread across all data partitions. When on-demand loading 'the Partitioned Record Index for partition dt=X', does each subtask load only the keys in that partition belonging to its shard, or the whole partition (with most subtasks loading entries they'll never look up)? And for partitioned RLI, what's the keyBy: by partition path (which risks skew when one partition is hot) or by key? Clarifying the task→partition→shard mapping would help. <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>

##########
rfc/rfc-107/rfc-107.md:
##########
@@ -0,0 +1,306 @@
+ +**On-demand loading for older partitions**: During ingestion, when the `BucketAssigner` encounters an incoming record whose partition path is **older than the bootstrap window** (i.e., not yet cached in RocksDB): + +1. Detect the cache miss: the target partition's column family does not exist in RocksDB. +2. Trigger an **on-demand partition load**: read the Partitioned Record Index entries for that specific partition from MDT and materialize them into a new RocksDB column family. +3. Once loaded, perform the record lookup against the newly populated column family and continue normal processing. +4. The on-demand loaded partition is subject to the same TTL-based eviction as bootstrapped partitions (see [TTL-Based Partition Eviction](#ttl-based-partition-eviction)). + +This two-tier approach β time-bounded bootstrap plus on-demand loading β ensures that: + +- **Bootstrap is fast and predictable**: loading X days of index data is bounded and proportional to recent write volume, not total table size. For a daily-partitioned table with 10M records/day at ~60 bytes/entry, a 7-day bootstrap loads ~4.2 GB β completing in seconds rather than minutes. +- **Late-arriving data is handled correctly**: updates to partitions older than X days (e.g., backfills, corrections, late-arriving events) trigger on-demand loading of only the affected partition, avoiding a full re-bootstrap. +- **Cache growth remains bounded**: combined with TTL eviction, the cache holds at most `bootstrap_days + on-demand loaded` partitions, with cold partitions automatically evicted. + +``` +Bootstrap Timeline (X = 7 days) + +βββββ Older partitions βββββ€βββββ Bootstrap window (7 days) βββββΊβ Today + β β + dt=2025-01-10 dt=2025-01-13 dt=2025-01-15 ... dt=2025-01-21 dt=2025-01-22 + β β β β + Not loaded Not loaded Bootstrapped at Bootstrapped + (load on (load on job start at job start + demand if demand if + needed) needed) + +When a record arrives for dt=2025-01-10: + 1. 
Column family "dt=2025-01-10" not found in RocksDB + 2. On-demand load: read Partitioned Record Index for dt=2025-01-10 from MDT + 3. Create column family, bulk-load entries + 4. Perform lookup and continue +``` + +### Incremental Cache Maintenance + +To support on-demand RLI loading for an older partition, RLIBootstrapOperator needs to be revised to share the RocksDB instance with the BucketAssign operator. After RLIBootstrapOperator completes its bootstrap, the RocksDB cache is maintained incrementally during normal write operations: + +#### On Record Processing + +``` +for each incoming record r: + 1. In RLIBootstrapOperator, look up r.partitionPath in the shared RocksDB + → If column family does not exist: + a. Load Partitioned Record Index for r.partitionPath from MDT (on-demand) + b. Create column family and bulk-load entries + → If found: forward to next operator BucketAssign + 2. In BucketAssign operator, check RocksDB cache for r.key in column family r.partitionPath + → If found in RocksDB: use cached location (committed record) + → If not found: this is an INSERT, assign new file group + 3. Update RocksDB cache with r.key → assigned location +``` + +In the process above, the PreBucketAssign is responsible for checking whether a record belongs to an older partition that has not been bootstrapped. If so, it loads that partition on demand from MDT and forwards the HoodieFlinkInternalRow to the BucketAssign operator. + +#### On Index Write + +In the `IndexWrite` operator (from RFC-106), index records are written to MDT. The RocksDB cache in the `BucketAssigner` is updated in-line as records flow through the pipeline, ensuring the cache stays ahead of MDT commits. + +### TTL-Based Partition Eviction + +For partitioned RLI, the cache implements automatic eviction of cold partitions: + +- Each column family tracks the **last access timestamp** (last time a record was written to or looked up in that partition). 
+- A background thread periodically scans column family metadata and drops column families whose time since last access exceeds the configured TTL. +- The TTL should be set based on the workload's partition access pattern. For daily-partitioned event data, a TTL of 3–7 days is typical. + +Configuration: + +| Property | Default | Description | +|---|---|---| +| `hoodie.index.cache.rocksdb.enabled` | `false` | Enable RocksDB-based partitioned cache | +| `hoodie.index.cache.rocksdb.base.path` | `/tmp/hudi-index-cache` | Local directory for RocksDB data | Review Comment: 🤖 RocksDB here needs substantial local disk (up to ~50–70 GB for a global-RLI bootstrap of a 1B-row table, per the document's own sizing), which many containerized/k8s TaskManager setups don't provision by default, and the `/tmp` default is risky in production (eviction, noexec, sharing with the OS). Could you add guidance on local-disk sizing requirements and pick a more production-appropriate default path (or derive it from Flink's configured local/tmp dirs)? <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
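To make the second suggestion concrete, here is a minimal sketch of how a cache base path could be chosen from a list of candidate local directories instead of a hard-coded `/tmp` default. It is an illustration only, not code from the PR: `IndexCacheDirs`, `choose`, and `estimateBytes` are hypothetical names, and the sketch assumes the caller passes in the candidate directories (for example, the entries of Flink's `io.tmp.dirs` setting) rather than reading them from any Flink API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Hudi or Flink. */
final class IndexCacheDirs {
  private IndexCacheDirs() {}

  /**
   * Returns a per-subtask cache directory under the first candidate that can be
   * created and reports at least requiredBytes of usable space, or empty if
   * no candidate qualifies. The returned directory itself is not created here.
   */
  static Optional<Path> choose(List<String> candidates, long requiredBytes, int subtaskIndex) {
    for (String candidate : candidates) {
      try {
        Path base = Files.createDirectories(Paths.get(candidate));
        long usable = Files.getFileStore(base).getUsableSpace();
        if (usable >= requiredBytes) {
          return Optional.of(base.resolve("hudi-index-cache").resolve("subtask-" + subtaskIndex));
        }
      } catch (IOException | RuntimeException e) {
        // Unusable candidate (bad path, read-only mount, no permission): try the next one.
      }
    }
    return Optional.empty();
  }

  /** Rough sizing rule using the RFC's own model: record count times bytes per entry. */
  static long estimateBytes(long records, long bytesPerEntry) {
    return Math.multiplyExact(records, bytesPerEntry);
  }
}
```

With the RFC's numbers, `estimateBytes(1_000_000_000L, 70L)` gives 70 GB for a global-RLI bootstrap, and `choose(...)` returning an empty result would let the operator fail fast with a clear message instead of filling the disk mid-bootstrap. The usable-space check is a point-in-time estimate, so it complements, rather than replaces, documented sizing guidance.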
