Hi Lorenzo,

Thanks for your comments!

I think you got my question, and I did not realize that is not even allowed
> to modify some externally scoped variable in a lambda.
> I guess the point is that it is possible, but the user would really need
> to be willing to do it and "shoot him/herself in the foot".
>
Well, I think what Yanfei means is you cannot capture local variables which
are not final or effectively final. It is not allowed in Java and the
compiler as well as the IDE will report this error.
Users can use some final containers to avoid this, with the lambda
capturing the final container and modifying the internal value of it. e.g.

```
final AtomicInteger x = new AtomicInteger(0);

void processElement(...) {
   state.value().then( val -> { x.addAndGet(val); } );
   ...
   out.collect(x.get());
}
```

For example, If the user now would not be sure that elements end up being
> in correct windows, I am afraid this would somewhat simply hinder the
> watermark concept as a whole. What do you think?
>
Well, I'm afraid there is no problem like this. Typically, the element will
be assigned to a window based on its event time, and they will only
manipulate the state associated with the related window. So as long as we
ensure that the window fires after corresponding elements processing, the
behavior is correct. Whether the elements for the latter window are
processed before or after former window fires  does not matter. Well this
is true for most cases, except for the scenarios where the state is not
divided into slices for different windows, while we haven't seen a concrete
use case for it. That's where the discussion between Xintong and me comes
from[1].


Hope these answer your questions.

[1] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz

Best,
Zakelly


On Fri, Mar 22, 2024 at 11:11 PM <lorenzo.affe...@ververica.com.invalid>
wrote:

> Thank you Yanfei for addressing all the questions!
>
> > I'm not sure if I understand your question. In Java, this
> case(modifying the local local variable) is not allowed[1], but there
> are ways to get around the limitation of lambda.
> In this case, the modification to x may be concurrent, which needs to
> be handled carefully.
>
> I think you got my question, and I did not realize that is not even
> allowed to modify some externally scoped variable in a lambda.
> I guess the point is that it is possible, but the user would really need
> to be willing to do it and "shoot him/herself in the foot".
>
> > an implicit fact in sync
> API is that "event timer fire" would execute before "the subsequent
> records of watermark", but in out-of-order mode(async API), the
> execution order between them is not guaranteed
>
> Got it, what I don't get exactly is what type of inconsistency/issue the
> user could face.
> For example, If the user now would not be sure that elements end up being
> in correct windows, I am afraid this would somewhat simply hinder the
> watermark concept as a whole. What do you think?
>
> Thank you.
>
> On Mar 21, 2024 at 14:27 +0100, Yanfei Lei <fredia...@gmail.com>, wrote:
> > Thanks for your reading and valuable comments!
> >
> > > 1) About locking VS reference counting: I would like to clear out
> which mechanism prevents what:
> > The `KeyAccountingUnit` implements locking behavior on keys and
> > ensures 2 state requests on the same key happen in order.
> > Double-locking the same key does not result in deadlocks (thanks to
> > the `previous == record` condition in your pseudo-code), so, the same
> > callback chain can update/read multiple times the same piece of state.
> > On the other side we have the reference counting mechanism that is
> > used to understand whether a record has been fully processed, i.e.,
> > all state invocations have been carried out.
> > Here is the question: am I correct if we say that key accounting is
> > needed for out-of-order while reference counting is needed for
> > checkpointing and watermarking?
> >
> >
> > Regarding the "deadlock" of `KeyAccountingUnit`: good catch, we will
> > emphasize this in FLIP, the KeyAccountingUnitis reentrant, so the
> > state requests of the same record can update/read multiple times
> > without deadlock.
> >
> > Regarding the question: records, checkpoint barriers and watermarks
> > can be regarded as inputs, this FLIP discusses the *order* between all
> > inputs, in simple terms, the inputs of the same key that arrive first
> > need to be executed first.
> >
> > And the `KeyAccountingUnit` and reference counting work together to
> > preserve the order, when the reference counting mechanism recognizes a
> > record has been fully processed, the record will be removed from the
> > `KeyAccountingUnit`. The checkpoint or watermark would start util all
> > the reference counting of arrived inputs reach zero.
> >
> >
> > > 2) Number of mails:
> > Do you end up having two mails?
> >
> >
> > Yes, there are two mails in this case.
> >
> > > 3) Would this change something on the consistency guarantees provided?
> > I guess not, as, the lock is held in any case until the value on the
> > state hasn't been updated.
> > Could lead to any inconsistency (most probably the state would be
> updated to 0).
> >
> >
> > Yes, the results of the two cases you mentioned are as you analyzed.
> > The result of the first case is 1, and the result of the second case
> > is 0.
> > No matter which case it is, the next `processElement` with the same
> > key will be executed after the code in this `processElement` is
> > completely executed.
> >
> > Therefore it wouldn't lead to inconsistency.
> >
> > > 4) On the local variables/attributes:
> >
> > I'm not sure if I understand your question. In Java, this
> > case(modifying the local local variable) is not allowed[1], but there
> > are ways to get around the limitation of lambda.
> > In this case, the modification to x may be concurrent, which needs to
> > be handled carefully.
> >
> > 5) On watermarks:
> > It seems that, in order to achieve a good throughput, out-of-order
> > mode should be used.
> > In the FLIP I could not understand well how many things could go wrong
> > if that one is used.
> > Could you please clarify that?
> >
> > A typical example is the order between "event timer fire" and "the
> > subsequent records of watermark".
> > Although the semantics of watermarks do not define the sequence
> > between a watermark and subsequent records, an implicit fact in sync
> > API is that "event timer fire" would execute before "the subsequent
> > records of watermark", but in out-of-order mode(async API), the
> > execution order between them is not guaranteed.
> > There also are some related discussions in FLIP-423[2,3] proposed by
> > Yunfeng Zhou and Xintong Song.
> >
> > [1]
> https://stackoverflow.com/questions/30026824/modifying-local-variable-from-inside-lambda
> > [2] https://lists.apache.org/thread/djsnybs9whzrt137z3qmxdwn031o93gn
> > [3] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz
> >
> > <lorenzo.affe...@ververica.com> 于2024年3月21日周四 19:29写道:
> > >
> > > Thank you everybody for the questions and answers (especially Yanfei
> Lei), it was very instructive to go over the discussion.
> > > I am gonna add some questions on top of what happened and add some
> thoughts as well below.
> > >
> > > 1) About locking VS reference counting:
> > > I would like to clear out which mechanism prevents what:
> > > The `KeyAccountingUnit` implements locking behavior on keys and
> ensures 2 state requests on the same key happen in order. Double-locking
> the same key does not result in deadlocks (thanks to the `previous ==
> record` condition in your pseudo-code), so, the same callback chain can
> update/read multiple times the same piece of state.
> > > On the other side we have the reference counting mechanism that is
> used to understand whether a record has been fully processed, i.e., all
> state invocations have been carried out.
> > > Here is the question: am I correct if we say that key accounting is
> needed for out-of-order while reference counting is needed for
> checkpointing and watermarking?
> > >
> > > 2) Number of mails:
> > > To expand on what Jing Ge already asked, in the example code in the
> FLIP:
> > >
> > > ```
> > > state.value().then(
> > > val -> {
> > > return state.update(val + 1).then(
> > > empty -> {
> > > out.collect(val + 1);
> > > };
> > > }
> > > }
> > > ```
> > > Do you end up having two mails?:
> > >
> > > first wrapping `val -> {...}`
> > > second wrapping `empty -> {...}`
> > >
> > >
> > > Did I get it correctly?
> > >
> > > 3) On the guarantees provided by the async execution framework:
> > > Always referring to your example, say that when `val -> {...}` gets
> called the state value is 0.
> > > The callback will update to 1 and register another callback to collect
> while still holding the lock on that piece of state, and preventing
> somebody else to read the value during the entire process (implementing
> atomicity for a transaction).
> > > Now, say the code changes to:
> > >
> > > ```
> > > state.value().then(
> > > val -> {
> > > state.update(val + 1);
> > > out.collect(val + 1);
> > > }
> > > }
> > > ```
> > >
> > > Would this change something on the consistency guarantees provided?
> > > I guess not, as, the lock is held in any case until the value on the
> state hasn't been updated.
> > >
> > > This, instead (similar to your example):
> > >
> > > ```
> > > int x = 0;
> > >
> > > state.value().then( val -> { x = val + 1; } );
> > > state.update(x);
> > > out.collect(x);
> > > ```
> > >
> > > Could lead to any inconsistency (most probably the state would be
> updated to 0).
> > >
> > > 4) On the local variables/attributes:
> > > The last example above exemplifies one of my concerns: what about the
> values enclosed in the callbacks?
> > > That seems a bit counter-intuitive and brittle from a user perspective.
> > > In the example we have a function-level variable, but what about
> fields:
> > >
> > > ```
> > > int x = 0;
> > >
> > > void processElement(...) {
> > > state.value().then( val -> { x += val; } );
> > > ...
> > > out.collect(x);
> > > }
> > > ```
> > >
> > > What could happen here?
> > > Every callback would enclose the current value of the field?
> > > Don't know exactly where I am heading, but it seems quite
> complex/convoluted :)
> > >
> > > 5) On watermarks:
> > > It seems that, in order to achieve a good throughput, out-of-order
> mode should be used.
> > > In the FLIP I could not understand well how many things could go wrong
> if that one is used.
> > > Could you please clarify that?
> > >
> > > Thank you for your availability and your great work!
> > >
> > > On Mar 19, 2024 at 10:51 +0100, Yanfei Lei <fredia...@gmail.com>,
> wrote:
> > >
> > > Hi everyone,
> > >
> > > Thanks for your valuable discussion and feedback!
> > >
> > > Our discussions have been going on for a while and there have been no
> > > new comments for several days. So I would like to start a vote after
> > > 72 hours.
> > >
> > > Please let me know if you have any concerns, thanks!
> > >
> > > Yanfei Lei <fredia...@gmail.com> 于2024年3月13日周三 12:54写道:
> > >
> > >
> > > Hi Jing,
> > > Thanks for the reply and follow up.
> > >
> > > > > What is the benefit for users to build a chain of mails instead of
> just one mail(it is still async)?
> > >
> > >
> > > Just to make sure we're on the same page, I try to paraphrase your
> question:
> > > A `then()` call will be encapsulated as a callback mail. Your question
> > > is whether we can call then() as little as possible to reduce the
> > > overhead of encapsulating it into a mail.
> > >
> > > In general, whether to call `then()` depends on the user's data
> > > dependencies. The operations in a chain of `then()` are strictly
> > > ordered.
> > >
> > >
> > >
> > > The following is an example without data dependencies, if written in
> > > the form of a `then` chain:
> > > stateA.update(1).then(stateB.update(2).then(stateC.update(3)));
> > >
> > > The execution order is:
> > > ```
> > > stateA update 1 -> stateB update 2-> stateC update 3
> > > ```
> > >
> > > If written in the form without `then()` call, they will be placed in a
> > > "mail/mailboxDefaultAction", and each state request will still be
> > > executed asynchronously:
> > > ```
> > > stateA.update(1);
> > > stateB.update(2);
> > > stateC.update(3);
> > > ```
> > >
> > > The order in which they are executed is undefined and may be:
> > > ```
> > > - stateA update 1 -> stateB update 2-> stateC update 3
> > > - stateB update 2 -> stateC update 3-> stateA update 1
> > > - stateC update 3 -> stateA update 1-> stateB update 2
> > > ...
> > > ```
> > > And the final results are "stateA = 1, stateB = 2, stateC = 3". In
> > > this case, the two ways of writing are equivalent.
> > >
> > >
> > >
> > > If there are data dependencies, for example:
> > > ```
> > > stateA.update(1).then(stateA.update(2))
> > > ```
> > >
> > > Then the execution order is:
> > > ```
> > > stateA update 1 -> stateA update 2
> > > ```
> > >
> > > If written in the form without `then()` call:
> > > ```
> > > stateA.update(1);
> > > stateA.update(2);
> > > ```
> > >
> > > The order in which they are executed is undefined and may be:
> > > ```
> > > - stateA update 1 -> stateA update 2
> > > - stateA update 2-> stateA update 1
> > > ```
> > > The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
> > > the way without `then()` chain to limit the execution order, and the
> > > results may be wrong.
> > >
> > > In summary, how many mails are encapsulated depends on how the user
> > > writes the code, and how the user writes the code depends on their
> > > data dependencies. [1][2] may be helpful for asynchronous programming
> > > practice.
> > >
> > >
> > > > > I was wondering if exceptions in the mail chain would have an
> impact on the reference counting?
> > >
> > >
> > > We will catch exceptions that can be handled, they don't have impacts
> > > on the reference counting.
> > > For exceptions that cannot be handled, we will directly fail the job.
> > >
> > > > > Maybe a UT to cover all kinds of cases, i.e. happy paths and
> unhappy paths, would make sense.
> > >
> > >
> > > Nice suggestions, we will add a UT to cover those cases.
> > >
> > >
> > > [1]
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
> > > [2] https://www.codingjunkie.net/completable-futures-part1/
> > >
> > > Jing Ge <j...@ververica.com.invalid> 于2024年3月13日周三 07:05写道:
> > >
> > > > >
> > > > > Hi Yanfei,
> > > > >
> > > > > Thanks for your clarification! Now I got a much clear picture and
> I am
> > > > > still trying to understand your thoughts for some of those
> questions:
> > > > >
> > > > >
> > >
> > > > > > > How many mails are encapsulated depends on how the user writes
> the
> > > > > > > code. The statements in a `then()` will be wrapped into a mail.
> > > > > > > StateFuture is a restricted version of CompletableFuture,
> their basic
> > > > > > > semantics are consistent.
> > > > > > >
> > >
> > > > >
> > > > > Conceptually, users can write a chain of many async calls, i.e.
> many then()
> > > > > calls. And all these calls for Record A must be executed in order,
> while
> > > > > Record B should stay at the Blocking buffer. What is the benefit
> for users
> > > > > to build a chain of mails instead of just one mail(it is still
> async)? Is
> > > > > there any best practices or guidelines to teach/tell users when
> and how
> > > > > many async calls in a chain could/should be built?
> > > > >
> > >
> > > > > > > The challenge arises in determining when all the processing
> logic
> > >
> > > > > associated with Record A is fully executed. To address this, we
> have
> > > > > adopted a reference counting mechanism that tracks ongoing
> operations
> > > > > (either processing input or executing a callback) related to a
> single
> > > > > record.
> > > > >
> > >
> > > > > > > We describe this in the "Error handling"[2] section. This FLIP
> also
> > > > > > > adopts the design from FLIP-368, ensuring that all state
> interfaces
> > > > > > > throw unchecked exceptions and, consequently, do not declare
> any
> > > > > > > exceptions in their signatures. In cases where an exception
> occurs
> > > > > > > while accessing the state, the job should fail.
> > > > > > >
> > >
> > > > >
> > > > > My question was not about how exceptions will be defined. I am not
> sure how
> > > > > unchecked exceptions handling will be implemented. I was wondering
> if
> > > > > exceptions in the mail chain would have an impact on the reference
> > > > > counting? E.g. in Fig 5, if an exception happened in the value(),
> > > > > update(int), or function within then(), any -1 counting might be
> missed?
> > > > > Maybe a UT to cover all kinds of cases, i.e. happy paths and
> unhappy paths,
> > > > > would make sense.
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com>
> wrote:
> > > > >
> > >
> > > > > > > Hi Jing,
> > > > > > >
> > > > > > > Thanks for your thoughtful feedback!
> > > > > > >
> > >
> > > > > > > > > does it mean that there will be three mails for Read,
> Update, and Output
> > >
> > > > > > > ?
> > > > > > >
> > > > > > > With this example, there are two mails. The Read is processed
> by
> > > > > > > `mailboxDefaultAction`[1], and the Update and Output are
> encapsulated
> > > > > > > as mail.
> > > > > > >
> > >
> > > > > > > > > does it make sense to encapsulate one mail instead of 3
> mails with more
> > >
> > > > > > > overhead?
> > > > > > >
> > > > > > >
> > >
> > > > >
> > >
> > > > > > > How many mails are encapsulated depends on how the user writes
> the
> > > > > > > code. The statements in a `then()` will be wrapped into a mail.
> > > > > > > StateFuture is a restricted version of CompletableFuture,
> their basic
> > > > > > > semantics are consistent.
> > > > > > >
> > > > > > >
> > >
> > > > >
> > >
> > > > > > > > > Would you like to add more description for cases when
> exceptions
> > >
> > > > > > > happened? E.g. when reading or/and updating State throws
> IOExceptions.
> > > > > > >
> > > > > > >
> > >
> > > > >
> > >
> > > > > > > We describe this in the "Error handling"[2] section. This FLIP
> also
> > > > > > > adopts the design from FLIP-368, ensuring that all state
> interfaces
> > > > > > > throw unchecked exceptions and, consequently, do not declare
> any
> > > > > > > exceptions in their signatures. In cases where an exception
> occurs
> > > > > > > while accessing the state, the job should fail.
> > > > > > >
> > >
> > > > >
> > > > >
> > > > >
> > >
> > > > > > > > > Is it correct to understand that AEC is stateless?
> > >
> > > > > > >
> > > > > > > Great perspective, yes, it can be understood that way.
> > > > > > > AEC is a task-level component. When the job fails or is
> restarted, the
> > > > > > > runtime status in AEC will be reset.
> > > > > > > In fact, we have considered taking a snapshot of the status in
> AEC and
> > > > > > > storing it in a checkpoint like "unaligned checkpoint", but
> since
> > > > > > > callback cannot be serialized, this idea is not feasible for
> the time
> > > > > > > being.
> > > > > > >
> > >
> > > > > > > > > would you like to add Pseudo-code for the
> inFilghtReocordNum decrement
> > >
> > > > > > > to help us understand the logic better?
> > > > > > >
> > > > > > > This part of the code is a bit scattered, we will try to
> abstract a
> > > > > > > pseudo-code. You can first refer to the RecordContext-related
> code [3]
> > > > > > > in the PoC to understand it.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > > > > > [2]
> > > > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > > > > > [3]
> > > > > > >
> https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
> > > > > > >
> > > > > > > Jing Ge <j...@ververica.com.invalid> 于2024年3月10日周日 23:47写道:
> > >
> > > > > > > > >
> > > > > > > > > Hi Yanfei,
> > > > > > > > >
> > > > > > > > > Thanks for your proposal! The FLIP contains a lot of great
> new ideas. I'd
> > > > > > > > > like to ask some questions to make sure we are on the same
> page.
> > > > > > > > >
> > >
> > > > > > > > > > > For the asynchronous interface, Record A should run
> with Read, Update
> > >
> > > > > > > and
> > >
> > > > > > > > > Output, while Record B should stay at the Blocking buffer.
> > > > > > > > >
> > > > > > > > > 1. With this example, does it mean that there will be
> three mails for
> > >
> > > > > > > Read,
> > >
> > > > > > > > > Update, and Output ?
> > > > > > > > > 2. If yes, since the Read, Update, and Output have to be
> executed before
> > > > > > > > > Record B, does it make sense to encapsulate one mail
> instead of 3 mails
> > > > > > > > > with more overhead? There must be some thoughts behind the
> design. Look
> > > > > > > > > forward to it.
> > > > > > > > >
> > >
> > > > > > > > > > > The challenge arises in determining when all the
> processing logic
> > >
> > > > > > > > > associated with Record A is fully executed. To address
> this, we have
> > > > > > > > > adopted a reference counting mechanism that tracks ongoing
> operations
> > > > > > > > > (either processing input or executing a callback) related
> to a single
> > > > > > > > > record.
> > > > > > > > >
> > > > > > > > > The design reminds me of the JVM reference counting for
> GC. Would you
> > >
> > > > > > > like
> > >
> > > > > > > > > to add more description for cases when exceptions
> happened? E.g. when
> > > > > > > > > reading or/and updating State throws IOExceptions.
> > > > > > > > >
> > >
> > > > > > > > > > > In more detail, AEC uses a inFilghtReocordNum variable
> to trace the
> > >
> > > > > > > > > current number of records in progress. Every time the AEC
> receives a new
> > > > > > > > > record, the inFilghtReocordNum increases by 1; when all
> processing and
> > > > > > > > > callback for this record have completed, the
> inFilghtReocordNum
> > >
> > > > > > > decreases
> > >
> > > > > > > > > by 1. When processing one checkpoint mail, the current
> task thread will
> > > > > > > > > give up the time slice through the yield() method of the
> mailbox
> > >
> > > > > > > executor,
> > >
> > > > > > > > > so that the ongoing state request’s callback and the
> blocking state
> > > > > > > > > requests will be drained first until inFlightRecordNum
> reduces to 0.
> > > > > > > > >
> > > > > > > > > 1. Speaking of draining, is it correct to understand that
> AEC is
> > >
> > > > > > > stateless?
> > >
> > > > > > > > > E.g. AEC could be easily scaled out if it became a
> bottleneck.
> > > > > > > > > 2. There are Pseudo-code for the inFilghtReocordNum
> increment, would you
> > > > > > > > > like to add Pseudo-code for the inFilghtReocordNum
> decrement to help us
> > > > > > > > > understand the logic better?
> > > > > > > > >
> > > > > > > > > The FLIP shows overall a great design! +1 for it! Looking
> forward to your
> > > > > > > > > thoughts, thanks!
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Jing
> > > > > > > > >
> > > > > > > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <
> fredia...@gmail.com> wrote:
> > > > > > > > >
> > >
> > > > > > > > > > > Hi devs,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start a discussion on FLIP-425:
> Asynchronous Execution
> > > > > > > > > > > Model[1], which is a sub-FLIP of FLIP-423:
> Disaggregated State Storage
> > > > > > > > > > > and Management[2].
> > > > > > > > > > >
> > > > > > > > > > > FLIP-425 introduces a non-blocking execution model
> leveraging the
> > > > > > > > > > > asynchronous APIs introduced in FLIP-424[3].
> > > > > > > > > > > For the whole story please read the FLIP-423[2], and
> this thread is
> > > > > > > > > > > aimed to discuss the details of "FLIP-425:
> Asynchronous Execution
> > > > > > > > > > > Model".
> > > > > > > > > > >
> > > > > > > > > > > Regarding the details of this FLIP, there have been
> some discussions
> > > > > > > > > > > here[4], mainly focusing on framework overhead
> profiling, watermark
> > > > > > > > > > > processing, etc. Please see link[4] for the context.
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to hearing from you!
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > > > [4]
> https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Yanfei
> > > > > > > > > > >
> > >
> > > > > > >
> > >
> > >
> > >
> > >
> > > --
> > > Best,
> > > Yanfei
> > >
> > >
> > >
> > >
> > > --
> > > Best,
> > > Yanfei
> >
> >
> >
> > --
> > Best,
> > Yanfei
>

Reply via email to