Hey all!

This is a massive improvement and a lot of work. I just started going
through the FLIPs and have a more-or-less meta comment.

While it's good to keep the overall architecture discussion here, I think
we should still have separate discussions for each FLIP where we can
discuss interface details etc. With so much content, adding minor comments
here will lead nowhere, but those discussions are still important, so we
should have them in separate threads (one for each FLIP).

What do you think?
Gyula

On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:

> Hi team,
>
> Thanks for your discussion. Regarding FLIP-425, we have added
> several updates to answer frequently asked questions:
>
> 1. We captured a flame graph of the Hashmap state backend in
> "Synchronous execution with asynchronous APIs"[1], which reveals that
> the framework overhead (including reference counting, future-related
> code and so on) consumes about 9% of the keyed operator CPU time.
> 2. We added a set of comparative experiments for watermark processing;
> the performance of out-of-order mode is 70% better than that of
> strictly-ordered mode under ~140MB state size. Instructions on how to
> run this test have also been added[2].
> 3. Regarding the order of StreamRecords, whether or not they involve
> state access, we added a new *Strict order of 'processElement'*[3].
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
>
>
> Best regards,
> Yanfei
>
> Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote on Tue, Mar 5, 2024 at 09:25:
> >
> > Hi Zakelly,
> >
> > > 5. I'm not very sure ... revisiting this later since it is not
> > > important.
> >
> > It seems that we still have some details to confirm about this
> > question. Let's postpone this to after the critical parts of the
> > design are settled.
> >
> > > 8. Yes, we had considered ... metrics should be like afterwards.
> >
> > Oh sorry I missed FLIP-431. I'm fine with discussing this topic in
> > milestone 2.
> >
> > Looking forward to the detailed design about the strict mode between
> > same-key records and the benchmark results about the epoch mechanism.
> >
> > Best regards,
> > Yunfeng
> >
> > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > >
> > > Hi Yunfeng,
> > >
> > > For 1:
> > > I had a discussion with Lincoln Lee, and I realize that it is a common
> > > case that a same-key record should be blocked before `processElement`.
> > > This is easier for users to understand, so I will introduce a strict mode
> > > for this and make it the default. My rough idea is just like yours: invoke
> > > some method of the AEC instance before `processElement`. The detailed
> > > design will be described in the FLIP later.
> > >
> > > For 2:
> > > I agree with you. We could throw exceptions for now and optimize this
> > > later.
> > >
> > > For 5:
> > >>
> > >> It might be better to move the default values to the Proposed Changes
> > >> section instead of making them public for now, as there will be
> > >> compatibility issues once we want to dynamically adjust the thresholds
> > >> and timeouts in future.
> > >
> > > Agreed. The whole framework is experimental until we consider it
> > > complete in 2.0 or later. The default values are better determined
> > > with more testing results and production experience.
> > >
> > >> The configuration execution.async-state.enabled seems unnecessary, as
> > >> the infrastructure may automatically get this information from the
> > >> detailed state backend configurations. We may revisit this part after
> > >> the core designs have reached an agreement. WDYT?
> > >
> > >
> > > I'm not very sure if there is any use case where users write their
> > > code using async APIs but run their job in a synchronous way. The first two
> > > scenarios that come to mind are benchmarking and jobs with a small state
> > > whose authors don't want to rewrite their code. Actually it is easy to
> > > support, so I'd suggest providing it. But I'm fine with revisiting this
> > > later since it is not important. WDYT?
> > >
> > > For 8:
> > > Yes, we had considered the I/O metrics group, especially the
> > > back-pressure, idle and busy time per second. In the current plan we can do
> > > state access during back-pressure, meaning that those input metrics
> > > would better be redefined. I suggest we discuss these existing metrics, as
> > > well as some new metrics that should be introduced in FLIP-431, later in
> > > milestone 2, since once we have basically finished the framework we will
> > > have a better view of what metrics should be like afterwards. WDYT?
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > >>
> > >> Hi Zakelly,
> > >>
> > >> Thanks for the responses!
> > >>
> > >> > 1. I will discuss this with some expert SQL developers. ... mode
> > >> > for StreamRecord processing.
> > >>
> > >> In DataStream API there should also be use cases where the order of
> > >> output is strictly required. I agree that SQL experts may help
> > >> provide more concrete use cases that can accelerate our discussion,
> > >> but please allow me to search for DataStream use cases that can prove
> > >> the necessity of this strict order preservation mode, if answers from
> > >> SQL experts turn out to be negative.
> > >>
> > >> For your convenience, my current rough idea is that we can add a
> > >> module between the Input(s) and processElement() module in Fig 2 of
> > >> FLIP-425. The module will be responsible for caching records whose
> > >> keys collide with in-flight records, and AEC will only be responsible
> > >> for handling async state calls, without knowing the record each call
> > >> belongs to. We may revisit this topic once the necessity of the strict
> > >> order mode is clarified.
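
A minimal sketch of the caching module described in the paragraph above, assuming it sits in front of processElement() and is told when a record has fully finished. The names `SameKeyGate`, `onRecord` and `onRecordFinished` are hypothetical and are not part of FLIP-425; the real design may differ.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical gate between the input and processElement(); not a Flink class.
final class SameKeyGate<K, R> {
    // Key -> records cached behind the in-flight record with the same key.
    private final Map<K, Queue<R>> inFlight = new HashMap<>();
    // Stand-in for the operator's processElement() call.
    private final Consumer<R> processElement;

    SameKeyGate(Consumer<R> processElement) {
        this.processElement = processElement;
    }

    // Called for every incoming record.
    void onRecord(K key, R record) {
        Queue<R> waiting = inFlight.get(key);
        if (waiting != null) {
            // The key collides with an in-flight record: cache, do not process yet.
            waiting.add(record);
            return;
        }
        inFlight.put(key, new ArrayDeque<>());
        processElement.accept(record);
    }

    // Called once all work (including async state calls) of the in-flight
    // record for this key is done.
    void onRecordFinished(K key) {
        Queue<R> waiting = inFlight.get(key);
        if (waiting == null) {
            return;
        }
        R next = waiting.poll();
        if (next == null) {
            inFlight.remove(key); // nothing cached, the key is free again
        } else {
            processElement.accept(next); // next same-key record, in arrival order
        }
    }
}
```

With this shape, records of different keys pass through immediately, while same-key records are released one at a time in arrival order.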
> > >>
> > >>
> > >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> > >>
> > >> Your suggestions generally appeal to me. I think we may let
> > >> corresponding Flink jobs fail with OOM for now, since the majority of
> > >> a StateRequest should just be references to existing Java objects,
> > >> which occupy only a small amount of memory and can hardly cause OOM
> > >> in common cases. We can monitor the pending StateRequests and if there
> > >> is really a risk of OOM in extreme cases, we can throw Exceptions with
> > >> proper messages notifying users what to do, like increasing memory
> > >> through configurations.
> > >>
> > >> Your suggestions to adjust the threshold adaptively or to use the blocking
> > >> buffer sound good, and in my opinion we can postpone them to future
> > >> FLIPs since they seem to only benefit users in rare cases. Given that
> > >> FLIP-423~428 are already a big enough design, it might be better
> > >> to focus on the most critical design for now and postpone
> > >> optimizations like this. WDYT?
> > >>
> > >>
> > >> > 5. Sure, we will introduce new configs as well as their default value.
> > >>
> > >> Thanks for adding the default values; the values themselves LGTM.
> > >> It might be better to move the default values to the Proposed Changes
> > >> section instead of making them public for now, as there will be
> > >> compatibility issues once we want to dynamically adjust the thresholds
> > >> and timeouts in future.
> > >>
> > >> The configuration execution.async-state.enabled seems unnecessary, as
> > >> the infrastructure may automatically get this information from the
> > >> detailed state backend configurations. We may revisit this part after
> > >> the core designs have reached an agreement. WDYT?
> > >>
> > >>
> > >> Besides, inspired by Jeyhun's comments, it occurs to me that
> > >>
> > >> 8. Should this FLIP introduce metrics that measure the time a Flink
> > >> job is back-pressured by State IOs? Under the current design, this
> > >> metric could measure the time when the blocking buffer is full and
> > >> yield() cannot get callbacks to process, which means the operator is
> > >> fully waiting for state responses.
> > >>
> > >> Best regards,
> > >> Yunfeng
> > >>
> > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > >> >
> > >> > Hi Yunfeng,
> > >> >
> > >> > Thanks for your detailed comments!
> > >> >
> > >> >> 1. Why do we need a close() method on StateIterator? This method seems
> > >> >> unused in the usage example codes.
> > >> >
> > >> >
> > >> > The `close()` method was introduced to release internal resources, but
> > >> > it does not seem necessary for the user to call it, so I removed it.
> > >> >
> > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > >> >> entries are allowed". It might be better to further explain what will
> > >> >> happen if a null value is passed, ignoring the value in the returned
> > >> >> Collection or throwing exceptions. Given that
> > >> >> FutureUtils.emptyFuture() can be returned in the example code, I
> > >> >> suppose the former one might be correct.
> > >> >
> > >> >
> > >> > The statement "No null entries are allowed" refers to the
> > >> > parameters: passing in a list such as [null, StateFuture1, StateFuture2]
> > >> > is not allowed, and an Exception will be thrown.
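
A minimal sketch of the "No null entries are allowed" contract described above. It uses `java.util.concurrent.CompletableFuture` as a stand-in for StateFuture, and the class name `CombineAllSketch` and the choice of `IllegalArgumentException` are assumptions; the thread only says that an Exception will be thrown.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the FLIP-424 utility; CompletableFuture replaces StateFuture.
final class CombineAllSketch {

    // Combines all futures into one future of their results, in iteration order.
    // A null entry in the collection is rejected eagerly (exception type is an assumption).
    static <T> CompletableFuture<List<T>> combineAll(
            Collection<? extends CompletableFuture<? extends T>> futures) {
        List<CompletableFuture<? extends T>> copy = new ArrayList<>(futures.size());
        for (CompletableFuture<? extends T> future : futures) {
            if (future == null) {
                throw new IllegalArgumentException("No null entries are allowed");
            }
            copy.add(future);
        }
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>(copy.size());
                    for (CompletableFuture<? extends T> future : copy) {
                        results.add(future.join()); // already completed at this point
                    }
                    return results;
                });
    }
}
```

Note that the check applies to the futures themselves, not to their results: a non-null future that completes with an empty value is still a valid entry.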
> > >> >
> > >> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> > >> >> avoided and the order of same-key records should be strictly
> > >> >> preserved.
> > >> >
> > >> >
> > >> > I will discuss this with some expert SQL developers. And if it is
> > >> > valid and common, I suggest a strict order preservation mode for
> > >> > StreamRecord processing. WDYT?
> > >> >
> > >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> > >> >> invoke further yield() methods. Given that yield() is used when there
> > >> >> is "too much" in-flight data, does it mean StateRequests submitted by
> > >> >> Callbacks will never be "too much"? What if the total number of
> > >> >> StateRequests exceed the capacity of Flink operator's memory space?
> > >> >
> > >> >
> > >> > The amount of parallel StateRequests for one StreamRecord cannot be
> > >> > determined since the code is written by users. So the in-flight requests
> > >> > may be "too much", and may cause OOM. Users should re-configure their job,
> > >> > controlling the number of ongoing StreamRecords. And I suggest two ways to
> > >> > avoid this:
> > >> >
> > >> > - Adaptively adjust the count of ongoing StreamRecords according to the
> > >> > historical amount of StateRequests.
> > >> > - Also control the max StateRequests that can be executed in parallel
> > >> > for each StreamRecord, and if it is exceeded, put the new StateRequest in
> > >> > the blocking buffer waiting for execution (instead of invoking yield()).
> > >> >
> > >> > WDYT?
> > >> >
> > >> >
> > >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> > >> >> epoch mechanism, would bring more complexity to the execution model
> > >> >> than the performance improvement it promises. Could we add some
> > >> >> benchmark results proving the benefit of this mode?
> > >> >
> > >> >
> > >> > Agreed, will do.
> > >> >
> > >> >> 3.2 The FLIP might need to add a public API section describing how
> > >> >> users or developers can switch between these two execution modes.
> > >> >
> > >> >
> > >> > Good point. We will add a Public API section.
> > >> >
> > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > >> >> there are also more other events that might appear in the stream of
> > >> >> data records. It might be better to generalize the execution mode
> > >> >> mechanism to handle all possible events.
> > >> >
> > >> >
> > >> > Yes, I missed this point. Thanks for the reminder.
> > >> >
> > >> >> 4. It might be better to treat callback-handling as a
> > >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > >> >> repeatedly creating Mail objects.
> > >> >
> > >> >
> > >> > I think the intermediate wrapper for the callback cannot be
> > >> > omitted, since there will be some context switching before each execution.
> > >> > The MailboxDefaultAction in most cases is processInput, right? The
> > >> > callback, however, should be executed with higher priority. I'd suggest
> > >> > not changing the basic logic of Mailbox and the default action since it is
> > >> > very critical for performance. But yes, we will try our best to avoid
> > >> > creating intermediate objects.
> > >> >
> > >> >> 5. Could this FLIP provide the current default values for things like
> > >> >> active buffer size thresholds and timeouts? These could help with
> > >> >> memory consumption and latency analysis.
> > >> >
> > >> >
> > >> > Sure, we will introduce new configs as well as their default value.
> > >> >
> > >> >> 6. Why do we need to record the hashcode of a record in its
> > >> >> RecordContext? It seems not used.
> > >> >
> > >> >
> > >> > The context switch before each callback execution involves
> > >> > setCurrentKey, where the hashCode is re-calculated. We cache it to
> > >> > speed this up.
> > >> >
> > >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > >> >> points to a document in flink-1.15. It might be better to verify the
> > >> >> referenced content is still valid in the latest Flink and update the
> > >> >> link accordingly. Same for other references if any.
> > >> >
> > >> >
> > >> > Thanks for the reminder! Will check.
> > >> >
> > >> >
> > >> > Thanks a lot & Best,
> > >> > Zakelly
> > >> >
> > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > >> >>
> > >> >> Hi,
> > >> >>
> > >> >> Thanks for the great proposals. I have a few comments:
> > >> >>
> > >> >> - Backpressure Handling. Flink's original backpressure handling is quite
> > >> >> robust and the semantics is quite "simple" (simple is beautiful).
> > >> >> This mechanism has proven to perform better and more robustly than the
> > >> >> other open source streaming systems, where they were relying on some
> > >> >> loopback information.
> > >> >> Now that the proposal also relies on loopback (yield in this case), it is
> > >> >> not clear how robust the new backpressure handling proposed in FLIP-425
> > >> >> is and how well it handles fluctuating workloads.
> > >> >>
> > >> >> - Watermark/Timer Handling: Similar arguments apply for watermark and
> > >> >> timer handling. IMHO, we need more benchmarks showing the overhead
> > >> >> of epoch management with different parameters (e.g., window size,
> > >> >> watermark strategy, etc)
> > >> >>
> > >> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> > >> >> However, different cloud providers have different storage consistency
> > >> >> models.
> > >> >> How do we want to deal with them?
> > >> >>
> > >> >>  Regards,
> > >> >> Jeyhun
> > >> >>
> > >> >>
> > >> >>
> > >> >>
> > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > >> >>
> > >> >> > Thanks Piotr for sharing your thoughts!
> > >> >> >
> > >> >> > > I guess it depends how we would like to treat the local disks. I've
> > >> >> > > always thought about them that almost always eventually all state
> > >> >> > > from the DFS should end up cached in the local disks.
> > >> >> >
> > >> >> >
> > >> >> > OK I got it. In our proposal we treat local disk as an optional cache, so
> > >> >> > the basic design will handle the case with state residing in DFS only. It
> > >> >> > is a more 'cloud-native' approach that does not rely on any local storage
> > >> >> > assumptions, which allows users to dynamically adjust the capacity or I/O
> > >> >> > bound of remote storage to gain performance or save cost, even without
> > >> >> > a job restart.
> > >> >> >
> > >> >> > > In the currently proposed more fine grained solution, you make a
> > >> >> > > single request to DFS per each state access.
> > >> >> > >
> > >> >> >
> > >> >> > Ah that's not accurate. Actually we buffer the state requests and process
> > >> >> > them in batches, so multiple requests correspond to one DFS access (one
> > >> >> > block access for multiple keys, performed by RocksDB).
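
A minimal sketch of the buffering idea described above, assuming a fixed batch size as the flush trigger. `BatchingBufferSketch`, `submit` and `flush` are hypothetical names, and the actual grouping logic (the subject of FLIP-426) may use different triggers such as timeouts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer that groups single state requests into batches; not a Flink class.
final class BatchingBufferSketch<Req> {
    private final int batchSize;
    // Stand-in for whatever executes one batch, e.g. one remote access for many keys.
    private final Consumer<List<Req>> batchExecutor;
    private List<Req> pending = new ArrayList<>();

    BatchingBufferSketch(int batchSize, Consumer<List<Req>> batchExecutor) {
        this.batchSize = batchSize;
        this.batchExecutor = batchExecutor;
    }

    // Buffers one request and hands off a batch once enough have accumulated.
    void submit(Req request) {
        pending.add(request);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Hands all buffered requests to the executor as a single batch.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<Req> batch = pending;
        pending = new ArrayList<>();
        batchExecutor.accept(batch);
    }
}
```

Seven submitted requests with a batch size of three would reach the executor as two batches of three, plus one batch of one after an explicit `flush()`.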
> > >> >> >
> > >> >> > > In that benchmark you mentioned, are you requesting the state
> > >> >> > > asynchronously from local disks into memory? If the benefit comes from
> > >> >> > > parallel I/O, then I would expect the benefit to disappear/shrink when
> > >> >> > > running multiple subtasks on the same machine, as they would be making
> > >> >> > > their own parallel requests, right? Also enabling checkpointing would
> > >> >> > > further cut into the available I/O budget.
> > >> >> >
> > >> >> >
> > >> >> > That's an interesting topic. Our proposal is specifically aimed at the
> > >> >> > scenario where the machine I/O is not fully loaded but the I/O latency has
> > >> >> > indeed become a bottleneck for each subtask. The distributed file
> > >> >> > system is a prime example of such a scenario, characterized by abundant
> > >> >> > and easily scalable I/O bandwidth coupled with higher I/O latency. You may
> > >> >> > expect to increase the parallelism of a job to enhance the performance as
> > >> >> > well, but that also wastes more CPU and memory on building
> > >> >> > up more subtasks. This is one drawback of computation-storage tightly
> > >> >> > coupled nodes. In our proposal, by contrast, parallel I/O runs with all
> > >> >> > the callbacks still in one task, so pre-allocated computational resources
> > >> >> > are better utilized. It is a much more lightweight way to perform parallel
> > >> >> > I/O.
> > >> >> >
> > >> >> > > Just with what granularity those async requests should be made.
> > >> >> > > Making state access asynchronous is definitely the right way to go!
> > >> >> >
> > >> >> >
> > >> >> > I think the current proposal is based on such core ideas:
> > >> >> >
> > >> >> >    - A pure cloud-native disaggregated state.
> > >> >> >    - Fully utilize the given resources and try not to waste them
> > >> >> >    (including I/O).
> > >> >> >    - The ability to scale isolated resources (I/O or CPU or memory)
> > >> >> >    independently.
> > >> >> >
> > >> >> > We think a fine-grained granularity is more in line with those ideas,
> > >> >> > especially without local disk assumptions and without any waste of I/O by
> > >> >> > prefetching. Please note that it is not a replacement for the original
> > >> >> > local state with synchronous execution. Instead this is a solution
> > >> >> > embracing the cloud-native era, providing much more scalability and
> > >> >> > resource efficiency when handling a *huge state*.
> > >> >> >
> > >> >> > > What also worries me a lot in this fine grained model is the effect on
> > >> >> > > the checkpointing times.
> > >> >> >
> > >> >> >
> > >> >> > Your concerns are very reasonable. Faster checkpointing is always a core
> > >> >> > advantage of disaggregated state, but only for the async phase. There will
> > >> >> > be some complexity introduced by in-flight requests, but I'd suggest a
> > >> >> > checkpoint containing those in-flight state requests as part of the state,
> > >> >> > to accelerate the sync phase by skipping the buffer draining. This makes
> > >> >> > the buffer size have little impact on checkpoint time. And all the changes
> > >> >> > stay within the execution model we proposed, while the checkpoint barrier
> > >> >> > alignment or handling will not be touched in our proposal, so I guess
> > >> >> > the complexity is relatively controllable. I have faith in that :)
> > >> >> >
> > >> >> > > Also regarding the overheads, it would be great if you could provide
> > >> >> > > profiling results of the benchmarks that you conducted to verify the
> > >> >> > > results. Or maybe if you could describe the steps to reproduce the
> > >> >> > > results?
> > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
> > >> >> > >
> > >> >> >
> > >> >> > Yes we could profile the benchmarks. And for the comparison of "Hashmap
> > >> >> > (sync)" vs "Hashmap with async API", we ran a Wordcount job written
> > >> >> > with async APIs but with async execution disabled, by directly completing
> > >> >> > the future using sync state access. This evaluates the overhead of newly
> > >> >> > introduced modules like 'AEC' in sync execution (even though they are not
> > >> >> > designed for it). The code will be provided later. For other results of
> > >> >> > our PoC[1], you can follow the instructions here[2] to reproduce. Since
> > >> >> > the compilation may take some effort, we will directly provide the jar for
> > >> >> > testing next week.
> > >> >> >
> > >> >> >
> > >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
> > >> >> > local time and the next few days are weekends. So I will reply to you
> > >> >> > later. Thanks for your response!
> > >> >> >
> > >> >> >
> > >> >> > [1]
> > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > >> >> > [2]
> > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > >> >> >
> > >> >> >
> > >> >> > Best,
> > >> >> > Zakelly
> > >> >> >
> > >> >> >
> > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > >> >> >
> > >> >> > > Hi,
> > >> >> > >
> > >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> > >> >> > > and have some questions about the proposed changes.
> > >> >> > >
> > >> >> > > For Async API (FLIP-424)
> > >> >> > >
> > >> >> > > 1. Why do we need a close() method on StateIterator? This method seems
> > >> >> > > unused in the usage example codes.
> > >> >> > >
> > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > >> >> > > entries are allowed". It might be better to further explain what will
> > >> >> > > happen if a null value is passed, ignoring the value in the returned
> > >> >> > > Collection or throwing exceptions. Given that
> > >> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
> > >> >> > > suppose the former one might be correct.
> > >> >> > >
> > >> >> > >
> > >> >> > > For Async Execution (FLIP-425)
> > >> >> > >
> > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> > >> >> > > with an ongoing recordA, its processElement() method can still be
> > >> >> > > triggered immediately, and then it might be moved to the blocking
> > >> >> > > buffer in AEC if it involves state operations. This means that
> > >> >> > > recordB's output will precede recordA's output in downstream
> > >> >> > > operators, if recordA involves state operations while recordB does
> > >> >> > > not. This will harm the correctness of Flink jobs in some use cases.
> > >> >> > > For example, in dim table join cases, recordA could be a delete
> > >> >> > > operation that involves state access, while recordB could be an insert
> > >> >> > > operation that needs to visit external storage without state access.
> > >> >> > > If recordB's output precedes recordA's, then an entry that is supposed
> > >> >> > > to finally exist with recordB's value in the sink table might actually
> > >> >> > > be deleted according to recordA's command. This situation should be
> > >> >> > > avoided and the order of same-key records should be strictly
> > >> >> > > preserved.
> > >> >> > >
> > >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
> > >> >> > > invoke further yield() methods. Given that yield() is used when there
> > >> >> > > is "too much" in-flight data, does it mean StateRequests submitted by
> > >> >> > > Callbacks will never be "too much"? What if the total number of
> > >> >> > > StateRequests exceed the capacity of Flink operator's memory space?
> > >> >> > >
> > >> >> > > 3. In the "Watermark" section, this FLIP provided an out-of-order
> > >> >> > > execution mode apart from the default strictly-ordered mode, which can
> > >> >> > > optimize performance by allowing more concurrent executions.
> > >> >> > >
> > >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with the
> > >> >> > > epoch mechanism, would bring more complexity to the execution model
> > >> >> > > than the performance improvement it promises. Could we add some
> > >> >> > > benchmark results proving the benefit of this mode?
> > >> >> > >
> > >> >> > > 3.2 The FLIP might need to add a public API section describing how
> > >> >> > > users or developers can switch between these two execution modes.
> > >> >> > >
> > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > >> >> > > there are also more other events that might appear in the stream of
> > >> >> > > data records. It might be better to generalize the execution mode
> > >> >> > > mechanism to handle all possible events.
> > >> >> > >
> > >> >> > > 4. It might be better to treat callback-handling as a
> > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > >> >> > > repeatedly creating Mail objects.
> > >> >> > >
> > >> >> > > 5. Could this FLIP provide the current default values for things like
> > >> >> > > active buffer size thresholds and timeouts? These could help with
> > >> >> > > memory consumption and latency analysis.
> > >> >> > >
> > >> >> > > 6. Why do we need to record the hashcode of a record in its
> > >> >> > > RecordContext? It seems not used.
> > >> >> > >
> > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > >> >> > > points to a document in flink-1.15. It might be better to verify the
> > >> >> > > referenced content is still valid in the latest Flink and update the
> > >> >> > > link accordingly. Same for other references if any.
> > >> >> > >
> > >> >> > > Best,
> > >> >> > > Yunfeng Zhou
> > >> >> > >
> > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >> >> > > >
> > >> >> > > > Hi Devs,
> > >> >> > > >
> > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > >> >> > > > Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing
> > >> >> > > > Disaggregated State Storage and Management in Flink 2.0.
> > >> >> > > >
> > >> >> > > > The past decade has witnessed a dramatic shift in Flink's deployment mode,
> > >> >> > > > workload patterns, and hardware improvements. We've moved from the
> > >> >> > > > map-reduce era where workers are computation-storage tightly coupled nodes
> > >> >> > > > to a cloud-native world where containerized deployments on Kubernetes
> > >> >> > > > become standard. To enable Flink's Cloud-Native future, we introduce
> > >> >> > > > Disaggregated State Storage and Management that uses DFS as primary storage
> > >> >> > > > in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> > >> >> > > >
> > >> >> > > > Design Details can be found in FLIP-423[1].
> > >> >> > > >
> > >> >> > > > This new architecture is aimed to solve the following challenges brought in
> > >> >> > > > the cloud-native era for Flink.
> > >> >> > > > 1. Local Disk Constraints in containerization
> > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state model
> > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> > >> >> > > > 4. Light and Fast Checkpoint in a native way
> > >> >> > > >
> > >> >> > > > More specifically, we want to reach a consensus on the following issues in
> > >> >> > > > this discussion:
> > >> >> > > >
> > >> >> > > > 1. Overall design
> > >> >> > > > 2. Proposed Changes
> > >> >> > > > 3. Design details to achieve Milestone1
> > >> >> > > >
> > >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS as
> > >> >> > > > primary storage and complete core functionalities, including:
> > >> >> > > >
> > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> > >> >> > > > asynchronous state access.
> > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> > >> >> > > > execution model leveraging the asynchronous APIs introduced in FLIP-424.
> > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> > >> >> > > > state data in batches to avoid unnecessary round-trip costs for remote
> > >> >> > > > access
> > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of
> > >> >> > > > the ForSt disaggregated state store.
> > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> > >> >> > > > checkpointing mechanisms with the disaggregated state store for fault
> > >> >> > > > tolerance and fast rescaling.
> > >> >> > > >
> > >> >> > > > We will vote on each FLIP in separate threads to make sure each FLIP
> > >> >> > > > reaches a consensus. But we want to keep the discussion within a focused
> > >> >> > > > thread (this thread) for easier tracking of contexts to avoid duplicated
> > >> >> > > > questions/discussions and also to think of the problem/solution in a full
> > >> >> > > > picture.
> > >> >> > > >
> > >> >> > > > Looking forward to your feedback
> > >> >> > > >
> > >> >> > > > Best,
> > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> > >> >> > > >
> > >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> > >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> > >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> > >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> > >> >> > >
> > >> >> >
>
