vinothchandar commented on code in PR #17610:
URL: https://github.com/apache/hudi/pull/17610#discussion_r2765307707


##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,265 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate 
records during upsert operations. 
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata 
Table (MDT)** that maps each record key to its 
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** 
extends this capability to non-record-key, non-unique-key columns. 
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating 
feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. 
Throughout this document, the term **"index"** refers broadly to both RLI and 
SI; 
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be 
used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize 
indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level 
RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either 
within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during 
streaming writes
+- Document scale and performance limits for write throughput supported by 
indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing 
on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when 
processing upserts. Without an efficient index, Hudi would 
+need to scan the entire table to find whether a record already exists and 
where it is located. Different index types offer different 
+trade-offs between write performance, read performance, and resource 
consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark datasource does and has been proven at [massive production scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to 
Flink streaming. When migrating, users must switch 
+the index type from RLI/SI to either `bucket` (a hash-based partitioning 
scheme) or `flink_state` (which uses Flink's state backend to 
+maintain record-to-location mappings). This migration overhead complicates 
production deployments.
+
+Another key motivation is to provide scalable, efficient support for 
**cross-partition updates**—scenarios where a record's partition path changes 
between writes. 
+Currently, the only option for handling cross-partition updates in Flink is 
the `flink_state` index, which maintains a global view of all record locations. 
However, this approach has significant drawbacks: 
+it consumes substantial memory (proportional to the table size) and cannot be 
shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the 
current `flink_state` index, storing record-to-location mappings in the MDT 
rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI 
lookups, along with a cache invalidation mechanism to maintain consistency with 
the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) 
responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written 
synchronously with the data table files within the same commit boundary; the 
metadata is then sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, 
reusing the existing data file compaction pipeline to minimize task slot 
consumption
+
+![Index Write Flow](./index-write-flow.png)
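+
+To make the flow above concrete, below is a minimal, hypothetical Flink sketch of the topology: the shuffle by record-key hash, a stand-in for the `BucketAssigner` lookup, and stand-ins for the `StreamWrite`/`IndexWrite` stages. The record type (`Tuple2<String, String>` as key/payload), shard count, and function bodies are illustrative assumptions, not the actual Hudi operators or defaults.
+
+```java
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class IndexWritePipelineSketch {
+
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    final int numIndexShards = 4; // illustrative shard count only
+
+    // Incoming records modeled as (recordKey, payload) pairs.
+    DataStream<Tuple2<String, String>> input = env.fromElements(
+        Tuple2.of("uuid-1", "payload-a"),
+        Tuple2.of("uuid-2", "payload-b"));
+
+    DataStream<Tuple2<String, String>> located = input
+        // Shuffle by hash(record_key) % num_index_shards so each BucketAssigner
+        // subtask only reads a subset of RLI shards.
+        .keyBy(r -> Math.floorMod(r.f0.hashCode(), numIndexShards), Types.INT)
+        // Stand-in for the BucketAssigner: probe the cache / MDT-backed RLI and
+        // tag the record with its target file group.
+        .map(r -> Tuple2.of(r.f0, r.f1 + " -> fg-0"))
+        .returns(Types.TUPLE(Types.STRING, Types.STRING));
+
+    // Stand-ins for StreamWrite (data files) and IndexWrite (RLI/SI payloads into
+    // the MDT); both complete before the coordinator commits the instant.
+    located.print("stream-write");
+    located.print("index-write");
+
+    env.execute("rfc-102-index-write-sketch");
+  }
+}
+```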
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for 
determining where each incoming record should be written. 
+It must identify whether each record is an insert (new record), update 
(existing record), or delete. To make this determination, the operator needs to 
look up whether 
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata 
stored in the MDT as its backend. It will probe the index 
+with incoming record keys to determine the appropriate operation type (insert, 
update, or delete). In this design, the index serves the same role 
+that the `flink_state` index currently serves. Since the existing 
`BucketAssigner` already supports both **global** and **non-global** index 
types, 
+the global RLI will be used for **global** index configurations, while 
partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every 
`BucketAssigner` task, the input records will be shuffled 
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm 
as the MDT's index partitioner, ensuring that 
+each `BucketAssigner` task only needs to read from a subset of index shards.
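+
+As a rough illustration of the routing (the hash below is a stand-in, not Hudi's actual MDT partitioner; the real implementation must reuse exactly the same hashing the MDT uses to map record keys to RLI file groups, otherwise a `BucketAssigner` task could probe the wrong shard):
+
+```java
+// Illustrative stand-in only; not Hudi's actual hash function.
+public final class IndexShardRouter {
+  private IndexShardRouter() {}
+
+  /** Maps a record key to one of {@code numIndexShards} RLI shards. */
+  public static int shardFor(String recordKey, int numIndexShards) {
+    // floorMod keeps the result non-negative even for negative hash codes.
+    return Math.floorMod(recordKey.hashCode(), numIndexShards);
+  }
+}
+```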
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput, so each record lookup against the index must complete very quickly. Reading an RLI entry from storage for every record would incur 10+ ms of latency per record and seriously affect throughput, so lookups should be served from an in-memory cache wherever possible.
+
+**New index mappings cache:** Additionally, a separate in-memory cache is needed for index mappings created during the current checkpoint. These mappings are not yet committed to the Hudi table and are therefore invisible to MDT queries. This cache must not be cleared until the corresponding checkpoint/instant is committed to Hudi, which indicates that the index payloads have also been committed. This ensures that multiple records for the same record key (e.g., an insert to a key followed by an update within the same commit boundary) are routed consistently to the same file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level; entries are evicted at checkpoint granularity once the corresponding checkpoints are committed to Hudi. (Note that the MDT reader also maintains its own native file-level cache.)
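+
+A minimal sketch of such a checkpoint-scoped cache is shown below; it assumes string file-group ids and long checkpoint ids, and is simplified (for example, a key updated across checkpoints would simply be re-fetched from the MDT after eviction). The real implementation would store richer location objects and wire eviction to the instant-commit notification.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Sketch only: key -> location entries grouped by the checkpoint that produced them. */
+public class CheckpointScopedIndexCache {
+  /** Flat view used for lookups. */
+  private final Map<String, String> keyToLocation = new ConcurrentHashMap<>();
+  /** Keys added within each checkpoint, retained until that checkpoint commits. */
+  private final NavigableMap<Long, List<String>> keysByCheckpoint = new TreeMap<>();
+
+  public synchronized void put(long checkpointId, String recordKey, String fileGroupId) {
+    keyToLocation.put(recordKey, fileGroupId);
+    keysByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(recordKey);
+  }
+
+  /** Returns the cached location, or null if the key must be looked up in the MDT. */
+  public String get(String recordKey) {
+    return keyToLocation.get(recordKey);
+  }
+
+  /** Evicts all entries produced by checkpoints up to and including the committed one. */
+  public synchronized void onCheckpointCommitted(long committedCheckpointId) {
+    Map<Long, List<String>> committed = keysByCheckpoint.headMap(committedCheckpointId, true);
+    committed.values().forEach(keys -> keys.forEach(keyToLocation::remove));
+    committed.clear();
+  }
+}
+```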
+
+The actual index writes occur in the `IndexWrite` operator; the location resolved from the cache is propagated downstream from the `BucketAssigner` operator, where cache lookups and MDT queries determine record locations. The cache is updated for new records and for location changes, while the MDT is queried only for the locations of existing keys.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the 
location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key 
exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its 
assigned location to the cache.
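+
+A rough sketch of this three-step flow, reusing the checkpoint-scoped cache sketched above; the MDT lookup and new-file-group assignment are passed in as hypothetical callbacks rather than real Hudi APIs, and the step-1 update on location change is glossed over.
+
+```java
+import java.util.Optional;
+import java.util.function.Function;
+
+/** Sketch of the BucketAssigner-side location resolution, not the actual operator. */
+public class LocationResolver {
+  private final CheckpointScopedIndexCache cache;
+  private final Function<String, Optional<String>> mdtLookup;   // hypothetical RLI query
+  private final Function<String, String> newFileGroupAssigner;  // hypothetical insert assignment
+
+  public LocationResolver(CheckpointScopedIndexCache cache,
+                          Function<String, Optional<String>> mdtLookup,
+                          Function<String, String> newFileGroupAssigner) {
+    this.cache = cache;
+    this.mdtLookup = mdtLookup;
+    this.newFileGroupAssigner = newFileGroupAssigner;
+  }
+
+  public String resolve(String recordKey, long checkpointId) {
+    // 1. Probe the cache first.
+    String cached = cache.get(recordKey);
+    if (cached != null) {
+      return cached;
+    }
+    // 2. Fall back to the MDT; cache the location of an existing key.
+    Optional<String> fromMdt = mdtLookup.apply(recordKey);
+    if (fromMdt.isPresent()) {
+      cache.put(checkpointId, recordKey, fromMdt.get());
+      return fromMdt.get();
+    }
+    // 3. New key: assign a location (insert) and cache it until the checkpoint
+    //    that produced it is committed.
+    String assigned = newFileGroupAssigner.apply(recordKey);
+    cache.put(checkpointId, recordKey, assigned);
+    return assigned;
+  }
+}
+```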
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds to a completed Hudi instant. During a checkpoint, Flink completes the data writes and collects the Hudi commit metadata.
+Once the checkpoint acknowledgment event is received, Flink knows the 
checkpoint completed successfully, and the corresponding Hudi instant can be 
committed. 
+However, the acknowledgment message is sent asynchronously on a best-effort 
basis and may be lost in corner cases. A Hudi instant cannot be committed 
without receiving this acknowledgment.
+
+During job restarts or task failovers, there are scenarios where a Flink 
checkpoint succeeds but the corresponding Hudi instant remains uncommitted due 
to the two-phase commit mechanism:
+
+1. **Missing acknowledgment**: The acknowledgment message is lost entirely.
+2. **Job restart during commit**: The acknowledgment is received, but the job 
restarts during the instant commit process:
+   1. All write metadata from a checkpoint is collected in the coordinator and 
is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. The job restarts or crashes.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+3. **Task failover before acknowledgment**: A task fails over from a 
checkpoint before its acknowledgment is received:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The checkpoint succeeds, but the acknowledgment has not been received 
yet.
+   3. A task fails and recovers from `ckp_n`.
+   4. The instant remains uncommitted even though the checkpoint succeeded.
+   5. The acknowledgment message is eventually received.
+   6. During the gap between steps 4 and 5, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+4. **Task failover after acknowledgment but before commit completion**: A task 
fails over after the acknowledgment is received but before the instant commit 
completes:
+   1. All write metadata from checkpoint `ckp_n` is collected in the 
coordinator and is ready for committing.
+   2. The acknowledgment message is received.
+   3. The coordinator begins committing the instant with the collected write 
metadata.
+   4. A task fails and recovers from `ckp_n`.
+   5. The instant remains uncommitted even though the checkpoint succeeded.
+   6. Step 3 eventually completes and the instant is committed.
+   7. During the gap between steps 4 and 6, the `BucketAssigner` must access 
the uncommitted instant because the checkpoint was successful and the data is 
valid.
+
+For data table (DT) metadata, the pipeline will recommit the instant using the 
recovered table metadata. However, since the `BucketAssigner` operator is 
upstream of the `StreamWrite` operator, there is a time gap before these 
inflight instants can be recommitted. 
+We do not want to block `BucketAssigner` processing while waiting for inflight instants to be recommitted, because doing so risks deadlocking the pipeline:
+
+1. The `BucketAssigner` operator blocks, waiting for the instant to be recommitted.
+2. The instant acknowledgment message is lost for some reason, so the instant can never be recommitted.
+3. The `BucketAssigner` operator therefore blocks forever and the current checkpoint never finishes.
+4. The job ends up hanging and making no progress.
+
+The proposed solution is to include these special inflight instants in index 
access queries—essentially, we need to support reading inflight instants from 
the MDT. 

Review Comment:
   please expand this proposed solution and make a table that covers every 
scenario listed above. @danny0405 @cshuo 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
