Thanks for the quick response. Sounds good to me.

Best,

Xintong



On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Xintong,
>
> Thanks for sharing your thoughts!
>
> > 1. Regarding Record-ordered and State-ordered of processElement.
> >
> > I understand that while State-ordered likely provides better performance,
> > Record-ordered is sometimes required for correctness. The question is how
> > should a user choose between these two modes? My concern is that such a
> > decision may require users to have in-depth knowledge about the Flink
> > internals, and may lead to correctness issues if State-ordered is chosen
> > improperly.
> >
> > I'd suggest not exposing such a knob, at least in the first version. That
> > means always using Record-ordered for custom operators / UDFs, and keeping
> > State-ordered for internal usage (built-in operators) only.
> >
>
> Indeed, users may not be able to choose the mode properly. I agree to keep
> such options for internal use.
>
>
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> >
> > I'm not entirely sure about Strictly-ordered being the default, or even
> > being supported. From my understanding, a Watermark(T) only suggests that
> > all records with event time before T have arrived, and it has nothing to do
> > with whether records with event time after T have arrived or not. From that
> > perspective, preventing certain records from arriving before a Watermark is
> > never supported. I also cannot come up with any use case where
> > Strictly-ordered is necessary. This implies the same issue as 1): how does
> > the user choose between the two modes?
> >
> > I'd suggest not exposing the knob to users and only supporting Out-of-order,
> > until we see a concrete use case where Strictly-ordered is needed.
> >
>
> The semantics of watermarks do not define the sequence between a watermark
> and subsequent records. For the most part, this is inconsequential, except
> it may affect some current users who have previously relied on the implicit
> assumption of an ordered execution. I'd be fine with initially supporting
> only out-of-order processing. We may consider exposing the
> 'Strictly-ordered' mode once we encounter a concrete use case that
> necessitates it.
>
>
> > My philosophies behind not exposing the two config options are:
> > - There are already too many options in Flink that users barely know how to
> > use. I think Flink should try as much as possible to decide its own
> > behavior, rather than throwing all the decisions to the users.
> > - It's much harder to take back knobs than to introduce them. Therefore,
> > options should be introduced only if concrete use cases are identified.
> >
>
> I agree to keep minimal configurable items especially for the MVP. Given
> that we have the opportunity to refine the functionality before the
> framework transitions from @Experimental to @PublicEvolving, it makes sense
> to refrain from presenting user-facing options until we have ensured
> their necessity.
>
>
> Best,
> Zakelly
>
> On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
> > Sorry for joining the discussion late.
> >
> > I have two questions about FLIP-425.
> >
> > 1. Regarding Record-ordered and State-ordered of processElement.
> >
> > I understand that while State-ordered likely provides better performance,
> > Record-ordered is sometimes required for correctness. The question is how
> > should a user choose between these two modes? My concern is that such a
> > decision may require users to have in-depth knowledge about the Flink
> > internals, and may lead to correctness issues if State-ordered is chosen
> > improperly.
> >
> > I'd suggest not exposing such a knob, at least in the first version. That
> > means always using Record-ordered for custom operators / UDFs, and keeping
> > State-ordered for internal usage (built-in operators) only.
> >
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> >
> > I'm not entirely sure about Strictly-ordered being the default, or even
> > being supported. From my understanding, a Watermark(T) only suggests that
> > all records with event time before T have arrived, and it has nothing to do
> > with whether records with event time after T have arrived or not. From that
> > perspective, preventing certain records from arriving before a Watermark is
> > never supported. I also cannot come up with any use case where
> > Strictly-ordered is necessary. This implies the same issue as 1): how does
> > the user choose between the two modes?
> >
> > I'd suggest not exposing the knob to users and only supporting Out-of-order,
> > until we see a concrete use case where Strictly-ordered is needed.
> >
> >
> > My philosophies behind not exposing the two config options are:
> > - There are already too many options in Flink that users barely know how to
> > use. I think Flink should try as much as possible to decide its own
> > behavior, rather than throwing all the decisions to the users.
> > - It's much harder to take back knobs than to introduce them. Therefore,
> > options should be introduced only if concrete use cases are identified.
> >
> > WDYT?
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid> wrote:
> >
> > > +1 for Gyula's suggestion. I just finished reading FLIP-423, which introduces
> > > the intention of the big change and the high-level architecture. Great
> > > content, btw! The only public interface change for this FLIP is one new
> > > config to use ForSt. It makes sense to have one dedicated discussion thread
> > > for each concrete system design.
> > >
> > > @Zakelly The links in your mail do not work except the last one, because
> > > the FLIP-xxx has been included in the URL, like
> > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425.
> > >
> > > NIT fix:
> > >
> > > FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > >
> > > FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > >
> > > FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > >
> > > FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > >
> > > FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > >
> > > Best regards,
> > > Jing
> > >
> > >
> > >
> > >
> > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Thank you all for a lively discussion here. It is a good time to move
> > > > forward to more detailed discussions, so we have opened several threads
> > > > for the sub-FLIPs:
> > > >
> > > > FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > FLIP-425 <https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425>: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > FLIP-426 <https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426>: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > FLIP-427 <https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427>: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > FLIP-428 <https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428>: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > >
> > > > If you want to talk about the overall architecture, roadmap, milestones, or
> > > > something related to multiple FLIPs, please post it here. Otherwise, you
> > > > can discuss the details in the separate threads. Let's try to avoid
> > > > repeated discussion in different threads. I will sync important messages
> > > > here if there are any in the above threads.
> > > >
> > > > And in reply to @Jeyhun: we will ensure the content of those FLIPs is
> > > > consistent.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > >
> > > > > I have been a bit busy these few weeks; sorry for responding late.
> > > > >
> > > > > The original thinking behind keeping the discussion within one thread was
> > > > > easier tracking and avoiding repeated discussion in different threads.
> > > > >
> > > > > For details, it might be good to start separate threads if needed.
> > > > >
> > > > > We will think of a way to better organize the discussion.
> > > > >
> > > > > Best
> > > > > Yuan
> > > > >
> > > > >
> > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > +1 for the suggestion.
> > > > > > Maybe we can start the discussion with the FLIPs that have the fewest
> > > > > > dependencies (on the other new/proposed FLIPs).
> > > > > > Based on our discussion of a particular FLIP, the subsequent (or
> > > > > > dependent) FLIP(s) can then be updated accordingly?
> > > > > >
> > > > > > Regards,
> > > > > > Jeyhun
> > > > > >
> > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey all!
> > > > > > >
> > > > > > > This is a massive improvement / piece of work. I just started going
> > > > > > > through the FLIPs and have a more or less meta comment.
> > > > > > >
> > > > > > > While it's good to keep the overall architecture discussion here, I think
> > > > > > > we should still have separate discussions for each FLIP where we can
> > > > > > > discuss interface details etc. With so much content, adding minor
> > > > > > > comments here will lead nowhere, but those discussions are
> > > > > > > still important and we should have them in separate threads (one for
> > > > > > > each FLIP).
> > > > > > >
> > > > > > > What do you think?
> > > > > > > Gyula
> > > > > > >
> > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi team,
> > > > > > > >
> > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have added
> > > > > > > > several updates to answer frequently asked questions:
> > > > > > > >
> > > > > > > > 1. We captured a flame graph of the HashMap state backend in
> > > > > > > > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > > > > > > > the framework overhead (including reference counting, future-related
> > > > > > > > code and so on) consumes about 9% of the keyed operator CPU time.
> > > > > > > > 2. We added a set of comparative experiments for watermark processing:
> > > > > > > > the performance of Out-of-order mode is 70% better than
> > > > > > > > Strictly-ordered mode under ~140MB state size. Instructions on how to
> > > > > > > > run this test have also been added[2].
> > > > > > > > 3. Regarding the order of StreamRecords, whether they have state access
> > > > > > > > or not, we added a new section, *Strict order of 'processElement'*[3].
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yanfei
> > > > > > > >
> > > > > > > > On Tue, Mar 5, 2024 at 9:25 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Zakelly,
> > > > > > > > >
> > > > > > > > > > 5. I'm not very sure ... revisiting this later since it is not important.
> > > > > > > > >
> > > > > > > > > It seems that we still have some details to confirm about this
> > > > > > > > > question. Let's postpone this until after the critical parts of the
> > > > > > > > > design are settled.
> > > > > > > > >
> > > > > > > > > > 8. Yes, we had considered ... metrics should be like afterwards.
> > > > > > > > >
> > > > > > > > > Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in
> > > > > > > > > milestone 2.
> > > > > > > > >
> > > > > > > > > Looking forward to the detailed design of the strict mode between
> > > > > > > > > same-key records and the benchmark results for the epoch mechanism.
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Yunfeng
> > > > > > > > >
> > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Yunfeng,
> > > > > > > > > >
> > > > > > > > > > For 1:
> > > > > > > > > > I had a discussion with Lincoln Lee, and I realized it is a common case
> > > > > > > > > > that the same-key record should be blocked before `processElement`. It
> > > > > > > > > > is easier for users to understand. Thus I will introduce a strict mode
> > > > > > > > > > for this and make it the default. My rough idea is just like yours:
> > > > > > > > > > invoking some method of the AEC instance before `processElement`. The
> > > > > > > > > > detailed design will be described in the FLIP later.
> > > > > > > > > >
> > > > > > > > > > For 2:
> > > > > > > > > > I agree with you. We could throw exceptions for now and optimize this
> > > > > > > > > > later.
> > > > > > > > > >
> > > > > > > > > > For 5:
> > > > > > > > > >>
> > > > > > > > > >> It might be better to move the default values to the Proposed Changes
> > > > > > > > > >> section instead of making them public for now, as there will be
> > > > > > > > > >> compatibility issues once we want to dynamically adjust the thresholds
> > > > > > > > > >> and timeouts in the future.
> > > > > > > > > >
> > > > > > > > > > Agreed. The whole framework is experimental until we think it is
> > > > > > > > > > complete in 2.0 or later. The default values would be better determined
> > > > > > > > > > with more testing results and production experience.
> > > > > > > > > >
> > > > > > > > > >> The configuration execution.async-state.enabled seems unnecessary, as
> > > > > > > > > >> the infrastructure may automatically get this information from the
> > > > > > > > > >> detailed state backend configurations. We may revisit this part after
> > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I'm not very sure if there is any use case where users write their
> > > > > > > > > > code using async APIs but run their job in a synchronous way. The first
> > > > > > > > > > two scenarios that come to mind are benchmarking and jobs with a small
> > > > > > > > > > state, where users don't want to rewrite their code. Actually it is easy
> > > > > > > > > > to support, so I'd suggest providing it. But I'm fine with revisiting
> > > > > > > > > > this later since it is not important. WDYT?
> > > > > > > > > >
> > > > > > > > > > For 8:
> > > > > > > > > > Yes, we had considered the I/O metrics group, especially the
> > > > > > > > > > back-pressure, idle and task busy per second. In the current plan we can
> > > > > > > > > > do state access during back-pressure, meaning that those metrics for
> > > > > > > > > > input would better be redefined. I suggest we discuss these existing
> > > > > > > > > > metrics as well as some new metrics that should be introduced in
> > > > > > > > > > FLIP-431 later in milestone 2, since by then we will have basically
> > > > > > > > > > finished the framework and will have a better view of what metrics
> > > > > > > > > > should be like afterwards. WDYT?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Zakelly
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > >>
> > > > > > > > > >> Hi Zakelly,
> > > > > > > > > >>
> > > > > > > > > >> Thanks for the responses!
> > > > > > > > > >>
> > > > > > > > > >> > 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.
> > > > > > > > > >>
> > > > > > > > > >> In the DataStream API there should also be use cases where the order of
> > > > > > > > > >> output is strictly required. I agree that SQL experts may help
> > > > > > > > > >> provide more concrete use cases that can accelerate our discussion,
> > > > > > > > > >> but please allow me to search for DataStream use cases that can prove
> > > > > > > > > >> the necessity of this strict order preservation mode, if answers from
> > > > > > > > > >> SQL experts turn out to be negative.
> > > > > > > > > >>
> > > > > > > > > >> For your convenience, my current rough idea is that we can add a
> > > > > > > > > >> module between the Input(s) and processElement() module in Fig 2 of
> > > > > > > > > >> FLIP-425. The module will be responsible for caching records whose
> > > > > > > > > >> keys collide with in-flight records, and AEC will only be responsible
> > > > > > > > > >> for handling async state calls, without knowing the record each call
> > > > > > > > > >> belongs to. We may revisit this topic once the necessity of the strict
> > > > > > > > > >> order mode is clarified.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> > > > > > > > > >>
> > > > > > > > > >> Your suggestions generally appeal to me. I think we may let
> > > > > > > > > >> corresponding Flink jobs fail with OOM for now, since the majority of
> > > > > > > > > >> a StateRequest should just be references to existing Java objects,
> > > > > > > > > >> which only occupy a very small memory space and can hardly cause OOM
> > > > > > > > > >> in common cases. We can monitor the pending StateRequests, and if there
> > > > > > > > > >> is really a risk of OOM in extreme cases, we can throw Exceptions with
> > > > > > > > > >> proper messages notifying users what to do, like increasing memory
> > > > > > > > > >> through configurations.
> > > > > > > > > >>
> > > > > > > > > >> Your suggestions to adjust the threshold adaptively or to use the
> > > > > > > > > >> blocking buffer sound good, and in my opinion we can postpone them to
> > > > > > > > > >> future FLIPs since they seem to only benefit users in rare cases. Given
> > > > > > > > > >> that FLIP-423~428 is already a big enough design, it might be better
> > > > > > > > > >> to focus on the most critical design for now and postpone
> > > > > > > > > >> optimizations like this. WDYT?
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> > 5. Sure, we will introduce new configs as well as their default value.
> > > > > > > > > >>
> > > > > > > > > >> Thanks for adding the default values; the values themselves LGTM.
> > > > > > > > > >> It might be better to move the default values to the Proposed Changes
> > > > > > > > > >> section instead of making them public for now, as there will be
> > > > > > > > > >> compatibility issues once we want to dynamically adjust the thresholds
> > > > > > > > > >> and timeouts in the future.
> > > > > > > > > >>
> > > > > > > > > >> The configuration execution.async-state.enabled seems unnecessary, as
> > > > > > > > > >> the infrastructure may automatically get this information from the
> > > > > > > > > >> detailed state backend configurations. We may revisit this part after
> > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> Besides, inspired by Jeyhun's comments, it occurs to me that
> > > > > > > > > >>
> > > > > > > > > >> 8. Should this FLIP introduce metrics that measure the time a Flink
> > > > > > > > > >> job is back-pressured by State IOs? Under the current design, this
> > > > > > > > > >> metric could measure the time when the blocking buffer is full and
> > > > > > > > > >> yield() cannot get callbacks to process, which means the operator is
> > > > > > > > > >> fully waiting for state responses.
> > > > > > > > > >>
> > > > > > > > > >> Best regards,
> > > > > > > > > >> Yunfeng
> > > > > > > > > >>
> > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Hi Yunfeng,
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks for your detailed comments!
> > > > > > > > > >> >
> > > > > > > > > >> >> 1. Why do we need a close() method on StateIterator? This method seems
> > > > > > > > > >> >> unused in the usage example code.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > The `close()` method was introduced to release internal resources, but
> > > > > > > > > >> > it does not seem to require the user to call it. I removed it.
> > > > > > > > > >> >
> > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > > > > > > > > >> >> entries are allowed". It might be better to further explain what will
> > > > > > > > > >> >> happen if a null value is passed: ignoring the value in the returned
> > > > > > > > > >> >> Collection or throwing exceptions. Given that
> > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned in the example code, I
> > > > > > > > > >> >> suppose the former one might be correct.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > The statement "No null entries are allowed" refers to the parameters. It
> > > > > > > > > >> > means that a list like [null, StateFuture1, StateFuture2] is not allowed
> > > > > > > > > >> > as input, and an Exception will be thrown.
> > > > > > > > > >> >
> > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> > > > > > > > > >> >> avoided and the order of same-key records should be strictly
> > > > > > > > > >> >> preserved.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > I will discuss this with some expert SQL developers. And if it is
> > > > > > > > > >> > valid and common, I suggest a strict order preservation mode for
> > > > > > > > > >> > StreamRecord processing. WDYT?
> > > > > > > > > >> >
> > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> > > > > > > > > >> >> invoke further yield() methods. Given that yield() is used when there
> > > > > > > > > >> >> is "too much" in-flight data, does it mean StateRequests submitted by
> > > > > > > > > >> >> Callbacks will never be "too much"? What if the total number of
> > > > > > > > > >> >> StateRequests exceeds the capacity of the Flink operator's memory space?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > The amount of parallel StateRequests for one StreamRecord cannot be
> > > > > > > > > >> > determined since the code is written by users. So the in-flight requests
> > > > > > > > > >> > may be "too much", and may cause OOM. Users should re-configure their
> > > > > > > > > >> > job, controlling the amount of on-going StreamRecords. I suggest two ways
> > > > > > > > > >> > to avoid this:
> > > > > > > > > >> >
> > > > > > > > > >> > - Adaptively adjust the count of on-going StreamRecords according to the
> > > > > > > > > >> > historical amount of StateRequests.
> > > > > > > > > >> > - Also control the max number of StateRequests that can be executed in
> > > > > > > > > >> > parallel for each StreamRecord, and if it is exceeded, put the new
> > > > > > > > > >> > StateRequest in the blocking buffer to wait for execution (instead of
> > > > > > > > > >> > invoking yield()).
> > > > > > > > > >> >
> > > > > > > > > >> > WDYT?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> > > > > > > > > >> >> epoch mechanism, would bring more complexity to the execution model
> > > > > > > > > >> >> than the performance improvement it promises. Could we add some
> > > > > > > > > >> >> benchmark results proving the benefit of this mode?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Agreed, will do.
> > > > > > > > > >> >
> > > > > > > > > >> >> 3.2 The FLIP might need to add a public API section describing how
> > > > > > > > > >> >> users or developers can switch between these two execution modes.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Good point. We will add a Public API section.
> > > > > > > > > >> >
> > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > > > > > > > > >> >> there are also other events that might appear in the stream of
> > > > > > > > > >> >> data records. It might be better to generalize the execution mode
> > > > > > > > > >> >> mechanism to handle all possible events.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Yes, I missed this point. Thanks for the reminder.
> > > > > > > > > >> >
> > > > > > > > > >> >> 4. It might be better to treat callback-handling as a
> > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > > > > > > > > >> >> repeatedly creating Mail objects.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > I think the intermediate wrapper for the callback cannot be
> > > > > > > > > >> > omitted, since there will be some context switch before each execution.
> > > > > > > > > >> > The MailboxDefaultAction in most cases is processInput, right? Meanwhile,
> > > > > > > > > >> > the callback should be executed with higher priority. I'd suggest not
> > > > > > > > > >> > changing the basic logic of Mailbox and the default action since it is
> > > > > > > > > >> > very critical for performance. But yes, we will try our best to avoid
> > > > > > > > > >> > creating intermediate objects.
> > > > > > > > > >> >
> > > > > > > > > >> >> 5. Could this FLIP provide the current default values for things like
> > > > > > > > > >> >> active buffer size thresholds and timeouts? These could help with
> > > > > > > > > >> >> memory consumption and latency analysis.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Sure, we will introduce new configs as well as their default value.
> > > > > > > > > >> >
> > > > > > > > > >> >> 6. Why do we need to record the hashcode of a record in its
> > > > > > > > > >> >> RecordContext? It seems not used.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > The context switch before each callback execution involves
> > > > > > > > > >> > setCurrentKey, where the hashCode is re-calculated. We cache it to
> > > > > > > > > >> > speed this up.
> > > > > > > > > >> >
> > > > > > > > > >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > > > > > > > > >> >> points to a document in flink-1.15. It might be better to verify that
> > > > > > > > > >> >> the referenced content is still valid in the latest Flink and update
> > > > > > > > > >> >> the link accordingly. Same for other references, if any.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks for the reminder! Will check.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks a lot & Best,
> > > > > > > > > >> > Zakelly
> > > > > > > > > >> >
> > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > > > > > > > >> >>
> > > > > > > > > >> >> Hi,
> > > > > > > > > >> >>
> > > > > > > > > >> >> Thanks for the great proposals. I have a few comments:
> > > > > > > > > >> >>
> > > > > > > > > >> >> - Backpressure Handling. Flink's original backpressure handling is quite
> > > > > > > > > >> >> robust and the semantics is quite "simple" (simple is beautiful).
> > > > > > > > > >> >> This mechanism has proven to be more performant and robust than the
> > > > > > > > > >> >> other open source streaming systems, which relied on some loopback
> > > > > > > > > >> >> information.
> > > > > > > > > >> >> Now that the proposal also relies on loopback (yield in this case), it is
> > > > > > > > > >> >> not clear how robust the new backpressure handling proposed in FLIP-425
> > > > > > > > > >> >> is and how well it handles fluctuating workloads.
> > > > > > > > > >> >>
> > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments apply to watermark and
> > > > > > > > > >> >> timer handling. IMHO, we need more benchmarks showing the overhead
> > > > > > > > > >> >> of epoch management with different parameters (e.g., window size,
> > > > > > > > > >> >> watermark strategy, etc.)
> > > > > > > > > >> >>
> > > > > > > > > >> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> > > > > > > > > >> >> However, different cloud providers have different storage consistency
> > > > > > > > > >> >> models.
> > > > > > > > > >> >> How do we want to deal with them?
> > > > > > > > > >> >>
> > > > > > > > > >> >>  Regards,
> > > > > > > > > >> >> Jeyhun
> > > > > > > > > >> >>
> > > > > > > > > >> >>
> > > > > > > > > >> >>
> > > > > > > > > >> >>
> > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > >> >>
> > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > I guess it depends on how we would like to treat the local disks.
> > > > > > > > > > >> >> > > I've always assumed that, almost always, all state from the DFS
> > > > > > > > > > >> >> > > should eventually end up cached on the local disks.
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > OK I got it. In our proposal we treat local disk as an optional
> > > > > > > > > > >> >> > cache, so the basic design will handle the case with state residing
> > > > > > > > > > >> >> > in DFS only. It is a more 'cloud-native' approach that does not rely
> > > > > > > > > > >> >> > on any local storage assumptions, which allows users to dynamically
> > > > > > > > > > >> >> > adjust the capacity or I/O bound of remote storage to gain
> > > > > > > > > > >> >> > performance or save cost, even without a job restart.
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > In the currently proposed more fine-grained solution, you make a
> > > > > > > > > > >> >> > > single request to DFS per state access.
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Ah, that's not accurate. Actually we buffer the state requests and
> > > > > > > > > > >> >> > process them in batches, so multiple requests will correspond to one
> > > > > > > > > > >> >> > DFS access (one block access for multiple keys performed by RocksDB).
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > In that benchmark you mentioned, are you requesting the state
> > > > > > > > > > >> >> > > asynchronously from local disks into memory? If the benefit comes
> > > > > > > > > > >> >> > > from parallel I/O, then I would expect the benefit to
> > > > > > > > > > >> >> > > disappear/shrink when running multiple subtasks on the same machine,
> > > > > > > > > > >> >> > > as they would be making their own parallel requests, right? Also
> > > > > > > > > > >> >> > > enabling checkpointing would further cut into the available I/O
> > > > > > > > > > >> >> > > budget.
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > That's an interesting topic. Our proposal is specifically aimed at
> > > > > > > > > > >> >> > the scenario where the machine I/O is not fully loaded but the I/O
> > > > > > > > > > >> >> > latency has indeed become a bottleneck for each subtask. The
> > > > > > > > > > >> >> > distributed file system is a prime example of such a scenario,
> > > > > > > > > > >> >> > characterized by abundant and easily scalable I/O bandwidth coupled
> > > > > > > > > > >> >> > with higher I/O latency. You may expect to increase the parallelism
> > > > > > > > > > >> >> > of a job to enhance the performance as well, but that also wastes
> > > > > > > > > > >> >> > more CPU and memory on building up more subtasks. This is one
> > > > > > > > > > >> >> > drawback of computation-storage tightly coupled nodes. In our
> > > > > > > > > > >> >> > proposal, by contrast, parallel I/O is performed with all the
> > > > > > > > > > >> >> > callbacks still running in one task, so pre-allocated computational
> > > > > > > > > > >> >> > resources are better utilized. It is a much more lightweight way to
> > > > > > > > > > >> >> > perform parallel I/O.
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > Just with what granularity those async requests should be made.
> > > > > > > > > > >> >> > > Making state access asynchronous is definitely the right way to go!
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > I think the current proposal is based on the following core ideas:
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >    - A pure cloud-native disaggregated state.
> > > > > > > > > > >> >> >    - Fully utilize the given resources and try not to waste them
> > > > > > > > > > >> >> >    (including I/O).
> > > > > > > > > > >> >> >    - The ability to scale isolated resources (I/O or CPU or memory)
> > > > > > > > > > >> >> >    independently.
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > We think a fine-grained granularity is more in line with those
> > > > > > > > > > >> >> > ideas, especially without local disk assumptions and without any
> > > > > > > > > > >> >> > waste of I/O by prefetching. Please note that it is not a
> > > > > > > > > > >> >> > replacement for the original local state with synchronous execution.
> > > > > > > > > > >> >> > Instead this is a solution embracing the cloud-native era, providing
> > > > > > > > > > >> >> > much more scalability and resource efficiency when handling a
> > > > > > > > > > >> >> > *huge state*.
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > What also worries me a lot in this fine-grained model is the effect
> > > > > > > > > > >> >> > > on the checkpointing times.
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Your concerns are very reasonable. Faster checkpointing is always a
> > > > > > > > > > >> >> > core advantage of disaggregated state, but only for the async phase.
> > > > > > > > > > >> >> > There will be some complexity introduced by in-flight requests, but
> > > > > > > > > > >> >> > I'd suggest a checkpoint containing those in-flight state requests
> > > > > > > > > > >> >> > as part of the state, to accelerate the sync phase by skipping the
> > > > > > > > > > >> >> > buffer draining. This makes the buffer size have little impact on
> > > > > > > > > > >> >> > checkpoint time. All the changes stay within the execution model we
> > > > > > > > > > >> >> > proposed, while checkpoint barrier alignment and handling will not
> > > > > > > > > > >> >> > be touched in our proposal, so I guess the complexity is relatively
> > > > > > > > > > >> >> > controllable. I have faith in that :)
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > Also regarding the overheads, it would be great if you could provide
> > > > > > > > > > >> >> > > profiling results of the benchmarks that you conducted to verify the
> > > > > > > > > > >> >> > > results. Or maybe if you could describe the steps to reproduce the
> > > > > > > > > > >> >> > > results? Especially "Hashmap (sync)" vs "Hashmap with async API".
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Yes we could profile the benchmarks. And for the comparison of
> > > > > > > > > > >> >> > "Hashmap (sync)" vs "Hashmap with async API", we run a Wordcount job
> > > > > > > > > > >> >> > written with async APIs but with async execution disabled, by
> > > > > > > > > > >> >> > directly completing the future using sync state access. This
> > > > > > > > > > >> >> > evaluates the overhead of newly introduced modules like 'AEC' in
> > > > > > > > > > >> >> > sync execution (even though they are not designed for it). The code
> > > > > > > > > > >> >> > will be provided later. For other results of our PoC[1], you can
> > > > > > > > > > >> >> > follow the instructions here[2] to reproduce. Since the compilation
> > > > > > > > > > >> >> > may take some effort, we will directly provide the jar for testing
> > > > > > > > > > >> >> > next week.
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail, but it is a bit late in
> > > > > > > > > > >> >> > my local time and the next few days are the weekend, so I will reply
> > > > > > > > > > >> >> > to you later. Thanks for your response!
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > >> >> > [1]
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > > > > > > > > >> >> > [2]
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > >> >> > Best,
> > > > > > > > > >> >> > Zakelly
> > > > > > > > > >> >> >
> > > > > > > > > >> >> >
> > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > >> >> >
> > > > > > > > > >> >> > > Hi,
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> > > > > > > > > > >> >> > > and have some questions about the proposed changes.
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> > > For Async API (FLIP-424)
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 1. Why do we need a close() method on StateIterator? This method
> > > > > > > > > > >> >> > > seems unused in the usage example code.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > > > > > > > > > >> >> > > entries are allowed". It might be better to further explain what
> > > > > > > > > > >> >> > > will happen if a null value is passed: whether the value is ignored
> > > > > > > > > > >> >> > > in the returned Collection or an exception is thrown. Given that
> > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
> > > > > > > > > > >> >> > > suppose the former one might be correct.
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> > > For Async Execution (FLIP-425)
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if the key of a recordB collides
> > > > > > > > > > >> >> > > with that of an ongoing recordA, its processElement() method can
> > > > > > > > > > >> >> > > still be triggered immediately, and then it might be moved to the
> > > > > > > > > > >> >> > > blocking buffer in AEC if it involves state operations. This means
> > > > > > > > > > >> >> > > that recordB's output will precede recordA's output in downstream
> > > > > > > > > > >> >> > > operators, if recordA involves state operations while recordB does
> > > > > > > > > > >> >> > > not. This will harm the correctness of Flink jobs in some use cases.
> > > > > > > > > > >> >> > > For example, in dim table join cases, recordA could be a delete
> > > > > > > > > > >> >> > > operation that involves state access, while recordB could be an
> > > > > > > > > > >> >> > > insert operation that needs to visit external storage without state
> > > > > > > > > > >> >> > > access. If recordB's output precedes recordA's, then an entry that
> > > > > > > > > > >> >> > > is supposed to finally exist with recordB's value in the sink table
> > > > > > > > > > >> >> > > might actually be deleted according to recordA's command. This
> > > > > > > > > > >> >> > > situation should be avoided and the order of same-key records should
> > > > > > > > > > >> >> > > be strictly preserved.
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
> > > > > > > > > > >> >> > > invoke further yield() methods. Given that yield() is used when
> > > > > > > > > > >> >> > > there is "too much" in-flight data, does it mean StateRequests
> > > > > > > > > > >> >> > > submitted by Callbacks will never be "too much"? What if the total
> > > > > > > > > > >> >> > > number of StateRequests exceeds the capacity of the Flink operator's
> > > > > > > > > > >> >> > > memory space?
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP provides an out-of-order
> > > > > > > > > > >> >> > > execution mode apart from the default strictly-ordered mode, which
> > > > > > > > > > >> >> > > can optimize performance by allowing more concurrent executions.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with
> > > > > > > > > > >> >> > > the epoch mechanism, would bring more complexity to the execution
> > > > > > > > > > >> >> > > model than the performance improvement it promises. Could we add
> > > > > > > > > > >> >> > > some benchmark results proving the benefit of this mode?
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public API section describing how
> > > > > > > > > > >> >> > > users or developers can switch between these two execution modes.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > > > > > > > > > >> >> > > there are also other events that might appear in the stream of data
> > > > > > > > > > >> >> > > records. It might be better to generalize the execution mode
> > > > > > > > > > >> >> > > mechanism to handle all possible events.
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 4. It might be better to treat callback-handling as a
> > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > > > > > > > > > >> >> > > repeatedly creating Mail objects.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 5. Could this FLIP provide the current default values for things
> > > > > > > > > > >> >> > > like active buffer size thresholds and timeouts? These could help
> > > > > > > > > > >> >> > > with memory consumption and latency analysis.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode of a record in its
> > > > > > > > > > >> >> > > RecordContext? It does not seem to be used.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > > > > > > > > > >> >> > > points to a document in flink-1.15. It might be better to verify
> > > > > > > > > > >> >> > > that the referenced content is still valid in the latest Flink and
> > > > > > > > > > >> >> > > update the link accordingly. Same for other references if any.
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> > > Best,
> > > > > > > > > >> >> > > Yunfeng Zhou
> > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > > > > > > > >> >> > > >
> > > > > > > > > >> >> > > > Hi Devs,
> > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li,
> > > > > > > > > > >> >> > > > Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start a
> > > > > > > > > > >> >> > > > discussion about introducing Disaggregated State Storage and
> > > > > > > > > > >> >> > > > Management in Flink 2.0.
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > The past decade has witnessed a dramatic shift in Flink's
> > > > > > > > > > >> >> > > > deployment mode, workload patterns, and hardware improvements.
> > > > > > > > > > >> >> > > > We've moved from the map-reduce era where workers are
> > > > > > > > > > >> >> > > > computation-storage tightly coupled nodes to a cloud-native world
> > > > > > > > > > >> >> > > > where containerized deployments on Kubernetes become standard. To
> > > > > > > > > > >> >> > > > enable Flink's Cloud-Native future, we introduce Disaggregated
> > > > > > > > > > >> >> > > > State Storage and Management that uses DFS as primary storage in
> > > > > > > > > > >> >> > > > Flink 2.0, as promised in the Flink 2.0 Roadmap.
> > > > > > > > > >> >> > > >
> > > > > > > > > >> >> > > > Design Details can be found in FLIP-423[1].
> > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > This new architecture aims to solve the following challenges that
> > > > > > > > > > >> >> > > > the cloud-native era brings for Flink.
> > > > > > > > > > >> >> > > > 1. Local Disk Constraints in containerization
> > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state
> > > > > > > > > > >> >> > > > model
> > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of
> > > > > > > > > > >> >> > > > Terabytes)
> > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a native way
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > More specifically, we want to reach a consensus on the following
> > > > > > > > > > >> >> > > > issues in this discussion:
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > 1. Overall design
> > > > > > > > > > >> >> > > > 2. Proposed Changes
> > > > > > > > > > >> >> > > > 3. Design details to achieve Milestone1
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS
> > > > > > > > > > >> >> > > > as primary storage and complete core functionalities, including:
> > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> > > > > > > > > > >> >> > > > asynchronous state access.
> > > > > > > > > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
> > > > > > > > > > >> >> > > > non-blocking execution model leveraging the asynchronous APIs
> > > > > > > > > > >> >> > > > introduced in FLIP-424.
> > > > > > > > > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
> > > > > > > > > > >> >> > > > remote state data in batches to avoid unnecessary round-trip costs
> > > > > > > > > > >> >> > > > for remote access.
> > > > > > > > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
> > > > > > > > > > >> >> > > > version of the ForSt disaggregated state store.
> > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> > > > > > > > > > >> >> > > > checkpointing mechanisms with the disaggregated state store for
> > > > > > > > > > >> >> > > > fault tolerance and fast rescaling.
> > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > We will vote on each FLIP in separate threads to make sure each
> > > > > > > > > > >> >> > > > FLIP reaches a consensus. But we want to keep the discussion
> > > > > > > > > > >> >> > > > within a focused thread (this thread) to make context easier to
> > > > > > > > > > >> >> > > > track, to avoid duplicated questions/discussions, and to consider
> > > > > > > > > > >> >> > > > the problem/solution as a full picture.
> > > > > > > > > >> >> > > >
> > > > > > > > > >> >> > > > Looking forward to your feedback
> > > > > > > > > >> >> > > >
> > > > > > > > > >> >> > > > Best,
> > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> > > > > > > > > >> >> > > >
> > > > > > > > > >> >> > > > [1]
> https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > >> >> > > > [2]
> https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > >> >> > > > [3]
> https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > >> >> > > > [4]
> https://cwiki.apache.org/confluence/x/TYp3EQ
> > > > > > > > > >> >> > > > [5]
> https://cwiki.apache.org/confluence/x/T4p3EQ
> > > > > > > > > >> >> > > > [6]
> https://cwiki.apache.org/confluence/x/UYp3EQ
> > > > > > > > > >> >> > >
> > > > > > > > > >> >> >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
