Hi Piotr,

Thanks for your thoughtful feedback!

In fact, we had a lot of internal debate on this topic :). The proposal
you mentioned is one of several alternatives we considered. It works well
for simple use cases of ValueState or ListState, where users typically
read state by the partitioned key and update it for each incoming record.
However, it may not be suitable for scenarios where:

   - A state is read only under some condition, so it is not known up
     front whether the read will happen.
   - A MapState is accessed with a user key that cannot be determined in
     advance.

These cases are common in the implementation of SQL operators. Given that
remote I/O is tens of times slower than local I/O[1], a miss in the
pre-fetched cache would significantly hurt TPS and add unnecessary
overhead. We therefore chose to focus on the current plan, in which the
actual state request, rather than a step before `processElement`, is the
entry point of asynchronous execution. We can further optimize
performance by grouping state accesses[2] and executing them *in
parallel*. This approach not only supports all state access patterns but
also keeps the performance of disaggregated state competitive with an
exclusively local state setup[3].
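
To make the MapState case concrete, here is a minimal Java sketch. Every
name in it (`RemoteMapState`, `asyncGet`, this `processElement` signature)
is a hypothetical stand-in and not the API proposed in FLIP-424; the
"remote" store is just an in-memory map. It only illustrates that the user
keys become known inside `processElement`, and that independent reads can
be issued together so that they overlap.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch only; none of these names are Flink APIs. */
final class AsyncStateSketch {

    /** Stand-in for a remote MapState scoped to one partition key. */
    static final class RemoteMapState {
        private final Map<String, Long> store = new ConcurrentHashMap<>();

        void put(String userKey, long value) {
            store.put(userKey, value);
        }

        /** Each read is its own asynchronous request (assumed behaviour). */
        CompletableFuture<Long> asyncGet(String userKey) {
            return CompletableFuture.supplyAsync(() -> store.getOrDefault(userKey, 0L));
        }
    }

    /**
     * The user keys are taken from the record, so a step that runs before
     * processElement and only sees the partition key cannot know which
     * entries to pre-fetch.
     */
    static CompletableFuture<Long> processElement(
            RemoteMapState state, String leftKey, String rightKey) {
        // Both requests are issued before either result is awaited.
        CompletableFuture<Long> left = state.asyncGet(leftKey);
        CompletableFuture<Long> right = state.asyncGet(rightKey);
        // The continuation runs once both reads have completed.
        return left.thenCombine(right, Long::sum);
    }

    public static void main(String[] args) {
        RemoteMapState state = new RemoteMapState();
        state.put("a", 2L);
        state.put("b", 3L);
        System.out.println(processElement(state, "a", "b").join());
    }
}
```

Here the state request itself starts the asynchronous work, so nothing is
loaded that the record does not actually ask for.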

As for the potential downside you mentioned, according to our PoC tests[3],
it is still beneficial to load state from local disk asynchronously (see
line 4 of that table, with 100% of state in the local cache). The gain
comes mainly from parallel I/O. We believe that in most cases that use a
file-based state backend, the current plan can improve performance across
all state access patterns.

However, IIUC, your proposal is valuable in that it is compatible with the
original state APIs, and it can co-exist with the current plan. We are
considering providing such a pre-fetch cache under the original state APIs
to improve performance transparently in future milestones.
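
For reference, this is how I read steps 1-3 of your proposal, as a rough
single-threaded Java sketch. All names (`PrefetchSketch`, `onElement`,
`onFetchComplete`) are hypothetical, and the DFS fetch is simulated by
calling `onFetchComplete` by hand, so this is an illustration under those
assumptions rather than a design.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of a per-key pre-fetch cache; not a Flink API. */
final class PrefetchSketch {
    // State that is already available locally, by partition key.
    private final Map<String, Long> cache = new HashMap<>();
    // Elements buffered per key while a fetch for that key is in flight.
    private final Map<String, Queue<String>> waiting = new HashMap<>();
    // Recorded for illustration: what was processed and what was fetched.
    final List<String> processed = new ArrayList<>();
    final List<String> fetchRequests = new ArrayList<>();

    /** Steps 1 and 2: look at the key, then process or buffer. */
    void onElement(String key, String element) {
        if (cache.containsKey(key)) {
            // 2a: state is present, run processElement directly.
            processElement(key, element);
            return;
        }
        Queue<String> queue = waiting.get(key);
        if (queue == null) {
            // 2b-i: first miss for this key, start one asynchronous fetch.
            queue = new ArrayDeque<>();
            waiting.put(key, queue);
            fetchRequests.add(key);
        }
        // 2b-ii: later elements line up behind the one that triggered the fetch.
        queue.add(element);
    }

    /** Step 3: the fetch finished, drain the buffered elements in order. */
    void onFetchComplete(String key, long value) {
        cache.put(key, value);
        Queue<String> queue = waiting.remove(key);
        if (queue != null) {
            for (String element : queue) {
                processElement(key, element);
            }
        }
    }

    private void processElement(String key, String element) {
        processed.add(element + "@" + key + "=" + cache.get(key));
    }

    public static void main(String[] args) {
        PrefetchSketch sketch = new PrefetchSketch();
        sketch.onElement("k", "e1");
        sketch.onElement("k", "e2");
        sketch.onFetchComplete("k", 7L);
        System.out.println(sketch.processed);
    }
}
```

Note that the fetch is keyed only by the partition key, which is why this
flow fits ValueState and ListState better than conditional reads or
MapState lookups with late-bound user keys.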

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-HurdlesfortheDisaggregatedStateModel
[2] https://cwiki.apache.org/confluence/x/TYp3EQ
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-PoCTest


Thanks again & Best,
Zakelly

On Thu, Feb 29, 2024 at 10:22 PM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi!
>
> Thanks for this proposal. It looks like it will be a great improvement!
> I've only started reading the FLIP's, but I already have some questions
> about the FLIP-425, the async execution.
>
> What's the motivation behind splitting execution of a single element into
> multiple independent steps in individual futures? Have you considered
> something like this (?) :
>
> 1. Check what is the key of the incoming record.
> 2. Is the state for that key missing?
>     a) No! - just execute `processElement`/firing timer
>     b) Yes :( Are we already fetching a state for that key?
>         i) No - asynchronously fetch from the DFS to cache (local disk or
> mem) state for that key
>         ii) Yes - enqueue this element/timer after the element that has
> started async fetch and is already waiting for the fetch to complete
> 3. Once the async fetch completes for a given key, run `processElement` or
> `onEventTime` for all of the buffered elements for that key.
>
> That should both eliminate overheads, simplify the API for the users and
> potentially further improve performance/reduce latencies from processing
> all elements for the already pre-fetched key.
>
> If we consider the state already available (so no need to asynchronously
> fetch it) if it's either on local disks or memory, then I don't see a
> downside of this compared to your current proposal. If you would like to
> asynchronously fetch state from local disks into memory cache, then the
> naive version of this approach would have a downside of potentially
> unnecessarily reading into RAM a state field that the current
> `processElement` call wouldn't need. But that could also be
> solved/mitigated in a couple of ways. And I think considering the state as
> "available" for sync access if it's in local disks (not RAM) is probably
> good enough (comparable to RocksDB).
>
> Best,
> Piotrek
>
> On Thu, Feb 29, 2024 at 07:17 Yuan Mei <yuanmei.w...@gmail.com> wrote:
>
> > Hi Devs,
> >
> > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing
> > Disaggregated State Storage and Management in Flink 2.0.
> >
> > The past decade has witnessed a dramatic shift in Flink's deployment mode,
> > workload patterns, and hardware improvements. We've moved from the
> > map-reduce era where workers are computation-storage tightly coupled nodes
> > to a cloud-native world where containerized deployments on Kubernetes
> > become standard. To enable Flink's Cloud-Native future, we introduce
> > Disaggregated State Storage and Management that uses DFS as primary storage
> > in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> >
> > Design Details can be found in FLIP-423[1].
> >
> > This new architecture aims to solve the following challenges brought in
> > the cloud-native era for Flink.
> > 1. Local Disk Constraints in containerization
> > 2. Spiky Resource Usage caused by compaction in the current state model
> > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> > 4. Light and Fast Checkpoint in a native way
> >
> > More specifically, we want to reach a consensus on the following issues in
> > this discussion:
> >
> > 1. Overall design
> > 2. Proposed Changes
> > 3. Design details to achieve Milestone1
> >
> > In M1, we aim to achieve an end-to-end baseline version using DFS as
> > primary storage and complete core functionalities, including:
> >
> > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> > asynchronous state access.
> > - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> > execution model leveraging the asynchronous APIs introduced in FLIP-424.
> > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> > state data in batches to avoid unnecessary round-trip costs for remote
> > access
> > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of
> > the ForSt disaggregated state store.
> > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> > checkpointing mechanisms with the disaggregated state store for fault
> > tolerance and fast rescaling.
> >
> > We will vote on each FLIP in separate threads to make sure each FLIP
> > reaches a consensus. But we want to keep the discussion within a focused
> > thread (this thread) for easier tracking of contexts to avoid duplicated
> > questions/discussions and also to think of the problem/solution in a full
> > picture.
> >
> > Looking forward to your feedback
> >
> > Best,
> > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> >
> > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> >
>
