Hi everyone,

Thank you all for the lively discussion here. It is a good time to move on
to more detailed discussions, so we have opened separate threads for the
sub-FLIPs:

FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b

If you want to talk about the overall architecture, roadmap, milestones, or
anything that spans multiple FLIPs, please post it here. Otherwise, please
discuss the details in the separate threads. Let's try to avoid repeating
the same discussion in different threads. I will sync important messages
from the above threads here, if there are any.

In reply to @Jeyhun: we will ensure that the content of those FLIPs stays
consistent.


Best,
Zakelly

On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

> I have been a bit busy these few weeks and sorry for responding late.
>
> The original thinking behind keeping the discussion within one thread was
> to make tracking easier and to avoid repeated discussion in different
> threads.
>
> For details, it might be good to start separate threads if needed.
>
> We will think of a way to better organize the discussion.
>
> Best
> Yuan
>
>
> On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi,
> >
> > + 1 for the suggestion.
> > Maybe we can start the discussion with the FLIPs that have minimal
> > dependencies (on the other new/proposed FLIPs).
> > Based on our discussion on a particular FLIP, the subsequent (or its
> > dependent) FLIP(s) can be updated accordingly?
> >
> > Regards,
> > Jeyhun
> >
> > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> > > Hey all!
> > >
> > > This is a massive improvement / work. I just started going through the
> > > FLIPs and have a more or less meta comment.
> > >
> > > While it's good to keep the overall architecture discussion here, I
> > > think we should still have separate discussions for each FLIP where we
> > > can discuss interface details etc. With so much content, adding minor
> > > comments here will lead nowhere, but those discussions are still
> > > important and we should have them in separate threads (one for each
> > > FLIP).
> > >
> > > What do you think?
> > > Gyula
> > >
> > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > >
> > > > Hi team,
> > > >
> > > > Thanks for your discussion. Regarding FLIP-425, we have added several
> > > > updates to answer frequently asked questions:
> > > >
> > > > 1. We captured a flame graph of the Hashmap state backend in
> > > > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > > > the framework overhead (including reference counting, future-related
> > > > code and so on) consumes about 9% of the keyed operator CPU time.
> > > > 2. We added a set of comparative experiments for watermark processing.
> > > > The performance of out-of-order mode is 70% better than that of
> > > > strictly-ordered mode under ~140MB state size. Instructions on how to
> > > > run this test have also been added[2].
> > > > 3. Regarding the order of StreamRecords, whether or not they involve
> > > > state access, we added a new section, *Strict order of
> > > > 'processElement'*[3].
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > [2]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > [3]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > >
> > > >
> > > > Best regards,
> > > > Yanfei
> > > >
> > > > On Tue, Mar 5, 2024 at 9:25 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > > 5. I'm not very sure ... revisiting this later since it is not important.
> > > > >
> > > > > It seems that we still have some details to confirm about this
> > > > > question. Let's postpone this to after the critical parts of the
> > > > > design are settled.
> > > > >
> > > > > > 8. Yes, we had considered ... metrics should be like afterwards.
> > > > >
> > > > > Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in
> > > > > milestone 2.
> > > > >
> > > > > Looking forward to the detailed design of the strict mode between
> > > > > same-key records and the benchmark results for the epoch mechanism.
> > > > >
> > > > > Best regards,
> > > > > Yunfeng
> > > > >
> > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > Hi Yunfeng,
> > > > > >
> > > > > > For 1:
> > > > > > I had a discussion with Lincoln Lee, and I realized it is a common
> > > > > > case that the same-key record should be blocked before
> > > > > > `processElement`. It is easier for users to understand. Thus I will
> > > > > > introduce a strict mode for this and make it the default. My rough
> > > > > > idea is just like yours: invoking some method of the AEC instance
> > > > > > before `processElement`. The detailed design will be described in
> > > > > > the FLIP later.
> > > > > >
> > > > > > For 2:
> > > > > > I agree with you. We could throw exceptions for now and optimize
> > > > > > this later.
> > > > > >
> > > > > > For 5:
> > > > > >>
> > > > > >> It might be better to move the default values to the Proposed
> > > > > >> Changes section instead of making them public for now, as there
> > > > > >> will be compatibility issues once we want to dynamically adjust the
> > > > > >> thresholds and timeouts in future.
> > > > > >
> > > > > > Agreed. The whole framework is experimental until we think it is
> > > > > > complete in 2.0 or later. The default values would be better
> > > > > > determined with more testing results and production experience.
> > > > > >
> > > > > >> The configuration execution.async-state.enabled seems unnecessary,
> > > > > >> as the infrastructure may automatically get this information from
> > > > > >> the detailed state backend configurations. We may revisit this part
> > > > > >> after the core designs have reached an agreement. WDYT?
> > > > > >
> > > > > >
> > > > > > I'm not very sure if there is any use case where users write their
> > > > > > code using async APIs but run their job in a synchronous way. The
> > > > > > first two scenarios that come to mind are benchmarking and jobs with
> > > > > > a small state, where users don't want to rewrite their code.
> > > > > > Actually it is easy to support, so I'd suggest providing it. But I'm
> > > > > > fine with revisiting this later since it is not important. WDYT?
> > > > > >
> > > > > > For 8:
> > > > > > Yes, we had considered the I/O metrics group, especially the
> > > > > > back-pressure, idle and task busy time per second. In the current
> > > > > > plan we can do state access during back-pressure, meaning that those
> > > > > > input metrics would better be redefined. I suggest we discuss these
> > > > > > existing metrics, as well as some new metrics that should be
> > > > > > introduced, in FLIP-431 later in milestone 2, since by then we will
> > > > > > have basically finished the framework and will have a better view of
> > > > > > what the metrics should look like. WDYT?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Zakelly
> > > > > >
> > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
> > > > flink.zhouyunf...@gmail.com> wrote:
> > > > > >>
> > > > > >> Hi Zakelly,
> > > > > >>
> > > > > >> Thanks for the responses!
> > > > > >>
> > > > > >> > 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.
> > > > > >>
> > > > > >> In DataStream API there should also be use cases where the order of
> > > > > >> output is strictly required. I agree that SQL experts may help
> > > > > >> provide more concrete use cases that can accelerate our discussion,
> > > > > >> but please allow me to search for DataStream use cases that can
> > > > > >> prove the necessity of this strict order preservation mode, if the
> > > > > >> answers from SQL experts turn out to be negative.
> > > > > >>
> > > > > >> For your convenience, my current rough idea is that we can add a
> > > > > >> module between the Input(s) and processElement() module in Fig 2 of
> > > > > >> FLIP-425. The module will be responsible for caching records whose
> > > > > >> keys collide with in-flight records, and AEC will only be
> > > > > >> responsible for handling async state calls, without knowing the
> > > > > >> record each call belongs to. We may revisit this topic once the
> > > > > >> necessity of the strict order mode is clarified.
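
As an illustration only, the rough idea above (caching records whose keys
collide with in-flight records before they reach processElement()) could be
sketched in Python as follows. The class and method names (KeyOrderGate,
on_record, on_record_finished) are hypothetical and are not part of FLIP-425
or of Flink; the sketch is single-threaded and ignores watermarks and
checkpoints.

```python
from collections import defaultdict, deque


class KeyOrderGate:
    """Hypothetical gate placed between the input and processElement().

    A record whose key collides with an in-flight record is cached and is
    only handed to process_element once the earlier record has finished.
    """

    def __init__(self, process_element):
        self._process_element = process_element  # callable(key, record)
        self._in_flight = set()                  # keys being processed
        self._waiting = defaultdict(deque)       # key -> cached records

    def on_record(self, key, record):
        """Return True if the record started immediately, False if cached."""
        if key in self._in_flight:
            self._waiting[key].append(record)
            return False
        self._in_flight.add(key)
        self._process_element(key, record)
        return True

    def on_record_finished(self, key):
        """Call when all work (including callbacks) for the key's record is done."""
        queue = self._waiting.get(key)
        if queue:
            next_record = queue.popleft()
            if not queue:
                del self._waiting[key]
            # The key stays in flight; the next same-key record starts now.
            self._process_element(key, next_record)
        else:
            self._waiting.pop(key, None)
            self._in_flight.discard(key)
```

With such a gate, a recordB that shares its key with an ongoing recordA does
not enter processElement() until recordA has finished, so their outputs keep
the input order.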
> > > > > >>
> > > > > >>
> > > > > >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> > > > > >>
> > > > > >> Your suggestions generally appeal to me. I think we may let the
> > > > > >> corresponding Flink jobs fail with OOM for now, since the majority
> > > > > >> of a StateRequest should just be references to existing Java
> > > > > >> objects, which occupy very little memory and can hardly cause OOM
> > > > > >> in common cases. We can monitor the pending StateRequests and, if
> > > > > >> there is really a risk of OOM in extreme cases, throw Exceptions
> > > > > >> with proper messages notifying users what to do, like increasing
> > > > > >> memory through configurations.
> > > > > >>
> > > > > >> Your suggestions to adjust the threshold adaptively or to use the
> > > > > >> blocking buffer sound good, and in my opinion we can postpone them
> > > > > >> to future FLIPs since they seem to only benefit users in rare
> > > > > >> cases. Given that FLIP-423~428 is already a big enough design, it
> > > > > >> might be better to focus on the most critical design for now and
> > > > > >> postpone optimizations like this. WDYT?
> > > > > >>
> > > > > >>
> > > > > >> > 5. Sure, we will introduce new configs as well as their default value.
> > > > > >>
> > > > > >> Thanks for adding the default values; the values themselves LGTM.
> > > > > >> It might be better to move the default values to the Proposed
> > > > > >> Changes section instead of making them public for now, as there
> > > > > >> will be compatibility issues once we want to dynamically adjust the
> > > > > >> thresholds and timeouts in future.
> > > > > >>
> > > > > >> The configuration execution.async-state.enabled seems unnecessary,
> > > > > >> as the infrastructure may automatically get this information from
> > > > > >> the detailed state backend configurations. We may revisit this part
> > > > > >> after the core designs have reached an agreement. WDYT?
> > > > > >>
> > > > > >>
> > > > > >> Besides, inspired by Jeyhun's comments, it occurs to me that
> > > > > >>
> > > > > >> 8. Should this FLIP introduce metrics that measure the time a Flink
> > > > > >> job is back-pressured by State IOs? Under the current design, this
> > > > > >> metric could measure the time when the blocking buffer is full and
> > > > > >> yield() cannot get callbacks to process, which means the operator
> > > > > >> is fully waiting for state responses.
> > > > > >>
> > > > > >> Best regards,
> > > > > >> Yunfeng
> > > > > >>
> > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
> > zakelly....@gmail.com>
> > > > wrote:
> > > > > >> >
> > > > > >> > Hi Yunfeng,
> > > > > >> >
> > > > > >> > Thanks for your detailed comments!
> > > > > >> >
> > > > > >> >> 1. Why do we need a close() method on StateIterator? This
> > method
> > > > seems
> > > > > >> >> unused in the usage example codes.
> > > > > >> >
> > > > > >> >
> > > > > >> > `close()` was introduced to release internal resources, but it
> > > > > >> > does not seem necessary for the user to call it. I removed it.
> > > > > >> >
> > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that
> "No
> > > > null
> > > > > >> >> entries are allowed". It might be better to further explain
> > what
> > > > will
> > > > > >> >> happen if a null value is passed, ignoring the value in the
> > > > returned
> > > > > >> >> Collection or throwing exceptions. Given that
> > > > > >> >> FutureUtils.emptyFuture() can be returned in the example
> code,
> > I
> > > > > >> >> suppose the former one might be correct.
> > > > > >> >
> > > > > >> >
> > > > > >> > The statement "No null entries are allowed" refers to the
> > > > > >> > parameters. It means that passing in a list such as
> > > > > >> > [null, StateFuture1, StateFuture2] is not allowed, and an
> > > > > >> > Exception will be thrown.
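
To make the described behavior concrete, here is a minimal Python sketch of a
combine-all helper that rejects null entries up front. It is not the FLIP-424
FutureUtils API: combine_all is a hypothetical name, Python's
concurrent.futures.Future stands in for StateFuture, and ValueError stands in
for whatever exception type the FLIP ends up using. Error propagation from
failed input futures and thread safety are left out.

```python
from concurrent.futures import Future


def combine_all(futures):
    """Combine futures into one future that yields all results in order.

    Hypothetical stand-in for FutureUtils.combineAll(): a None entry in the
    input is rejected immediately with an exception.
    """
    futures = list(futures)
    if any(f is None for f in futures):
        raise ValueError("No null entries are allowed")

    combined = Future()
    results = [None] * len(futures)
    pending = [len(futures)]  # mutable counter shared with the callbacks

    if pending[0] == 0:
        combined.set_result(results)
        return combined

    def on_done(index, fut):
        # Assumes the input future completed successfully.
        results[index] = fut.result()
        pending[0] -= 1
        if pending[0] == 0:
            combined.set_result(results)

    for i, f in enumerate(futures):
        f.add_done_callback(lambda fut, i=i: on_done(i, fut))
    return combined
```

In this sketch, a list like [None, future1, future2] fails before any
callback is registered, while results of valid inputs are returned in the
order of the input list regardless of completion order.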
> > > > > >> >
> > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This situation
> should
> > > be
> > > > > >> >> avoided and the order of same-key records should be strictly
> > > > > >> >> preserved.
> > > > > >> >
> > > > > >> >
> > > > > >> > I will discuss this with some expert SQL developers. If it is
> > > > > >> > valid and common, I suggest a strict order preservation mode for
> > > > > >> > StreamRecord processing. WDYT?
> > > > > >> >
> > > > > >> >> 2. The FLIP says that StateRequests submitted by Callbacks
> will
> > > not
> > > > > >> >> invoke further yield() methods. Given that yield() is used
> when
> > > > there
> > > > > >> >> is "too much" in-flight data, does it mean StateRequests
> > > submitted
> > > > by
> > > > > >> >> Callbacks will never be "too much"? What if the total number
> of
> > > > > >> >> StateRequests exceed the capacity of Flink operator's memory
> > > space?
> > > > > >> >
> > > > > >> >
> > > > > >> > The number of parallel StateRequests for one StreamRecord cannot
> > > > > >> > be determined, since the code is written by users. So the
> > > > > >> > in-flight requests may be "too much" and may cause OOM. Users
> > > > > >> > should then re-configure their job to control the number of
> > > > > >> > ongoing StreamRecords. I suggest two ways to avoid this:
> > > > > >> >
> > > > > >> > - Adaptively adjust the number of ongoing StreamRecords according
> > > > > >> > to the historical number of StateRequests.
> > > > > >> > - Also limit the maximum number of StateRequests that can be
> > > > > >> > executed in parallel for each StreamRecord. If the limit is
> > > > > >> > exceeded, put the new StateRequest in the blocking buffer to wait
> > > > > >> > for execution (instead of invoking yield()).
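
The second option above can be illustrated with a small Python sketch: at
most max_parallel requests run at once, and any further request waits in a
blocking buffer until a running one completes. BoundedRequestRunner and its
methods are hypothetical names used only for this illustration; the actual
AEC design and its limits are defined in FLIP-425, not here.

```python
from collections import deque


class BoundedRequestRunner:
    """Hypothetical limiter for the parallel StateRequests of one record."""

    def __init__(self, max_parallel, submit):
        self._max_parallel = max_parallel
        self._submit = submit      # callable(request): starts execution
        self._running = 0
        self._blocked = deque()    # the "blocking buffer"

    def add(self, request):
        """Run the request now if under the limit, otherwise buffer it."""
        if self._running < self._max_parallel:
            self._running += 1
            self._submit(request)
        else:
            self._blocked.append(request)

    def on_complete(self):
        """Call when one running request finishes."""
        if self._blocked:
            # Hand the freed slot directly to the oldest buffered request.
            self._submit(self._blocked.popleft())
        else:
            self._running -= 1

    @property
    def blocked_count(self):
        return len(self._blocked)
```

The number of buffered requests is still unbounded in this sketch, so it
limits parallelism rather than memory.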
> > > > > >> >
> > > > > >> > WDYT?
> > > > > >> >
> > > > > >> >
> > > > > >> >> 3.1 I'm concerned that the out-of-order execution mode, along
> > > with
> > > > the
> > > > > >> >> epoch mechanism, would bring more complexity to the execution
> > > model
> > > > > >> >> than the performance improvement it promises. Could we add
> some
> > > > > >> >> benchmark results proving the benefit of this mode?
> > > > > >> >
> > > > > >> >
> > > > > >> > Agreed, will do.
> > > > > >> >
> > > > > >> >> 3.2 The FLIP might need to add a public API section
> describing
> > > how
> > > > > >> >> users or developers can switch between these two execution
> > modes.
> > > > > >> >
> > > > > >> >
> > > > > >> > Good point. We will add a Public API section.
> > > > > >> >
> > > > > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this
> > > FLIP,
> > > > > >> >> there are also more other events that might appear in the
> > stream
> > > of
> > > > > >> >> data records. It might be better to generalize the execution
> > mode
> > > > > >> >> mechanism to handle all possible events.
> > > > > >> >
> > > > > >> >
> > > > > >> > Yes, I missed this point. Thanks for the reminder.
> > > > > >> >
> > > > > >> >> 4. It might be better to treat callback-handling as a
> > > > > >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead
> > of
> > > > > >> >> repeatedly creating Mail objects.
> > > > > >> >
> > > > > >> >
> > > > > >> > I thought the intermediate wrapper for a callback cannot be
> > > > > >> > omitted, since there will be some context switch before each
> > > > > >> > execution. The MailboxDefaultAction in most cases is
> > > > > >> > processInput, right? The callback, however, should be executed
> > > > > >> > with higher priority. I'd suggest not changing the basic logic of
> > > > > >> > the Mailbox and the default action, since it is very critical for
> > > > > >> > performance. But yes, we will try our best to avoid creating
> > > > > >> > intermediate objects.
> > > > > >> >
> > > > > >> >> 5. Could this FLIP provide the current default values for
> > things
> > > > like
> > > > > >> >> active buffer size thresholds and timeouts? These could help
> > with
> > > > > >> >> memory consumption and latency analysis.
> > > > > >> >
> > > > > >> >
> > > > > >> > Sure, we will introduce new configs as well as their default
> > > value.
> > > > > >> >
> > > > > >> >> 6. Why do we need to record the hashcode of a record in its
> > > > > >> >> RecordContext? It seems not used.
> > > > > >> >
> > > > > >> >
> > > > > >> > The context switch before each callback execution involves
> > > > > >> > setCurrentKey, where the hashCode is re-calculated. We cache it
> > > > > >> > to speed this up.
> > > > > >> >
> > > > > >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the
> > link
> > > > > >> >> points to a document in flink-1.15. It might be better to
> > verify
> > > > the
> > > > > >> >> referenced content is still valid in the latest Flink and
> > update
> > > > the
> > > > > >> >> link accordingly. Same for other references if any.
> > > > > >> >
> > > > > >> >
> > > > > >> > Thanks for the reminder! Will check.
> > > > > >> >
> > > > > >> >
> > > > > >> > Thanks a lot & Best,
> > > > > >> > Zakelly
> > > > > >> >
> > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <
> > > je.kari...@gmail.com>
> > > > wrote:
> > > > > >> >>
> > > > > >> >> Hi,
> > > > > >> >>
> > > > > >> >> Thanks for the great proposals. I have a few comments:
> > > > > >> >>
> > > > > >> >> - Backpressure handling. Flink's original backpressure handling
> > > > > >> >> is quite robust and the semantics are quite "simple" (simple is
> > > > > >> >> beautiful). This mechanism has proven to be more robust and to
> > > > > >> >> perform better than those of other open source streaming
> > > > > >> >> systems, which relied on some loopback information. Now that the
> > > > > >> >> proposal also relies on loopback (yield in this case), it is not
> > > > > >> >> clear how robust the new backpressure handling proposed in
> > > > > >> >> FLIP-425 is and how well it handles fluctuating workloads.
> > > > > >> >>
> > > > > >> >> - Watermark/timer handling. Similar arguments apply to watermark
> > > > > >> >> and timer handling. IMHO, we need more benchmarks showing the
> > > > > >> >> overhead of epoch management with different parameters (e.g.,
> > > > > >> >> window size, watermark strategy, etc.).
> > > > > >> >>
> > > > > >> >> - DFS consistency guarantees. The proposal in FLIP-427 is
> > > > > >> >> DFS-agnostic. However, different cloud providers have different
> > > > > >> >> storage consistency models. How do we want to deal with them?
> > > > > >> >>
> > > > > >> >>  Regards,
> > > > > >> >> Jeyhun
> > > > > >> >>
> > > > > >> >>
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <
> > > zakelly....@gmail.com>
> > > > wrote:
> > > > > >> >>
> > > > > >> >> > Thanks Piotr for sharing your thoughts!
> > > > > >> >> >
> > > > > >> >> > > I guess it depends how we would like to treat the local disks.
> > > > > >> >> > > I've always thought about them that almost always eventually
> > > > > >> >> > > all state from the DFS should end up cached in the local disks.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > OK, I got it. In our proposal we treat the local disk as an
> > > > > >> >> > optional cache, so the basic design will handle the case where
> > > > > >> >> > state resides in DFS only. It is a more 'cloud-native' approach
> > > > > >> >> > that does not rely on any local storage assumptions, which
> > > > > >> >> > allows users to dynamically adjust the capacity or I/O bound of
> > > > > >> >> > remote storage to gain performance or save cost, even without a
> > > > > >> >> > job restart.
> > > > > >> >> >
> > > > > >> >> > > In the currently proposed more fine grained solution, you make
> > > > > >> >> > > a single request to DFS per each state access.
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > Ah, that's not accurate. Actually we buffer the state requests
> > > > > >> >> > and process them in batches, so multiple requests will
> > > > > >> >> > correspond to one DFS access (one block access for multiple keys
> > > > > >> >> > performed by RocksDB).
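
The batching described above can be pictured with a short Python sketch:
buffered requests are grouped by the block that holds their key, and each
block is read once for all of its keys. This is only an illustration under
stated assumptions: execute_batch, block_of and read_block are hypothetical
names, and the real block lookup is performed inside RocksDB rather than by
code like this.

```python
from collections import defaultdict


def execute_batch(keys, block_of, read_block):
    """Serve a batch of buffered state reads with one access per block.

    block_of(key)        -> id of the block holding the key (hypothetical)
    read_block(block_id) -> dict of key -> value; one remote (DFS) access
    """
    by_block = defaultdict(list)
    for key in keys:
        by_block[block_of(key)].append(key)

    results = {}
    for block_id, block_keys in by_block.items():
        block = read_block(block_id)  # single access shared by all keys
        for key in block_keys:
            results[key] = block.get(key)
    return results
```

With three buffered keys that live in two blocks, the sketch issues two block
reads instead of three per-key reads.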
> > > > > >> >> >
> > > > > >> >> > > In that benchmark you mentioned, are you requesting the state
> > > > > >> >> > > asynchronously from local disks into memory? If the benefit
> > > > > >> >> > > comes from parallel I/O, then I would expect the benefit to
> > > > > >> >> > > disappear/shrink when running multiple subtasks on the same
> > > > > >> >> > > machine, as they would be making their own parallel requests,
> > > > > >> >> > > right? Also enabling checkpointing would further cut into the
> > > > > >> >> > > available I/O budget.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > That's an interesting topic. Our proposal is specifically aimed
> > > > > >> >> > at the scenario where the machine I/O is not fully loaded but
> > > > > >> >> > the I/O latency has indeed become a bottleneck for each subtask.
> > > > > >> >> > The distributed file system is a prime example of such a
> > > > > >> >> > scenario, characterized by abundant and easily scalable I/O
> > > > > >> >> > bandwidth coupled with higher I/O latency. You may expect to
> > > > > >> >> > increase the parallelism of a job to enhance the performance as
> > > > > >> >> > well, but that also wastes more CPU and memory on building up
> > > > > >> >> > more subtasks. This is one drawback of nodes where computation
> > > > > >> >> > and storage are tightly coupled. In our proposal, by contrast,
> > > > > >> >> > parallel I/O is performed with all the callbacks still running
> > > > > >> >> > in one task, so pre-allocated computational resources are better
> > > > > >> >> > utilized. It is a much more lightweight way to perform parallel
> > > > > >> >> > I/O.
> > > > > >> >> >
> > > > > >> >> > > Just with what granularity those async requests should be
> > > > > >> >> > > made. Making state access asynchronous is definitely the right
> > > > > >> >> > > way to go!
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > I think the current proposal is based on these core ideas:
> > > > > >> >> >
> > > > > >> >> >    - A pure cloud-native disaggregated state.
> > > > > >> >> >    - Fully utilize the given resources and try not to waste them
> > > > > >> >> >    (including I/O).
> > > > > >> >> >    - The ability to scale isolated resources (I/O or CPU or
> > > > > >> >> >    memory) independently.
> > > > > >> >> >
> > > > > >> >> > We think a fine-grained granularity is more in line with those
> > > > > >> >> > ideas, especially without local disk assumptions and without any
> > > > > >> >> > waste of I/O by prefetching. Please note that it is not a
> > > > > >> >> > replacement for the original local state with synchronous
> > > > > >> >> > execution. Instead this is a solution embracing the cloud-native
> > > > > >> >> > era, providing much more scalability and resource efficiency
> > > > > >> >> > when handling a *huge state*.
> > > > > >> >> >
> > > > > >> >> > > What also worries me a lot in this fine grained model is the
> > > > > >> >> > > effect on the checkpointing times.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > Your concerns are very reasonable. Faster checkpointing is
> > > > > >> >> > always a core advantage of disaggregated state, but only for the
> > > > > >> >> > async phase. There will be some complexity introduced by
> > > > > >> >> > in-flight requests, but I'd suggest a checkpoint containing
> > > > > >> >> > those in-flight state requests as part of the state, to
> > > > > >> >> > accelerate the sync phase by skipping the buffer draining. This
> > > > > >> >> > makes the buffer size have little impact on checkpoint time. All
> > > > > >> >> > the changes stay within the execution model we proposed, while
> > > > > >> >> > checkpoint barrier alignment and handling will not be touched in
> > > > > >> >> > our proposal, so I guess the complexity is relatively
> > > > > >> >> > controllable. I have faith in that :)
> > > > > >> >> >
> > > > > >> >> > > Also regarding the overheads, it would be great if you could
> > > > > >> >> > > provide profiling results of the benchmarks that you conducted
> > > > > >> >> > > to verify the results. Or maybe if you could describe the
> > > > > >> >> > > steps to reproduce the results?
> > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >> > Yes, we could profile the benchmarks. For the comparison of
> > > > > >> >> > "Hashmap (sync)" vs "Hashmap with async API", we ran a Wordcount
> > > > > >> >> > job written with async APIs but with async execution disabled,
> > > > > >> >> > by directly completing the future using sync state access. This
> > > > > >> >> > evaluates the overhead of newly introduced modules like 'AEC' in
> > > > > >> >> > sync execution (even though they are not designed for it). The
> > > > > >> >> > code will be provided later. For other results of our PoC[1],
> > > > > >> >> > you can follow the instructions here[2] to reproduce them. Since
> > > > > >> >> > the compilation may take some effort, we will directly provide
> > > > > >> >> > the jar for testing next week.
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit
> > > late
> > > > in my
> > > > > >> >> > local time and the next few days are weekends. So I will
> > reply
> > > > to you
> > > > > >> >> > later. Thanks for your response!
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > [1]
> > > > > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > > > > >> >> > [2]
> > > > > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > Best,
> > > > > >> >> > Zakelly
> > > > > >> >> >
> > > > > >> >> >
> > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <
> > > > flink.zhouyunf...@gmail.com>
> > > > > >> >> > wrote:
> > > > > >> >> >
> > > > > >> >> > > Hi,
> > > > > >> >> > >
> > > > > >> >> > > Thanks for proposing this design! I just read FLIP-424
> and
> > > > FLIP-425
> > > > > >> >> > > and have some questions about the proposed changes.
> > > > > >> >> > >
> > > > > >> >> > > For Async API (FLIP-424)
> > > > > >> >> > >
> > > > > >> >> > > 1. Why do we need a close() method on StateIterator? This
> > > > method seems
> > > > > >> >> > > unused in the usage example codes.
> > > > > >> >> > >
> > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated
> that
> > > > "No null
> > > > > >> >> > > entries are allowed". It might be better to further
> explain
> > > > what will
> > > > > >> >> > > happen if a null value is passed, ignoring the value in
> the
> > > > returned
> > > > > >> >> > > Collection or throwing exceptions. Given that
> > > > > >> >> > > FutureUtils.emptyFuture() can be returned in the example
> > > code,
> > > > I
> > > > > >> >> > > suppose the former one might be correct.
> > > > > >> >> > >
> > > > > >> >> > >
> > > > > >> >> > > For Async Execution (FLIP-425)
> > > > > >> >> > >
> > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its
> > key
> > > > collide
> > > > > >> >> > > with an ongoing recordA, its processElement() method can
> > > still
> > > > be
> > > > > >> >> > > triggered immediately, and then it might be moved to the
> > > > blocking
> > > > > >> >> > > buffer in AEC if it involves state operations. This means
> > > that
> > > > > >> >> > > recordB's output will precede recordA's output in
> > downstream
> > > > > >> >> > > operators, if recordA involves state operations while
> > recordB
> > > > does
> > > > > >> >> > > not. This will harm the correctness of Flink jobs in some
> > use
> > > > cases.
> > > > > >> >> > > For example, in dim table join cases, recordA could be a
> > > delete
> > > > > >> >> > > operation that involves state access, while recordB could
> > be
> > > > an insert
> > > > > >> >> > > operation that needs to visit external storage without
> > state
> > > > access.
> > > > > >> >> > > If recordB's output precedes recordA's, then an entry
> that
> > is
> > > > supposed
> > > > > >> >> > > to finally exist with recordB's value in the sink table
> > might
> > > > actually
> > > > > >> >> > > be deleted according to recordA's command. This situation
> > > > should be
> > > > > >> >> > > avoided and the order of same-key records should be
> > strictly
> > > > > >> >> > > preserved.
> > > > > >> >> > >
> > > > > >> >> > > 2. The FLIP says that StateRequests submitted by
> Callbacks
> > > > will not
> > > > > >> >> > > invoke further yield() methods. Given that yield() is
> used
> > > > when there
> > > > > >> >> > > is "too much" in-flight data, does it mean StateRequests
> > > > submitted by
> > > > > >> >> > > Callbacks will never be "too much"? What if the total
> > number
> > > of
> > > > > >> >> > > StateRequests exceed the capacity of Flink operator's
> > memory
> > > > space?
> > > > > >> >> > >
> > > > > >> >> > > 3. In the "Watermark" section, this FLIP provided an
> > > > out-of-order
> > > > > >> >> > > execution mode apart from the default strictly-ordered
> > mode,
> > > > which can
> > > > > >> >> > > optimize performance by allowing more concurrent
> > executions.
> > > > > >> >> > >
> > > > > >> >> > > 3.1 I'm concerned that the out-of-order execution mode,
> > along
> > > > with the
> > > > > >> >> > > epoch mechanism, would bring more complexity to the
> > execution
> > > > model
> > > > > >> >> > > than the performance improvement it promises. Could we
> add
> > > some
> > > > > >> >> > > benchmark results proving the benefit of this mode?
> > > > > >> >> > >
> > > > > >> >> > > 3.2 The FLIP might need to add a public API section
> > > describing
> > > > how
> > > > > >> >> > > users or developers can switch between these two
> execution
> > > > modes.
> > > > > >> >> > >
> > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in
> > > > > > >> >> > > this FLIP, there are other events that might appear in
> > > > > > >> >> > > the stream of data records. It might be better to
> > > > > > >> >> > > generalize the execution mode mechanism to handle all
> > > > > > >> >> > > possible events.
> > > > > >> >> > >
> > > > > > >> >> > > 4. It might be better to treat callback-handling as a
> > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the
> > > > > > >> >> > > overhead of repeatedly creating Mail objects.
> > > > > >> >> > >
> > > > > > >> >> > > 5. Could this FLIP provide the current default values for
> > > > > > >> >> > > things like active buffer size thresholds and timeouts?
> > > > > > >> >> > > These could help with memory consumption and latency
> > > > > > >> >> > > analysis.
> > > > > >> >> > >
> > > > > > >> >> > > 6. Why do we need to record the hashcode of a record in
> > > > > > >> >> > > its RecordContext? It does not seem to be used.
> > > > > >> >> > >
> > > > > > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB",
> > > > > > >> >> > > the link points to a document in flink-1.15. It might be
> > > > > > >> >> > > better to verify the referenced content is still valid in
> > > > > > >> >> > > the latest Flink and update the link accordingly. Same
> > > > > > >> >> > > for other references if any.
> > > > > >> >> > >
> > > > > >> >> > > Best,
> > > > > >> >> > > Yunfeng Zhou
> > > > > >> >> > >
> > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > > > >> >> > > >
> > > > > >> >> > > > Hi Devs,
> > > > > >> >> > > >
> > > > > > >> >> > > > This is joint work by Yuan Mei, Zakelly Lan, Jinzhong Li,
> > > > > > >> >> > > > Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start
> > > > > > >> >> > > > a discussion about introducing Disaggregated State
> > > > > > >> >> > > > Storage and Management in Flink 2.0.
> > > > > >> >> > > >
> > > > > > >> >> > > > The past decade has witnessed a dramatic shift in Flink's
> > > > > > >> >> > > > deployment mode, workload patterns, and hardware
> > > > > > >> >> > > > improvements. We've moved from the map-reduce era, where
> > > > > > >> >> > > > workers are nodes with tightly coupled computation and
> > > > > > >> >> > > > storage, to a cloud-native world where containerized
> > > > > > >> >> > > > deployments on Kubernetes have become standard. To enable
> > > > > > >> >> > > > Flink's Cloud-Native future, we introduce Disaggregated
> > > > > > >> >> > > > State Storage and Management that uses DFS as primary
> > > > > > >> >> > > > storage in Flink 2.0, as promised in the Flink 2.0
> > > > > > >> >> > > > Roadmap.
> > > > > >> >> > > >
> > > > > >> >> > > > Design Details can be found in FLIP-423[1].
> > > > > >> >> > > >
> > > > > > >> >> > > > This new architecture aims to solve the following
> > > > > > >> >> > > > challenges brought by the cloud-native era for Flink.
> > > > > >> >> > > > 1. Local Disk Constraints in containerization
> > > > > > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state model
> > > > > > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> > > > > >> >> > > > 4. Light and Fast Checkpoint in a native way
> > > > > >> >> > > >
> > > > > > >> >> > > > More specifically, we want to reach a consensus on the
> > > > > > >> >> > > > following issues in this discussion:
> > > > > >> >> > > >
> > > > > >> >> > > > 1. Overall design
> > > > > >> >> > > > 2. Proposed Changes
> > > > > >> >> > > > 3. Design details to achieve Milestone1
> > > > > >> >> > > >
> > > > > > >> >> > > > In M1, we aim to achieve an end-to-end baseline version
> > > > > > >> >> > > > using DFS as primary storage and complete core
> > > > > > >> >> > > > functionalities, including:
> > > > > >> >> > > >
> > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs
> > > > > > >> >> > > > for asynchronous state access.
> > > > > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
> > > > > > >> >> > > > non-blocking execution model leveraging the asynchronous
> > > > > > >> >> > > > APIs introduced in FLIP-424.
> > > > > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable
> > > > > > >> >> > > > retrieval of remote state data in batches to avoid
> > > > > > >> >> > > > unnecessary round-trip costs for remote access.
> > > > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the
> > > > > > >> >> > > > initial version of the ForSt disaggregated state store.
> > > > > > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]:
> > > > > > >> >> > > > Integrate checkpointing mechanisms with the disaggregated
> > > > > > >> >> > > > state store for fault tolerance and fast rescaling.
> > > > > >> >> > > >
> > > > > > >> >> > > > We will vote on each FLIP in separate threads to make sure
> > > > > > >> >> > > > each FLIP reaches a consensus. But we want to keep the
> > > > > > >> >> > > > discussion within a focused thread (this thread) for
> > > > > > >> >> > > > easier tracking of contexts to avoid duplicated
> > > > > > >> >> > > > questions/discussions and also to think of the
> > > > > > >> >> > > > problem/solution in a full picture.
> > > > > >> >> > > >
> > > > > >> >> > > > Looking forward to your feedback
> > > > > >> >> > > >
> > > > > >> >> > > > Best,
> > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> > > > > >> >> > > >
> > > > > >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> > > > > >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> > > > > >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> > > > > >> >> > >
> > > > > >> >> >
> > > >
> > >
> >
>
