danny0405 commented on code in PR #17610: URL: https://github.com/apache/hudi/pull/17610#discussion_r2692905519
##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,151 @@
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-102: RLI and SI support for Flink sink
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+This RFC aims to introduce RLI and SI support for Flink streaming:
+
+- Implement reliable and performant write and read support for RLI via Flink APIs;
+- The RLI implementation is compatible across engines, e.g., Flink can access and utilize the RLI written by Spark and vice versa;
+- The RLI is global, so upserts across partitions are supported; partition-level RLI is also supported for large fact tables;
+- Async compaction for the MDT when RLI is enabled, either in the writer pipeline or in a background table services job;
+- Smart caching of RLI;
+- Clearly document scale/performance limits for write throughput supported by RLI (based on certain average response time for the RLI access, like from x0ms to x00ms) via empirical benchmarks;
+- Ability to be expanded to arbitrary secondary indexing on different columns.
+
+## Background
+Flink does not support RLI while Spark does, which causes inconsistency between engines. For tables migrated from Spark to Flink streaming, the index type needs to be switched to either bucket or flink_state, which adds overhead for users in production.
+
+Another motivation is scalable, efficient support for cross-partition updates (where the partition path of the record changes). Currently, the only choice is the flink_state index, which can be costly in such a scenario because it holds state proportional to the size of the table.
+The flink_state index can use a lot of memory and cannot be shared between different workloads.
+
+## High Level Design
+
+The high-level ideas:
+
+- an RLI-based index backend will be added, which can be used in place of the current flink_state index;
+- a cache of RLI will be introduced to speed up access, along with a cache invalidation mechanism to keep it consistent with the committed state of the table;
+- a separate Flink index function will write the RLI/SI payloads;
+- the MDT RLI and SI files will be written synchronously with the data table data files, and the metadata is sent to the coordinator for a final commit to the MDT (after the `FILES` partition is ready);
+- the MDT compaction is switched to async, and the data file compaction pipeline is reused to take up fewer task slots.
+
+
+
+### Detailed Design
+
+### The RLI Access
+In the `BucketAssigner` operator, the RLI index metadata will be used as the index backend: the operator probes the RLI with the incoming record keys to figure out whether each message is an update, an insert, or a delete.
+In other words, the RLI index metadata will serve the same role as the `flink_state` index. Since the current `BucketAssigner` already supports **global** and **non-global** index types, the global RLI will be applied for the **global**
+index type and the partitioned RLI will be supported for the **non-global** index type.
+
+In order to read each RLI shard only once per commit, and to avoid caching all RLI shards in every `BucketAssigner` task, the input records of `BucketAssigner` will be shuffled by `hash(record_key) % num_rli_shards` (the same hashing algorithm as the MDT `RLI index` partitioner).
+
+#### The Cache of RLI Access
+Streaming needs fast index access to sustain high throughput (millisecond-level access per record), so a general hotspot cache is needed.
+We will build an in-memory LRU cache keyed by the actively upserted record keys; cache items will be force-evicted once a configured memory threshold is reached.
+
+We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore not yet visible.
+This cache must not be cleaned until the checkpoint/instant is committed to Hudi (which indicates that the index payloads are also committed).
+
+We cache `key -> location` mappings (a record-level cache) with an LRU-style eviction strategy and a configurable cache size; the MDT reader also has its own native file-level cache.
+
+#### Access of Valid but Uncommitted Instants
+On job restart or task failover, the checkpoint may have succeeded on Flink while the instant is not yet committed to Hudi. For DT metadata, the pipeline will recommit
+the instant with the recovered table metadata. Because the `BucketAssigner` operator is upstream of the `StreamWrite` operator, there is a time gap before these inflight instants are recommitted,
+and we do not want to block the processing of `BucketAssigner` (to wait for the inflight instants to recommit successfully). The suggested solution is
+to include these special inflight instants in RLI access queries; basically, we need to support reading inflight instants on the MDT.
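
For reference, the shard routing and the two caches described in the hunk above could be sketched roughly as follows. This is only an illustration under stated assumptions: `RliCacheSketch` and all of its methods are hypothetical names, not Hudi or Flink APIs; `String.hashCode()` stands in for whatever hash the MDT RLI partitioner actually uses; and the LRU tier is bounded by entry count here, whereas the RFC proposes a memory threshold.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of RLI shard routing plus a two-tier location cache. */
final class RliCacheSketch {

  /** Routes a record key to a shard; hashCode() is a stand-in for the real RLI hash. */
  static int shardFor(String recordKey, int numRliShards) {
    // floorMod keeps the shard index non-negative even for negative hash codes.
    return Math.floorMod(recordKey.hashCode(), numRliShards);
  }

  // Tier 1: committed mappings, evicted in LRU order once the entry bound is exceeded.
  private final Map<String, String> committed;
  // Tier 2: mappings of not-yet-committed checkpoints, pinned until the instant commits.
  private final TreeMap<Long, Map<String, String>> pending = new TreeMap<>();

  RliCacheSketch(final int maxCommittedEntries) {
    // accessOrder = true makes the map iterate from least to most recently used.
    this.committed = new LinkedHashMap<String, String>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxCommittedEntries;
      }
    };
  }

  /** Records a key -> location mapping written under a checkpoint that is not committed yet. */
  void recordPending(long checkpointId, String recordKey, String location) {
    pending.computeIfAbsent(checkpointId, id -> new LinkedHashMap<>()).put(recordKey, location);
  }

  /** Once the instant is committed, its mappings move to the LRU tier and become evictable. */
  void onInstantCommitted(long checkpointId) {
    Map<String, String> mappings = pending.remove(checkpointId);
    if (mappings != null) {
      committed.putAll(mappings);
    }
  }

  /** Looks up pending mappings first (newest checkpoint wins), then the LRU tier. */
  String lookup(String recordKey) {
    for (Map<String, String> mappings : pending.descendingMap().values()) {
      String location = mappings.get(recordKey);
      if (location != null) {
        return location;
      }
    }
    return committed.get(recordKey); // null means "probe the RLI in the MDT"
  }

  public static void main(String[] args) {
    RliCacheSketch cache = new RliCacheSketch(2);
    cache.recordPending(1L, "key-1", "file-group-A");
    System.out.println("shard = " + shardFor("key-1", 8));
    System.out.println("before commit: " + cache.lookup("key-1"));
    cache.onInstantCommitted(1L);
    System.out.println("after commit: " + cache.lookup("key-1"));
  }
}
```

Keeping the pending tier separate from the LRU tier means that eviction pressure can never drop a mapping that is not yet readable from the table, which is the property the RFC text asks for.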
Review Comment:
I added more details about the two-phase commit design of the checkpoint-instant, and described such special instants as:

> These inflight instants are the ones whose corresponding checkpoint has succeeded, inflight instants without successful checkpoint are not included.
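
To make that rule concrete, here is a minimal sketch of how the set of instants visible to an RLI lookup might be selected. It is an illustration only: `ReadableInstantSketch`, `Instant`, `State`, and `readableInstants` are hypothetical names rather than Hudi classes, and it assumes the writer can somehow recover the set of inflight instant times whose Flink checkpoint succeeded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; these types are illustrative and are not Hudi classes. */
final class ReadableInstantSketch {

  enum State { COMPLETED, INFLIGHT }

  static final class Instant {
    final String time;
    final State state;

    Instant(String time, State state) {
      this.time = time;
      this.state = state;
    }
  }

  /**
   * Returns the instant times an RLI lookup may read: every completed instant,
   * plus inflight instants whose checkpoint is known to have succeeded.
   */
  static List<String> readableInstants(List<Instant> timeline, Set<String> checkpointedInflight) {
    List<String> readable = new ArrayList<>();
    for (Instant instant : timeline) {
      boolean committed = instant.state == State.COMPLETED;
      boolean recoverable =
          instant.state == State.INFLIGHT && checkpointedInflight.contains(instant.time);
      if (committed || recoverable) {
        readable.add(instant.time);
      }
    }
    return readable;
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("001", State.COMPLETED),
        new Instant("002", State.INFLIGHT),   // checkpoint succeeded, commit still pending
        new Instant("003", State.INFLIGHT));  // checkpoint never succeeded
    Set<String> checkpointed = new HashSet<>(Arrays.asList("002"));
    System.out.println(readableInstants(timeline, checkpointed));
  }
}
```

Inflight instants without a successful checkpoint are skipped, since their data would be rolled back rather than recommitted.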
