vinothchandar commented on code in PR #17610:
URL: https://github.com/apache/hudi/pull/17610#discussion_r2738187950


##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark datasource does, and has been proven at [massive production scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
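As an illustration of the shard-affine routing above, here is a minimal sketch; the `shard_for_key` function and its CRC32 hash are assumptions for this sketch, and the actual hashing algorithm must match the MDT's index partitioner:

```python
import zlib

NUM_INDEX_SHARDS = 4  # illustrative shard count

def shard_for_key(record_key: str, num_shards: int = NUM_INDEX_SHARDS) -> int:
    # A stable (process-independent) hash of the record key, mod the shard
    # count. Stability matters: every BucketAssigner task must route a given
    # key to the same shard across runs, which Python's salted hash() cannot
    # guarantee.
    return zlib.crc32(record_key.encode("utf-8")) % num_shards

# The same key always routes to the same shard, so each BucketAssigner task
# only ever reads a fixed subset of index shards.
assert shard_for_key("k1") == shard_for_key("k1")
assert 0 <= shard_for_key("k2") < NUM_INDEX_SHARDS
```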
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index 
+should complete very quickly. Reading an RLI entry from storage for every record would incur 10+ ms of latency per record and seriously degrade throughput. 
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 

Review Comment:
   I don't think we should add a record-level cache; instead we should optimize the caching of the RLI base and log files. My rationale is: the keys are uniformly distributed to RLI shards and then sorted by key within them, so I expect the file-level cache to perform equally well.. 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 
+These mappings are not yet committed to the Hudi table and are therefore 
invisible to MDT queries. This cache must not be cleared until the 
+corresponding checkpoint/instant is committed to Hudi, which indicates that 
the index payloads have also been committed. This ensures multiple
+records for the same record key (e.g., an insert to a key followed by an update within the same commit boundary) are routed consistently to the same 
+file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU 
eviction strategy with a configurable size limit. 
+(Note that the MDT reader also maintains its own native file-level cache.) 
+
+The actual index writes occur in the `IndexWrite` operator, while cache lookups and MDT queries to determine record locations happen in the `BucketAssigner` operator; the resolved location is then propagated downstream.
+The cache is updated for new records and location changes, while the MDT is 
queried only for existing key locations.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the 
location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key 
exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its 
assigned location to the cache.
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds 
to a Hudi completed instant. During a checkpoint, Flink completes the data 
writes and collects the Hudi commit metadata. 
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 
+However, the acknowledgment message is sent asynchronously on a best-effort 
basis and may be lost in corner cases. A Hudi instant cannot be committed 
without receiving this acknowledgment.
+
+During job restarts or task failovers, there are scenarios where a Flink 
checkpoint succeeds but the corresponding Hudi instant remains uncommitted due 
to the two-phase commit mechanism:
+
+1. **Missing acknowledgment**: The acknowledgment message is lost entirely.
+2. **Job restart during commit**: The acknowledgment is received, but the job 
restarts during the instant commit process:
+   1. All write metadata from a checkpoint is collected in the coordinator and 
is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. The job restarts or crashes.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+3. **Task failover before acknowledgment**: A task fails over from a 
checkpoint before its acknowledgment is received:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The checkpoint succeeds, but the acknowledgment has not been received 
yet.
+   3. A task fails and recovers from `ckp_n`.
+   4. The instant remains uncommitted even though the checkpoint succeeded.
+   5. The acknowledgment message is eventually received.
+   6. During the gap between steps 4 and 5, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+4. **Task failover after acknowledgment but before commit completion**: A task 
fails over after the acknowledgment is received but before the instant commit 
completes:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. A task fails and recovers from `ckp_n`.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+   6. Step 3 eventually completes and the instant is committed.
+   7. During the gap between steps 4 and 6, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+
+For data table (DT) metadata, the pipeline will recommit the instant using the 
recovered table metadata. However, since the `BucketAssigner` operator is 
upstream of the `StreamWrite` operator, there is a time gap before these 
inflight instants can be recommitted. 
+We do not want to block `BucketAssigner` processing while waiting for inflight 
instants to be recommitted. 

Review Comment:
   Need to consider concurrent writers.. I generally want to make sane, safe choices rather than over-optimize for recovery scenarios.. 
   
   So we still need to fully agree on whether reading uncommitted instants is strictly needed. 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+For data table (DT) metadata, the pipeline will recommit the instant using the 
recovered table metadata. However, since the `BucketAssigner` operator is 
upstream of the `StreamWrite` operator, there is a time gap before these 
inflight instants can be recommitted. 
+We do not want to block `BucketAssigner` processing while waiting for inflight 
instants to be recommitted. 
+
+The proposed solution is to include these special inflight instants in index 
access queries—essentially, we need to support reading inflight instants from 
the MDT. 
+Only inflight instants whose corresponding Flink checkpoint has succeeded are 
included. Inflight instants without a successful checkpoint are excluded. See 
the Appendix for more details on job/task failover handling.
+
+#### Add Event Time Ordering Value for RLI Payload
+For cross-partition updates or deletes, we cannot update the RLI directly based on the existing key-to-location mappings. Currently, the RLI payload only stores the key-to-location mapping, without an actual ordering value.
+We therefore need to merge the data records to determine whether the incoming record is a valid update or delete ("valid" meaning it carries a greater ordering value). That is the behavior of the Spark RLI write path, but it is too costly for streaming.
+
+For example, consider two records `r1:{key: k1, orderingValue: 2, partition: par1}` and `r2:{key: k1, orderingValue: 1, partition: par2}`, where `r2` arrives after `r1` in a different commit.
+Checking key existence alone is not enough; we must also compare the ordering values to see that `r2` is not a valid update,
+and therefore avoid sending a retraction record (delete record) into partition `par1` for payload deletion.
+
+The suggested solution is to store the ordering value in the RLI payload, so that when a lookup matches an existing key, we can compare the ordering values to decide whether the incoming record is a valid update or delete.

Review Comment:
   Can't we read the RLI shard via FG reader and have that handle all this. for 
e.g it should merge the delete records correctly. idk why any special handling 
is necessary here. What is the ordering value being proposed - same as the one 
configured for table. ?



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds 
to a Hudi completed instant. During a checkpoint, Flink completes the data 
writes and collects the Hudi commit metadata. 
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 
+However, the acknowledgment message is sent asynchronously on a best-effort 
basis and may be lost in corner cases. A Hudi instant cannot be committed 
without receiving this acknowledgment.
+
+During job restarts or task failovers, there are scenarios where a Flink 
checkpoint succeeds but the corresponding Hudi instant remains uncommitted due 
to the two-phase commit mechanism:

Review Comment:
   why? I guess answering the comment above can help understand this. don't we 
complete the commit as a part of completing the checkpoint ?



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+#### Add Event Time Ordering Value for RLI Payload
+For cross-partition updates or deletes, we can not update the RLI directly 
based on the existing key-location mappings. Currently, the RLI payload only 
has the key to location mappings without actual ordering value.
+we need to merge the data records to see if the incoming record is a valid 
update or delete(for valid, it means greater ordering value), that is the 
behavior for Spark RLI write path, but it is too costly for streaming.

Review Comment:
   >  but it is too costly for streaming.
   
   this is all vague. Please produce microbenchmarks or back of envelope math 
for these. I don't want to over optimize without clear perf goals.



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
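One possible shape for this hotspot cache is an access-ordered map with LRU eviction. This is a sketch only: the class name is hypothetical, it bounds entry count rather than memory usage, and a production version would evict on the configurable memory threshold described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the hotspot cache: record_key -> file group location,
// evicting the least-recently-used entry once capacity is exceeded.
// A real implementation would track memory usage, not entry count.
public class HotspotIndexCache extends LinkedHashMap<String, String> {
    private final int maxEntries;

    public HotspotIndexCache(int maxEntries) {
        // accessOrder = true makes iteration order reflect recency of access.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxEntries;
    }
}
```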
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 

Review Comment:
   mappings cache conceptually should be layered on top. and agree to keep this 
record level... i.e. record_key => location. 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark 
datasource does, and has been proven at [massive production 
scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index 
+should complete very quickly. Reading an RLI entry from the MDT for every record would incur 10+ms of latency per record and severely degrade throughput. 
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 
+These mappings are not yet committed to the Hudi table and are therefore 
invisible to MDT queries. This cache must not be cleared until the 
+corresponding checkpoint/instant is committed to Hudi, which indicates that 
the index payloads have also been committed. This ensures multiple
+records for the same record key (e.g., an insert to a key followed by an update 
within the same commit boundary) are routed consistently to the same 
+file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU 
eviction strategy with a configurable size limit. 
+(Note that the MDT reader also maintains its own native file-level cache.) 
+
+The actual index writes occur in the `IndexWrite` operator, while the `BucketAssigner` operator performs the cache lookups and 
MDT queries to determine record locations and propagates the resolved locations downstream.
+The cache is updated for new records and location changes, while the MDT is 
queried only for existing key locations.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the 
location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key 
exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its 
assigned location to the cache.
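The three steps above can be sketched as follows. This is an illustrative sketch only: all names are hypothetical, the MDT lookup is stubbed with a function, and step 1's update-on-location-change is elided for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch of the mappings-cache update flow (names hypothetical).
// mdtLookup stands in for a real MDT RLI read; it returns null for absent keys.
public class MappingsCache {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> mdtLookup;

    public MappingsCache(Function<String, String> mdtLookup) {
        this.mdtLookup = mdtLookup;
    }

    /** Resolves the location for a key, following steps 1-3 of the flow. */
    public String resolve(String recordKey, String assignedLocationIfNew) {
        // 1. Probe the cache first.
        String cached = cache.get(recordKey);
        if (cached != null) {
            return cached;
        }
        // 2. Fall back to the MDT for committed keys.
        String committed = mdtLookup.apply(recordKey);
        if (committed != null) {
            cache.put(recordKey, committed);
            return committed;
        }
        // 3. New key: remember the freshly assigned location so later
        //    records in the same commit route to the same file group.
        cache.put(recordKey, assignedLocationIfNew);
        return assignedLocationIfNew;
    }
}
```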
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds 
to a completed Hudi instant. During a checkpoint, Flink completes the data 
writes and collects the Hudi commit metadata. 
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 

Review Comment:
   who sends the acknowledgement to who? executors to the co-ordinator? are 
there many messages or just the one acknowledgement.
   
   can you please clarify, when exactly we perform these events. 
   
   - writing the `FILES` MDT partition
   - completing the commit on MDT
   - completing the commit on data table. 
   
   aren't these done during checkpoint write? 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark 
datasource does, and has been proven at [massive production 
scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index 
+should complete very quickly. Reading an RLI entry from the MDT for every record would incur 10+ms of latency per record and severely degrade throughput. 
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.

Review Comment:
   Are you saying -- this cache will be long-lived across commits? if so, can 
we describe how we will update the cached RLI shard by checking for new log 
files written. Remember that those can be written by a concurrent writer, so we 
cannot simply rely on the index mappings cache to keep the cache consistent.. 



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark 
datasource does, and has been proven at [massive production 
scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index 
+should complete very quickly. Reading an RLI entry from the MDT for every record would incur 10+ms of latency per record and severely degrade throughput. 
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 
+These mappings are not yet committed to the Hudi table and are therefore 
invisible to MDT queries. This cache must not be cleared until the 
+corresponding checkpoint/instant is committed to Hudi, which indicates that 
the index payloads have also been committed. This ensures multiple
+records for the same record key (e.g., an insert to a key followed by an update 
within the same commit boundary) are routed consistently to the same 
+file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU 
eviction strategy with a configurable size limit. 
+(Note that the MDT reader also maintains its own native file-level cache.) 
+
+The actual index writes occur in the `IndexWrite` operator, while the `BucketAssigner` operator performs the cache lookups and 
MDT queries to determine record locations and propagates the resolved locations downstream.
+The cache is updated for new records and location changes, while the MDT is 
queried only for existing key locations.
+
+The cache update flow is as follows:

Review Comment:
   just confirming, that you are still talking about the mappings cache here..



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark 
datasource does, and has been proven at [massive production 
scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
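The shard-affine routing described above can be sketched as follows. This is an illustrative Python stand-in: `NUM_INDEX_SHARDS` and the `crc32` hash are assumptions for the sketch, since the actual hashing algorithm lives in Hudi's MDT index partitioner (Java).

```python
import zlib

NUM_INDEX_SHARDS = 4  # hypothetical shard count, for illustration only

def index_shard(record_key: str, num_shards: int = NUM_INDEX_SHARDS) -> int:
    """Map a record key to an index shard with a stable, deterministic hash.

    Hudi's real MDT index partitioner is the authority; crc32 here just
    stands in for whatever stable key hash the partitioner uses.
    """
    return zlib.crc32(record_key.encode("utf-8")) % num_shards

# Records with the same key always route to the same shard, so each
# BucketAssigner task only needs to read the index shards assigned to it.
assert index_shard("k1") == index_shard("k1")
assert 0 <= index_shard("k2") < NUM_INDEX_SHARDS
```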
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput, so each record lookup against the index
+must complete quickly. Reading an RLI entry from storage for every record would incur 10+ ms of latency per record and seriously degrade throughput.
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 
+These mappings are not yet committed to the Hudi table and are therefore 
invisible to MDT queries. This cache must not be cleared until the 
+corresponding checkpoint/instant is committed to Hudi, which indicates that 
the index payloads have also been committed. This ensures that multiple
+records for the same record key (e.g., an insert to a key followed by an update within the same commit boundary) are routed consistently to the same
+file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU 
eviction strategy with a configurable size limit. 
+(Note that the MDT reader also maintains its own native file-level cache.) 
+
+The actual index writes occur in the `IndexWrite` operator; the location from the cache is propagated downstream from the `BucketAssigner`
+operator, where cache lookups and MDT queries are performed to determine record locations.
+The cache is updated for new records and location changes, while the MDT is 
queried only for existing key locations.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the 
location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key 
exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its 
assigned location to the cache.
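The flow above can be sketched roughly as follows. Python is used purely for illustration; `LRUCache`, `mdt_lookup`, and `new_location_fn` are hypothetical stand-ins for the real Flink/Hudi components, and step 1's location-change refresh is elided for brevity:

```python
from collections import OrderedDict

class LRUCache:
    """Minimal LRU map of record_key -> file-group location with a size cap."""
    def __init__(self, capacity):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)          # mark as recently used
        return self._data[key]

    def put(self, key, location):
        self._data[key] = location
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)   # evict least recently used

def assign_location(key, cache, mdt_lookup, new_location_fn):
    """Cache update flow: probe the cache, fall back to the MDT,
    and assign a fresh location only for genuinely new keys."""
    loc = cache.get(key)                     # 1. probe the cache
    if loc is None:
        loc = mdt_lookup(key)                # 2. fall back to the MDT
        if loc is None:
            loc = new_location_fn(key)       # 3. new key: assign a location
        cache.put(key, loc)
    return loc

# Toy usage: "k1" is already committed to fg-1 in the MDT, "k2" is new.
mdt = {"k1": "fg-1"}
cache = LRUCache(capacity=2)
assert assign_location("k1", cache, mdt.get, lambda k: "fg-new") == "fg-1"
assert assign_location("k2", cache, mdt.get, lambda k: "fg-new") == "fg-new"
assert cache.get("k2") == "fg-new"           # served from cache next time
```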
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds 
to a Hudi completed instant. During a checkpoint, Flink completes the data 
writes and collects the Hudi commit metadata. 
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 
+However, the acknowledgment message is sent asynchronously on a best-effort 
basis and may be lost in corner cases. A Hudi instant cannot be committed 
without receiving this acknowledgment.
+
+During job restarts or task failovers, there are scenarios where a Flink 
checkpoint succeeds but the corresponding Hudi instant remains uncommitted due 
to the two-phase commit mechanism:
+
+1. **Missing acknowledgment**: The acknowledgment message is lost entirely.
+2. **Job restart during commit**: The acknowledgment is received, but the job 
restarts during the instant commit process:
+   1. All write metadata from a checkpoint is collected in the coordinator and 
is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. The job restarts or crashes.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+3. **Task failover before acknowledgment**: A task fails over from a 
checkpoint before its acknowledgment is received:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The checkpoint succeeds, but the acknowledgment has not been received 
yet.
+   3. A task fails and recovers from `ckp_n`.
+   4. The instant remains uncommitted even though the checkpoint succeeded.
+   5. The acknowledgment message is eventually received.

Review Comment:
   after the task reprocesses i.e recovers and performs the same writes?



##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark datasource does, proven at [massive production scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput, so each record lookup against the index
+must complete quickly. Reading an RLI entry from storage for every record would incur 10+ ms of latency per record and seriously degrade throughput.
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory 
LRU (Least Recently Used) cache keyed by active upsert record keys. 
+Cache entries will be evicted when memory usage exceeds a configurable 
threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed 
for index mappings created during the current checkpoint. 
+These mappings are not yet committed to the Hudi table and are therefore 
invisible to MDT queries. This cache must not be cleared until the 
+corresponding checkpoint/instant is committed to Hudi, which indicates that 
the index payloads have also been committed. This ensures that multiple
+records for the same record key (e.g., an insert to a key followed by an update within the same commit boundary) are routed consistently to the same
+file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU 
eviction strategy with a configurable size limit. 
+(Note that the MDT reader also maintains its own native file-level cache.) 
+
+The actual index writes occur in the `IndexWrite` operator; the location from the cache is propagated downstream from the `BucketAssigner`
+operator, where cache lookups and MDT queries are performed to determine record locations.
+The cache is updated for new records and location changes, while the MDT is 
queried only for existing key locations.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the 
location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key 
exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its 
assigned location to the cache.
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds 
to a Hudi completed instant. During a checkpoint, Flink completes the data 
writes and collects the Hudi commit metadata. 
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 
+However, the acknowledgment message is sent asynchronously on a best-effort 
basis and may be lost in corner cases. A Hudi instant cannot be committed 
without receiving this acknowledgment.
+
+During job restarts or task failovers, there are scenarios where a Flink 
checkpoint succeeds but the corresponding Hudi instant remains uncommitted due 
to the two-phase commit mechanism:
+
+1. **Missing acknowledgment**: The acknowledgment message is lost entirely.
+2. **Job restart during commit**: The acknowledgment is received, but the job 
restarts during the instant commit process:
+   1. All write metadata from a checkpoint is collected in the coordinator and 
is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. The job restarts or crashes.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+3. **Task failover before acknowledgment**: A task fails over from a 
checkpoint before its acknowledgment is received:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The checkpoint succeeds, but the acknowledgment has not been received 
yet.
+   3. A task fails and recovers from `ckp_n`.
+   4. The instant remains uncommitted even though the checkpoint succeeded.
+   5. The acknowledgment message is eventually received.
+   6. During the gap between steps 4 and 5, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+4. **Task failover after acknowledgment but before commit completion**: A task 
fails over after the acknowledgment is received but before the instant commit 
completes:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. A task fails and recovers from `ckp_n`.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+   6. Step 3 eventually completes and the instant is committed.
+   7. During the gap between steps 4 and 6, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+
+For data table (DT) metadata, the pipeline will recommit the instant using the 
recovered table metadata. However, since the `BucketAssigner` operator is 
upstream of the `StreamWrite` operator, there is a time gap before these 
inflight instants can be recommitted. 
+We do not want to block `BucketAssigner` processing while waiting for inflight 
instants to be recommitted. 
+
+The proposed solution is to include these special inflight instants in index 
access queries—essentially, we need to support reading inflight instants from 
the MDT. 
+Only inflight instants whose corresponding Flink checkpoint has succeeded are 
included. Inflight instants without a successful checkpoint are excluded. See 
the Appendix for more details on job/task failover handling.
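The filtering rule above can be sketched as follows. This is a minimal illustration; the function name and the checkpoint bookkeeping are hypothetical, not Hudi APIs:

```python
def instants_visible_to_index(committed, inflight, acked_checkpoints):
    """Return the timeline instants the BucketAssigner may read.

    Committed instants are always visible. An inflight instant is visible
    only when its Flink checkpoint is known to have succeeded: its data is
    valid even though the Hudi commit has not completed yet.
    """
    visible = list(committed)
    for instant, checkpoint_id in inflight:
        if checkpoint_id in acked_checkpoints:
            visible.append(instant)
    return visible

# ckp_3 succeeded (instant t3 merely awaits recommit); ckp_4 has not.
visible = instants_visible_to_index(
    committed=["t1", "t2"],
    inflight=[("t3", 3), ("t4", 4)],
    acked_checkpoints={3},
)
assert visible == ["t1", "t2", "t3"]
```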
+
+#### Add Event Time Ordering Value for RLI Payload
+For cross-partition updates or deletes, we cannot update the RLI directly based on the existing key-location mappings, because the RLI payload currently stores only key-to-location mappings without an ordering value.
+Instead, we would need to merge the data records to determine whether the incoming record is a valid update or delete (valid meaning it has a greater ordering value). This is the behavior of the Spark RLI write path, but it is too costly for streaming.
+
+For example, consider two records `r1:{key: k1, orderingValue: 2, partition: par1}` and `r2:{key: k1, orderingValue: 1, partition: par2}`, where `r2` arrives after `r1` in a different commit.
+Checking key existence alone is not enough; we must also compare the ordering values to recognize that `r2` is not a valid update
+and avoid sending a retraction (delete) record into partition `par1`.
+
+The suggested solution is to store the ordering value in the RLI payload, so that when an existing-key lookup matches, we can compare ordering values to decide whether the incoming record is a valid
+upsert.
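With the ordering value stored in the RLI payload, the check for the `r1`/`r2` example above reduces to a single comparison. A minimal sketch, with illustrative field names:

```python
from dataclasses import dataclass

@dataclass
class RliEntry:
    """Illustrative RLI payload: location plus the event-time ordering value."""
    partition: str
    file_group: str
    ordering_value: int

def is_valid_upsert(incoming_ordering_value: int, existing: RliEntry) -> bool:
    """An incoming record wins only if its ordering value is greater."""
    return incoming_ordering_value > existing.ordering_value

# r1 {key: k1, orderingValue: 2, partition: par1} is already indexed;
# r2 {key: k1, orderingValue: 1, partition: par2} arrives in a later commit.
r1_entry = RliEntry(partition="par1", file_group="fg-0", ordering_value=2)
assert not is_valid_upsert(1, r1_entry)  # r2 loses: no retraction sent to par1
assert is_valid_upsert(3, r1_entry)      # a later version would win
```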
+
+The query execution follows this order: first access the in-memory cache, then 
query the MDT index:
+
+![The RLI Access Pattern](./rli-access-pattern.png)
+
+### Shuffling Index Records
+
+In the `StreamWrite` operator, index records are derived from incoming data 
records and sent to the `IndexWrite` operator in a streaming 
+fashion. These index records are shuffled by `hash(record_key) % 
num_index_shards`, using the same hashing algorithm as the MDT's 
+index partitioner. This shuffling strategy is critical for avoiding a 
combinatorial explosion of files written to the MDT partition. 
+Without it, the number of files would be `N * M`, where `N` is the number of 
index partition buckets and `M` is the number of data table 
+buckets involved in the current write.
+
+To ensure that each data record and its corresponding index record always 
belong to the same commit/checkpoint, we leverage 
+Flink's barrier alignment mechanism. In Flink, checkpoint barriers flow 
together with records through the pipeline (see 
[how-does-state-snapshotting-work](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work)).
 
+When the `StreamWrite` operator receives a record, it emits both the data 
record and its corresponding index record within a single `#processElement` 
call. 
+This ensures that the two records are never separated by a checkpoint barrier.
+
+For example:
+
+```text
+// irN means an index record, drN means a data record
+e.g.: [r4 r3 r2 r1] => BucketAssigner => [ir4 dr4 ir3 dr3 ir2 dr2 ir1 dr1]
+```
+
+The barrier propagation algorithm prevents the checkpoint barrier from being 
placed between an index record and its corresponding data record. 
+A placement like `[ ir4 dr4 ir3 dr3 ir2 <checkpoint barrier> dr2 ir1 dr1]` 
cannot occur.
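The invariant can be modeled with a toy emitter: since each `#processElement` call appends the index record and the data record back-to-back, and a barrier can only be injected between calls, a barrier never splits a pair. Purely illustrative, not Flink API code:

```python
def emit_stream(records, barrier_after=None):
    """Toy model of the StreamWrite operator: each input record emits its
    (index record, data record) pair atomically within one call; a checkpoint
    barrier can only be injected between calls, never inside a pair."""
    out = []
    for i, _ in enumerate(records, start=1):
        out.append(f"ir{i}")  # index record
        out.append(f"dr{i}")  # data record, emitted in the same call
        if barrier_after == i:
            out.append("<barrier>")
    return out

stream = emit_stream(["r1", "r2", "r3"], barrier_after=2)
# The barrier lands right after the ir2/dr2 pair, never between ir2 and dr2.
assert stream.index("<barrier>") == stream.index("dr2") + 1
```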
+
+### The Index Write
+
+In the `IndexWrite` operator, index records are buffered and then written to 
the MDT when triggered by a Flink checkpoint. 
+The write status metadata is then sent to the `coordinator`. This metadata 
includes two parts:
+
+- **A**: The written data file paths
+- **B**: The written MDT file paths (specifically those under the index 
partitions)
+
+#### Committing MDT (including Index Partitions)
+
+When committing to the data table, the MDT is committed first with the index 
write metadata (the MDT index partition file handles). 
+The `RLI` and `SI` partition file handles are committed together in the 
`FILES` partition.
+
+During a Flink checkpoint, each index-writing and data-writing task flushes 
all its records to the index and data files respectively. 
+This ensures that the index and data files are always consistent. Both are 
committed together from the Coordinator as a single Hudi commit, 
+following the current commit protocol.
+
+To maintain exactly-once semantics during job recovery, the write status 
metadata must be stored in multiple locations: the `StreamWrite` operator, the 
`IndexWrite` operator, and the `coordinator`. 
+This follows the same pattern as the current approach for maintaining data 
table metadata.
+
+### The Compaction
+
+To minimize task slot consumption, the implementation reuses the existing data 
file compaction sub-pipeline for MDT compaction. 
+This asynchronous compaction is automatically enabled when indexing is active.
+
+![Index Compaction Flow](./index-compaction-flow.png)
+
+## Implementation Plan 
+
+<WIP: broken down into phases of implementation along with relevant GH issues.>

Review Comment:
   lets break down the implementation into phases . each phase should have a 
clear exit point in terms of functionality and readiness for use. link GH 
issues here please


