Hi Yunfeng,

For 1:
I discussed this with Lincoln Lee and realized that it is a common case
that a same-key record should be blocked before `processElement`. That is
also easier for users to understand. So I will introduce a strict mode for
this and make it the default. My rough idea is just like yours: invoke some
method of the AEC instance before `processElement`. The detailed design
will be described in the FLIP later.
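
To make the rough idea concrete, here is a minimal sketch of such a gate.
It is only an illustration under my own assumptions: `KeyOrderGate`,
`tryAdmit` and `release` are hypothetical names, not the FLIP-425 API, and
the real hook into AEC may look quite different.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of strict same-key ordering; not the FLIP-425 API. */
final class KeyOrderGate<K, R> {
    // One entry per key that currently has an in-flight record.
    // The queue holds the records that arrived later for the same key.
    private final Map<K, Queue<R>> waiting = new HashMap<>();

    /**
     * Called before processElement. Returns true if the record may be
     * processed now, or false if it was buffered behind an in-flight
     * record with the same key.
     */
    boolean tryAdmit(K key, R record) {
        Queue<R> queue = waiting.get(key);
        if (queue == null) {
            // No in-flight record for this key: mark the key as occupied.
            waiting.put(key, new ArrayDeque<>());
            return true;
        }
        queue.add(record);
        return false;
    }

    /**
     * Called when the in-flight record of this key has fully finished.
     * Returns the next buffered record, which becomes the new in-flight
     * record, or null if none is waiting (the key is then freed).
     */
    R release(K key) {
        Queue<R> queue = waiting.get(key);
        if (queue == null) {
            return null;
        }
        R next = queue.poll();
        if (next == null) {
            waiting.remove(key);
        }
        return next;
    }
}
```

With this shape, records of different keys never wait for each other, and
records of the same key reach `processElement` in arrival order.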

For 2:
I agree with you. We could throw exceptions for now and optimize this later.
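
Just to illustrate what "throw exceptions for now" could mean, a minimal
sketch follows. The class name, the counter and the limit are my own
assumptions for illustration and are not part of the FLIP.

```java
/** Hypothetical sketch; names and the limit are illustrative only. */
final class PendingRequestGuard {
    private final int maxPending;
    private int pending;

    PendingRequestGuard(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Called when a StateRequest is submitted; fails fast above the limit. */
    void onSubmit() {
        if (pending >= maxPending) {
            throw new IllegalStateException(
                    "Too many pending state requests (" + pending
                            + "). Consider increasing memory or reducing"
                            + " the number of in-flight records.");
        }
        pending++;
    }

    /** Called when a StateRequest has completed. */
    void onComplete() {
        pending--;
    }

    int pending() {
        return pending;
    }
}
```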

For 5:

> It might be better to move the default values to the Proposed Changes
> section instead of making them public for now, as there will be
> compatibility issues once we want to dynamically adjust the thresholds
> and timeouts in future.
>
Agreed. The whole framework is experimental until we consider it complete
in 2.0 or later. The default values are better determined with more
testing results and production experience.

> The configuration execution.async-state.enabled seems unnecessary, as
> the infrastructure may automatically get this information from the
> detailed state backend configurations. We may revisit this part after
> the core designs have reached an agreement. WDYT?
>

I'm not sure whether there is any use case where users write their code
with the async APIs but run their job synchronously. The first two
scenarios that come to mind are benchmarking and jobs with small state,
where users don't want to rewrite their code. It is actually easy to
support, so I'd suggest providing it. But I'm fine with revisiting this
later since it is not important. WDYT?
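
A minimal sketch of why this is easy to support: when async execution is
disabled, the async API can simply return an already-completed future
backed by a synchronous read. Everything here is an assumption for
illustration: `SwitchableValueState` is a hypothetical name, the map stands
in for the state backend, and `CompletableFuture` stands in for the
StateFuture of FLIP-424.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

/** Hypothetical sketch; not the FLIP-424 state API. */
final class SwitchableValueState<K, V> {
    // Stand-in for the real state backend.
    private final Map<K, V> store = new ConcurrentHashMap<>();
    // Would be driven by something like execution.async-state.enabled.
    private final boolean asyncEnabled;
    private final Executor ioExecutor;

    SwitchableValueState(boolean asyncEnabled, Executor ioExecutor) {
        this.asyncEnabled = asyncEnabled;
        this.ioExecutor = ioExecutor;
    }

    void put(K key, V value) {
        store.put(key, value);
    }

    /** Same user-facing call in both modes; only the completion differs. */
    CompletableFuture<V> asyncValue(K key) {
        if (!asyncEnabled) {
            // Sync mode: read immediately and hand back a completed future,
            // so user callbacks run right away on the calling thread.
            return CompletableFuture.completedFuture(store.get(key));
        }
        // Async mode: the read happens on the I/O executor.
        return CompletableFuture.supplyAsync(() -> store.get(key), ioExecutor);
    }
}
```

User code such as `state.asyncValue(key).thenApply(...)` stays unchanged in
both modes, which is the point of keeping the switch.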

For 8:
Yes, we have considered the I/O metrics group, especially the
back-pressured, idle and busy time per second. In the current plan we can
do state access during back-pressure, meaning that those input metrics
would need to be redefined. I suggest we discuss these existing metrics,
as well as any new metrics that should be introduced, in FLIP-431 later in
milestone 2, since by then the framework will be basically finished and we
will have a better view of what the metrics should look like. WDYT?


Best,
Zakelly

On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
wrote:

> Hi Zakelly,
>
> Thanks for the responses!
>
> > 1. I will discuss this with some expert SQL developers. ... mode for
> StreamRecord processing.
>
> In DataStream API there should also be use cases when the order of
> output is strictly required. I agree with it that SQL experts may help
> provide more concrete use cases that can accelerate our discussion,
> but please allow me to search for DataStream use cases that can prove
> the necessity of this strict order preservation mode, if answers from
> SQL experts are shown to be negative.
>
> For your convenience, my current rough idea is that we can add a
> module between the Input(s) and processElement() module in Fig 2 of
> FLIP-425. The module will be responsible for caching records whose
> keys collide with in-flight records, and AEC will only be responsible
> for handling async state calls, without knowing the record each call
> belongs to. We may revisit this topic once the necessity of the strict
> order mode is clarified.
>
>
> > 2. The amount of parallel StateRequests ... instead of invoking yield
>
> Your suggestions generally appeal to me. I think we may let
> corresponding Flink jobs fail with OOM for now, since the majority of
> a StateRequest should just be references to existing Java objects,
> which only occupies very small memory space and can hardly cause OOM
> in common cases. We can monitor the pending StateRequests and if there
> is really a risk of OOM in extreme cases, we can throw Exceptions with
> proper messages notifying users what to do, like increasing memory
> through configurations.
>
> Your suggestions to adjust threshold adaptively or to use the blocking
> buffer sounds good, and in my opinion we can postpone them to future
> FLIPs since they seem to only benefit users in rare cases. Given that
> FLIP-423~428 has already been a big enough design, it might be better
> to focus on the most critical design for now and postpone
> optimizations like this. WDYT?
>
>
> > 5. Sure, we will introduce new configs as well as their default value.
>
> Thanks for adding the default values and the values themselves LGTM.
> It might be better to move the default values to the Proposed Changes
> section instead of making them public for now, as there will be
> compatibility issues once we want to dynamically adjust the thresholds
> and timeouts in future.
>
> The configuration execution.async-state.enabled seems unnecessary, as
> the infrastructure may automatically get this information from the
> detailed state backend configurations. We may revisit this part after
> the core designs have reached an agreement. WDYT?
>
>
> Besides, inspired by Jeyhun's comments, it comes to me that
>
> 8. Should this FLIP introduce metrics that measure the time a Flink
> job is back-pressured by State IOs? Under the current design, this
> metric could measure the time when the blocking buffer is full and
> yield() cannot get callbacks to process, which means the operator is
> fully waiting for state responses.
>
> Best regards,
> Yunfeng
>
> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi Yunfeng,
> >
> > Thanks for your detailed comments!
> >
> >> 1. Why do we need a close() method on StateIterator? This method seems
> >> unused in the usage example codes.
> >
> >
> > The `close()` is introduced to release internal resources, but it does
> not seem to require the user to call it. I removed this.
> >
> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> >> entries are allowed". It might be better to further explain what will
> >> happen if a null value is passed, ignoring the value in the returned
> >> Collection or throwing exceptions. Given that
> >> FutureUtils.emptyFuture() can be returned in the example code, I
> >> suppose the former one might be correct.
> >
> >
> > The statement "No null entries are allowed" refers to the parameters, it
> means some arrayList like [null, StateFuture1, StateFuture2] passed in are
> not allowed, and an Exception will be thrown.
> >
> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> >> avoided and the order of same-key records should be strictly
> >> preserved.
> >
> >
> > I will discuss this with some expert SQL developers. And if it is valid
> and common, I suggest a strict order preservation mode for StreamRecord
> processing. WDYT?
> >
> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> >> invoke further yield() methods. Given that yield() is used when there
> >> is "too much" in-flight data, does it mean StateRequests submitted by
> >> Callbacks will never be "too much"? What if the total number of
> >> StateRequests exceed the capacity of Flink operator's memory space?
> >
> >
> > The amount of parallel StateRequests for one StreamRecord cannot be
> determined since the code is written by users. So the in-flight requests
> may be "too much", and may cause OOM. Users should re-configure their job,
> controlling the amount of on-going StreamRecord. And I suggest two ways to
> avoid this:
> >
> > Adaptively adjust the count of on-going StreamRecord according to
> historical StateRequests amount.
> > Also control the max StateRequests that can be executed in parallel for
> each StreamRecord, and if it exceeds, put the new StateRequest in the
> blocking buffer waiting for execution (instead of invoking yield()).
> >
> > WDYT?
> >
> >
> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> >> epoch mechanism, would bring more complexity to the execution model
> >> than the performance improvement it promises. Could we add some
> >> benchmark results proving the benefit of this mode?
> >
> >
> > Agreed, will do.
> >
> >> 3.2 The FLIP might need to add a public API section describing how
> >> users or developers can switch between these two execution modes.
> >
> >
> > Good point. We will add a Public API section.
> >
> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> >> there are also more other events that might appear in the stream of
> >> data records. It might be better to generalize the execution mode
> >> mechanism to handle all possible events.
> >
> >
> > Yes, I missed this point. Thanks for the reminder.
> >
> >> 4. It might be better to treat callback-handling as a
> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> >> repeatedly creating Mail objects.
> >
> >
> >  I thought the intermediated wrapper for callback can not be omitted,
> since there will be some context switch before each execution. The
> MailboxDefaultAction in most cases is processInput right? While the
> callback should be executed with higher priority. I'd suggest not changing
> the basic logic of Mailbox and the default action since it is very critical
> for performance. But yes, we will try our best to avoid creating
> intermediated objects.
> >
> >> 5. Could this FLIP provide the current default values for things like
> >> active buffer size thresholds and timeouts? These could help with
> >> memory consumption and latency analysis.
> >
> >
> > Sure, we will introduce new configs as well as their default value.
> >
> >> 6. Why do we need to record the hashcode of a record in its
> >> RecordContext? It seems not used.
> >
> >
> > The context switch before each callback execution involves
> setCurrentKey, where the hashCode is re-calculated. We cache it for
> accelerating.
> >
> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> >> points to a document in flink-1.15. It might be better to verify the
> >> referenced content is still valid in the latest Flink and update the
> >> link accordingly. Same for other references if any.
> >
> >
> > Thanks for the reminder! Will check.
> >
> >
> > Thanks a lot & Best,
> > Zakelly
> >
> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >>
> >> Hi,
> >>
> >> Thanks for the great proposals. I have a few comments:
> >>
> >> - Backpressure Handling. Flink's original backpressure handling is quite
> >> robust and the semantics is quite "simple" (simple is beautiful).
> >> This mechanism has proven to perform better/robust than the other open
> >> source streaming systems, where they were relying on some loopback
> >> information.
> >> Now that the proposal also relies on loopback (yield in this case), it
> is
> >> not clear how well the new backpressure handling proposed in FLIP-425 is
> >> robust and handle fluctuating workloads.
> >>
> >> - Watermark/Timer Handling: Similar arguments apply for watermark and
> timer
> >> handling. IMHO, we need more benchmarks showing the overhead
> >> of epoch management with different parameters (e.g., window size,
> watermark
> >> strategy, etc)
> >>
> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> >> However, different cloud providers have different storage consistency
> >> models.
> >> How do we want to deal with them?
> >>
> >>  Regards,
> >> Jeyhun
> >>
> >>
> >>
> >>
> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> >>
> >> > Thanks Piotr for sharing your thoughts!
> >> >
> >> > I guess it depends how we would like to treat the local disks. I've
> always
> >> > > thought about them that almost always eventually all state from the
> DFS
> >> > > should end up cached in the local disks.
> >> >
> >> >
> >> > OK I got it. In our proposal we treat local disk as an optional
> cache, so
> >> > the basic design will handle the case with state residing in DFS
> only. It
> >> > is a more 'cloud-native' approach that does not rely on any local
> storage
> >> > assumptions, which allow users to dynamically adjust the capacity or
> I/O
> >> > bound of remote storage to gain performance or save the cost, even
> without
> >> > a job restart.
> >> >
> >> > In
> >> > > the currently proposed more fine grained solution, you make a single
> >> > > request to DFS per each state access.
> >> > >
> >> >
> >> > Ah that's not accurate. Actually we buffer the state requests and
> process
> >> > them in batch, multiple requests will correspond to one DFS access
> (One
> >> > block access for multiple keys performed by RocksDB).
> >> >
> >> > In that benchmark you mentioned, are you requesting the state
> >> > > asynchronously from local disks into memory? If the benefit comes
> from
> >> > > parallel I/O, then I would expect the benefit to disappear/shrink
> when
> >> > > running multiple subtasks on the same machine, as they would be
> making
> >> > > their own parallel requests, right? Also enabling checkpointing
> would
> >> > > further cut into the available I/O budget.
> >> >
> >> >
> >> > That's an interesting topic. Our proposal is specifically aimed at the
> >> > scenario where the machine I/O is not fully loaded but the I/O
> latency has
> >> > indeed become a bottleneck for each subtask. While the distributed
> file
> >> > system is a prime example of a scenario characterized by abundant and
> >> > easily scalable I/O bandwidth coupled with higher I/O latency. You may
> >> > expect to increase the parallelism of a job to enhance the
> performance as
> >> > well, but that also brings in more waste of CPU's and memory for
> building
> >> > up more subtasks. This is one drawback for the computation-storage
> tightly
> >> > coupled nodes. While in our proposal, the parallel I/O with all the
> >> > callbacks still running in one task, pre-allocated computational
> resources
> >> > are better utilized. It is a much more lightweight way to perform
> parallel
> >> > I/O.
> >> >
> >> > Just with what granularity those async requests should be made.
> >> > > Making state access asynchronous is definitely the right way to go!
> >> >
> >> >
> >> > I think the current proposal is based on such core ideas:
> >> >
> >> >    - A pure cloud-native disaggregated state.
> >> >    - Fully utilize the given resources and try not to waste them
> (including
> >> >    I/O).
> >> >    - The ability to scale isolated resources (I/O or CPU or memory)
> >> >    independently.
> >> >
> >> > We think a fine-grained granularity is more inline with those ideas,
> >> > especially without local disk assumptions and without any waste of
> I/O by
> >> > prefetching. Please note that it is not a replacement of the original
> local
> >> > state with synchronous execution. Instead this is a solution
> embracing the
> >> > cloud-native era, providing much more scalability and resource
> efficiency
> >> > when handling a *huge state*.
> >> >
> >> > What also worries me a lot in this fine grained model is the effect
> on the
> >> > > checkpointing times.
> >> >
> >> >
> >> > Your concerns are very reasonable. Faster checkpointing is always a
> core
> >> > advantage of disaggregated state, but only for the async phase. There
> will
> >> > be some complexity introduced by in-flight requests, but I'd suggest a
> >> > checkpoint containing those in-flight state requests as part of the
> state,
> >> > to accelerate the sync phase by skipping the buffer draining. This
> makes
> >> > the buffer size have little impact on checkpoint time. And all the
> changes
> >> > keep within the execution model we proposed while the checkpoint
> barrier
> >> > alignment or handling will not be touched in our proposal, so I guess
> >> > the complexity is relatively controllable. I have faith in that :)
> >> >
> >> > Also regarding the overheads, it would be great if you could provide
> >> > > profiling results of the benchmarks that you conducted to verify the
> >> > > results. Or maybe if you could describe the steps to reproduce the
> >> > results?
> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
> >> > >
> >> >
> >> > Yes we could profile the benchmarks. And for the comparison of
> "Hashmap
> >> > (sync)" vs "Hashmap with async API", we conduct a Wordcount job
> written
> >> > with async APIs but disabling the async execution by directly
> completing
> >> > the future using sync state access. This evaluates the overhead of
> newly
> >> > introduced modules like 'AEC' in sync execution (even though they are
> not
> >> > designed for it). The code will be provided later. For other results
> of our
> >> > PoC[1], you can follow the instructions here[2] to reproduce. Since
> the
> >> > compilation may take some effort, we will directly provide the jar for
> >> > testing next week.
> >> >
> >> >
> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
> >> > local time and the next few days are weekends. So I will reply to you
> >> > later. Thanks for your response!
> >> >
> >> >
> >> > [1]
> >> >
> >> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> >> > [2]
> >> >
> >> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> >> >
> >> >
> >> > Best,
> >> > Zakelly
> >> >
> >> >
> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi,
> >> > >
> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> >> > > and have some questions about the proposed changes.
> >> > >
> >> > > For Async API (FLIP-424)
> >> > >
> >> > > 1. Why do we need a close() method on StateIterator? This method
> seems
> >> > > unused in the usage example codes.
> >> > >
> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> >> > > entries are allowed". It might be better to further explain what
> will
> >> > > happen if a null value is passed, ignoring the value in the returned
> >> > > Collection or throwing exceptions. Given that
> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
> >> > > suppose the former one might be correct.
> >> > >
> >> > >
> >> > > For Async Execution (FLIP-425)
> >> > >
> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> >> > > with an ongoing recordA, its processElement() method can still be
> >> > > triggered immediately, and then it might be moved to the blocking
> >> > > buffer in AEC if it involves state operations. This means that
> >> > > recordB's output will precede recordA's output in downstream
> >> > > operators, if recordA involves state operations while recordB does
> >> > > not. This will harm the correctness of Flink jobs in some use cases.
> >> > > For example, in dim table join cases, recordA could be a delete
> >> > > operation that involves state access, while recordB could be an
> insert
> >> > > operation that needs to visit external storage without state access.
> >> > > If recordB's output precedes recordA's, then an entry that is
> supposed
> >> > > to finally exist with recordB's value in the sink table might
> actually
> >> > > be deleted according to recordA's command. This situation should be
> >> > > avoided and the order of same-key records should be strictly
> >> > > preserved.
> >> > >
> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
> >> > > invoke further yield() methods. Given that yield() is used when
> there
> >> > > is "too much" in-flight data, does it mean StateRequests submitted
> by
> >> > > Callbacks will never be "too much"? What if the total number of
> >> > > StateRequests exceed the capacity of Flink operator's memory space?
> >> > >
> >> > > 3. In the "Watermark" section, this FLIP provided an out-of-order
> >> > > execution mode apart from the default strictly-ordered mode, which
> can
> >> > > optimize performance by allowing more concurrent executions.
> >> > >
> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with
> the
> >> > > epoch mechanism, would bring more complexity to the execution model
> >> > > than the performance improvement it promises. Could we add some
> >> > > benchmark results proving the benefit of this mode?
> >> > >
> >> > > 3.2 The FLIP might need to add a public API section describing how
> >> > > users or developers can switch between these two execution modes.
> >> > >
> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> >> > > there are also more other events that might appear in the stream of
> >> > > data records. It might be better to generalize the execution mode
> >> > > mechanism to handle all possible events.
> >> > >
> >> > > 4. It might be better to treat callback-handling as a
> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> >> > > repeatedly creating Mail objects.
> >> > >
> >> > > 5. Could this FLIP provide the current default values for things
> like
> >> > > active buffer size thresholds and timeouts? These could help with
> >> > > memory consumption and latency analysis.
> >> > >
> >> > > 6. Why do we need to record the hashcode of a record in its
> >> > > RecordContext? It seems not used.
> >> > >
> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> >> > > points to a document in flink-1.15. It might be better to verify the
> >> > > referenced content is still valid in the latest Flink and update the
> >> > > link accordingly. Same for other references if any.
> >> > >
> >> > > Best,
> >> > > Yunfeng Zhou
> >> > >
> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com>
> wrote:
> >> > > >
> >> > > > Hi Devs,
> >> > > >
> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li,
> Hangxiang
> >> > Yu,
> >> > > > Yanfei Lei and Feng Wang. We'd like to start a discussion about
> >> > > introducing
> >> > > > Disaggregated State Storage and Management in Flink 2.0.
> >> > > >
> >> > > > The past decade has witnessed a dramatic shift in Flink's
> deployment
> >> > > mode,
> >> > > > workload patterns, and hardware improvements. We've moved from the
> >> > > > map-reduce era where workers are computation-storage tightly
> coupled
> >> > > nodes
> >> > > > to a cloud-native world where containerized deployments on
> Kubernetes
> >> > > > become standard. To enable Flink's Cloud-Native future, we
> introduce
> >> > > > Disaggregated State Storage and Management that uses DFS as
> primary
> >> > > storage
> >> > > > in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> >> > > >
> >> > > > Design Details can be found in FLIP-423[1].
> >> > > >
> >> > > > This new architecture is aimed to solve the following challenges
> >> > brought
> >> > > in
> >> > > > the cloud-native era for Flink.
> >> > > > 1. Local Disk Constraints in containerization
> >> > > > 2. Spiky Resource Usage caused by compaction in the current state
> model
> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of
> Terabytes)
> >> > > > 4. Light and Fast Checkpoint in a native way
> >> > > >
> >> > > > More specifically, we want to reach a consensus on the following
> issues
> >> > > in
> >> > > > this discussion:
> >> > > >
> >> > > > 1. Overall design
> >> > > > 2. Proposed Changes
> >> > > > 3. Design details to achieve Milestone1
> >> > > >
> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS
> as
> >> > > > primary storage and complete core functionalities, including:
> >> > > >
> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> >> > > > asynchronous state access.
> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
> non-blocking
> >> > > > execution model leveraging the asynchronous APIs introduced in
> >> > FLIP-424.
> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
> >> > remote
> >> > > > state data in batches to avoid unnecessary round-trip costs for
> remote
> >> > > > access
> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
> >> > version
> >> > > of
> >> > > > the ForSt disaggregated state store.
> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> >> > > > checkpointing mechanisms with the disaggregated state store for
> fault
> >> > > > tolerance and fast rescaling.
> >> > > >
> >> > > > We will vote on each FLIP in separate threads to make sure each
> FLIP
> >> > > > reaches a consensus. But we want to keep the discussion within a
> >> > focused
> >> > > > thread (this thread) for easier tracking of contexts to avoid
> >> > duplicated
> >> > > > questions/discussions and also to think of the problem/solution
> in a
> >> > full
> >> > > > picture.
> >> > > >
> >> > > > Looking forward to your feedback
> >> > > >
> >> > > > Best,
> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> >> > > >
> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> >> > >
> >> >
>
