Thanks for addressing my comments, Dong.

> The expected behavior of checkpointing and failover depends on whether
> there is any operator currently running in the job with all its inputs'
> isBacklog=true. If there exists such an operator and
> interval-during-backlog = 0, then checkpoint will be disabled and the
> operator will have to failover in a way similar to batch mode.


This makes sense to me. Shall we also put this into the FLIP? Or maybe you
already did that and I overlooked it? The current description in "4)
Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me.
It says "At the point when isBacklog switches to false, source operator
...", which reads as if the behavior applies as soon as any single source
operator switches to isBacklog = false.
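
To double-check my reading of the rule quoted above, here is a minimal
sketch in plain Java. All names in it (BacklogRuleSketch, runsInBacklogMode,
checkpointDisabled) are hypothetical and are not part of the FLIP or of
Flink. It only encodes the two quoted statements, plus one assumption of
mine: an operator without inputs is treated as not being in backlog.

```java
import java.util.List;

/** Hypothetical sketch of the rule discussed above; this is not Flink API. */
public class BacklogRuleSketch {

    /**
     * An operator is handled as "in backlog" only if ALL of its inputs
     * currently report isBacklog = true. An operator with no inputs is
     * treated as not in backlog (an assumption made for this sketch).
     */
    public static boolean runsInBacklogMode(List<Boolean> inputIsBacklog) {
        return !inputIsBacklog.isEmpty()
                && inputIsBacklog.stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Checkpointing is disabled when interval-during-backlog = 0 and at
     * least one running operator has all of its inputs in backlog.
     * Each inner list holds the isBacklog flags of one operator's inputs.
     */
    public static boolean checkpointDisabled(
            List<List<Boolean>> operators, long intervalDuringBacklogMs) {
        return intervalDuringBacklogMs == 0
                && operators.stream().anyMatch(BacklogRuleSketch::runsInBacklogMode);
    }

    public static void main(String[] args) {
        List<Boolean> allBacklog = List.of(true, true);
        List<Boolean> mixed = List.of(true, false);

        // Only a mixed-input operator is running: handled as isBacklog=false.
        System.out.println(checkpointDisabled(List.of(mixed), 0));
        // One operator has all inputs in backlog and the interval is 0.
        System.out.println(checkpointDisabled(List.of(mixed, allBacklog), 0));
        // Interval > 0: checkpointing stays enabled during backlog.
        System.out.println(checkpointDisabled(List.of(allBacklog), 30_000));
    }
}
```

If this matches the intended semantics, the FLIP text could state explicitly
that the condition is evaluated per operator over all of its inputs, rather
than per source.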

> I am not sure what is the concern with having `flink-streaming-java` depend
> on `flink-runtime`. Can you clarify the exact concern?
>

The concern here is that an API module should not depend on a runtime
module. Currently, we have the "user code -> flink-streaming-java ->
flink-runtime" dependency chain, which makes binary compatibility
impossible because any runtime changes can break the compatibility with a
user jar (which bundles flink-streaming-java) compiled for an older
version. Ideally, we want the runtime module to depend on the API module,
rather than the other way around. This is one of the issues we are trying
to resolve with the programmatic API refactor. However, the way we are
trying to resolve it is to introduce another API module and gradually
replace the current DataStream API / flink-streaming-java, which means
flink-streaming-java will stay depending on flink-runtime for a while
anyway. So the concern here is minor: we might just need more effort
when reworking this with the new API.

The rest of your replies make sense to me.

Best,

Xintong



On Fri, Sep 15, 2023 at 10:05 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Xintong,
>
> Thanks for your comments! Please see my reply inline.
>
> On Thu, Sep 14, 2023 at 4:58 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
> > Sorry to join the discussion late.
> >
> > Overall, I think it's a good idea to support dynamically switching the
> > operator algorithms between Streaming (optimized towards low latency +
> > checkpointing support) and Batch (optimized towards throughput). This is
> > indeed a big and complex topic, and I really appreciate the previous
> > discussions that narrow the scope of this FLIP down to only considering
> > switching from Batch to Streaming as a first step.
> >
> > I have several questions.
> >
> > 1. The FLIP discusses various behaviors under 4 scenarios: streaming mode,
> > batch mode, mixed mode with checkpoint interval > 0, mixed mode with
> > checkpoint interval = 0. IIUC, this is because many batch optimizations
> > cannot be supported together with checkpointing. This justifies that in
> > mixed mode with interval > 0, most behaviors are the same as in streaming
> > mode. However, mixed mode with checkpoint interval = 0 does not
> > necessarily mean we should apply such optimizations. It is possible that in
> > some cases (likely with small data amounts) the cost of such optimizations
> > is higher than the benefit. Therefore, I'd suggest decoupling the concept
> > of applying these optimizations (i.e., the batch execution phase in the
> > mixed mode) from whether checkpointing is enabled or not. In particular,
> > I'd suggest removing the scenario "mixed mode with
> > e.c.interval-during-backlog > 0", changing the scenario "mixed mode with
> > e.c.interval-during-backlog = 0" to simply "mixed mode", and saying that we
> > can have different strategies for deciding whether to enable the mixed
> > mode, and that as the first step the strategy is to enable it when
> > e.c.interval-during-backlog = 0.
> >
>
> Thanks for the detailed explanation!
>
> I have updated the "Behavior changes when switching from batch mode to
> stream mode" section with the following changes.
>
> 1) Remove the description of "mixed mode with interval-during-backlog > 0"
> and add the statement saying that "after this FLIP, the behavior of Flink
> runtime with execution.runtime-mode = streaming AND
> execution.checkpointing.interval-during-backlog > 0, will be same as the
> stream mode prior to this FLIP"
>
> 2) Add the statement saying that "Mixed mode refers to the behavior of
> Flink runtime after this FLIP with execution.runtime-mode = streaming AND
> execution.checkpointing.interval-during-backlog = 0".
>
> 3) Add the statement saying that "It is possible for mixed mode to be
> slower than stream mode, particularly when there is only a small amount of
> input records and the overhead of buffering/sorting inputs outweighs its
> benefit. This is similar to how the merge join might be slower than hash
> join. This FLIP focuses on optimizing the Flink throughput when there is a
> high number of input records. In the future, we might introduce more
> strategies to turn on mixed mode in a smart way to avoid performance
> regression".
>
> Would this address your concern?
>
>
> >
> > 2. According to the FLIP, before isBacklog = false, the timer service only
> > keeps timers for the current key. It also says upon the end of each key, it
> > fires timers of the key up to the last watermark. IIUC, that means not all
> > timers are guaranteed to be fired. It is possible that some timers are left
> > to be triggered after isBacklog switches to false. If the timer service
> > only keeps timers for the current key, those not-fired timers may get lost
> > when switching to a new key.
> >
>
> Thanks for catching this. You are right that all timers should be fired as
> long as the corresponding firing condition (either processing-time or
> event-time) is satisfied.
>
> I have updated the "Timer Service" part of the "Behavior changes when
> switching from batch mode to stream mode" section accordingly. Can you see
> if it addresses your concern?
>
>
> >
> > 3. Is it possible that some sources / operators in the job switch to
> > isBacklog = false, while others are still isBacklog = true? In that case,
> > what is the expected behavior for checkpointing and failover?
> >
>
> Yes, it is possible. And in this case, Flink runtime will handle this
> operator as if all the operator's inputs have isBacklog=false. In
> particular, Flink runtime will not automatically sort inputs of this
> operator.
>
> I added the following statement in the FLIP to clarify the behavior: "For
> an operator with 2+ inputs, where some inputs have isBacklog=true and some
> other inputs have isBacklog=false, Flink runtime will handle this operator
> as if all its inputs have isBacklog=false".
>
> The expected behavior of checkpointing and failover depends on whether
> there is any operator currently running in the job with all its inputs'
> isBacklog=true. If there exists such an operator
> and interval-during-backlog = 0, then checkpoint will be disabled and the
> operator will have to failover in a way similar to batch mode.
>
>
>
> >
> > 4. Do we require RecordAttributes to be properly handled by all operators?
> > Or do we consider it as hints that operators may benefit from looking into
> > it but should not run into any problems ignoring it? I'm asking because, if
> > they are required to be properly handled, we probably need a way to force
> > operators to deal with it. `processRecordAttributes(RecordAttributes)`
> > might not be a good fit because we don't know whether the operator has
> > looked into all necessary fields of `RecordAttributes`.
> >
>
> As of this FLIP, we would not require RecordAttributes to be handled by any
> operator in order to achieve correctness. So it is more like a hint. More
> specifically, the isBacklog attribute provides a hint for an operator to
> optionally delay the processing of its inputs if doing so can improve its
> throughput.
>
> I think it would be useful to avoid requiring operators to explicitly
> handle attributes contained in RecordAttributes. This is because we want
> the features added in this FLIP (and future FLIPs) to be backward
> compatible without breaking the correctness of existing jobs.
>
> If we ever really need to add a record attribute that must be explicitly
> handled by every operator, I believe we can always find a way to enforce
> this requirement (e.g. fail job compilation with a proper error) in the
> future. For example, we can add a method such as handleXXXRecordAttribute()
> in the operator interface without a default implementation.
>
>
> >
> > 5. I wonder if there are any strong reasons behind choosing `RuntimeEvent`
> > over `StreamElement` for `RecordAttributes` to extend? My concern is that
> > the current approach introduces one more dependency from
> > `flink-streaming-java` (operators that use `RecordAttributes`) to
> > `flink-runtime` (where `RuntimeEvent` comes from), which seems to be
> > unnecessary.
> >
>
> There is no strong reason to choose `RuntimeEvent` over `StreamElement`. I
> think the main (and minor) reason for doing so is the simplicity of
> implementation. For example, we don't need to add methods such
> as StreamElement#isRecordAttributes and StreamElement#asRecordAttributes.
>
> I am not sure what is the concern with having `flink-streaming-java` depend
> on `flink-runtime`. Can you clarify the exact concern?
>
> In any case, I don't have a strong preference between `RuntimeEvent` and
> `StreamElement`. We can update the FLIP to use `StreamElement` as long as
> there is a well-defined non-trivial reason for making this choice.
>
>
> > 6. The FLIP says it leverages state backend optimizations introduced in
> > FLIP-325. Just for clarification, does this mean this FLIP depends on
> > FLIP-325, and probably should not be voted on / accepted until FLIP-325 is
> > accepted?
> >
>
> Yes, the proposed implementation of this FLIP depends on FLIP-325. We can
> start the voting thread for FLIP-327 after FLIP-325 is accepted. Maybe we can
> continue to discuss this FLIP in the meantime (before FLIP-325 is
> accepted).
>
> Thanks again for the very detailed and helpful review! Looking forward to
> your follow-up comments.
>
> Best,
> Dong
>
>
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, Sep 1, 2023 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > > Hi,
> > >
> > > some keywords in this triggered my attention, so sorry for late jumping
> > > in, but I'd like to comprehend the nature of the proposal.
> > >
> > > I'll try to summarize my understanding:
> > >
> > > The goal of the FLIP is to support automatic switching between streaming
> > > and batch processing, leveraging the fact that batch processing is more
> > > computationally efficient. This makes perfect sense.
> > >
> > > Looking at the streaming vs. batch semantics, switching from streaming
> > > to batch means the following:
> > >
> > >   a) generally, watermarks are not propagated in batch, watermark moves
> > > from -inf to +inf in one step, at the end of batch input, this might
> > > (and probably will) skip many invocations of timers
> > >
> > >   b) grouping by key (and window) can be done efficiently, because it
> > > can be done by sort-group and ideally parallelized by window (with some
> > > caveats)
> > >
> > > The switch also has some conditions, namely:
> > >
> > >   i) batch mode does not do checkpoints, inputs must be accessible
> > > repeatedly (forever)
> > >
> > >   ii) due to failures in batch mode, inputs might be reprocessed and
> > > thus must be immutable or all sub-results computed in all branches of
> > > the computation (even possibly unaffected by the failure) have to be
> > > discarded and recomputed from scratch
> > >
> > > Obviously, in case of the switch from batch to streaming, the property
> > > a) has to be modified so the watermark does not move to +inf, but to
> > > min(streaming watermark). Given these properties, it should be possible
> > > to exchange batch and streaming processing without any cooperation with
> > > the application logic itself. Is my understanding correct?
> > >
> > > If so, there is still one open question regarding efficiency, though. The
> > > streaming operator _might_ need sorting by timestamp (e.g. processing
> > > time-series data, or even sequential data). In that case simply
> > > switching streaming semantics to batch processing does not yield
> > > efficient processing, because the operator still needs to buffer and
> > > manually sort all the input data (batch data is always unordered). On
> > > the other hand, the batch runner already does sorting (for grouping by
> > > key), so adding additional sorting criterion is very cheap. In Apache
> > > Beam, we introduced a property of a stateful PTransform (DoFn) called
> > > @RequiresTimeSortedInput [1], which can then be implemented efficiently
> > > by batch engines.
> > >
> > > Does the FLIP somehow work with conditions i) and ii)? I can imagine, for
> > > instance, that if data is read from, say, Kafka and the backlog gets
> > > sufficiently large, then even the batch processing can take substantial
> > > time, and if it fails after long processing, some of the original data
> > > might have already been rolled out of the Kafka topic.
> > >
> > > In the FLIP there are some proposed changes to sources to emit metadata
> > > about whether the records come from backlog. What is the driving line of
> > > thought for why this is needed? In my point of view, streaming engines are
> > > _always_ processing backlog, the only question is "how delayed are the
> > > currently processed events after HEAD", or more specifically in this
> > > case "how many elements can we expect to process if the source would
> > > immediately stop receiving more data?". This should be configurable
> > > using simple option defining the difference between current
> > > processing-time (JM) and watermark of the source, or am I missing
> > > something?
> > >
> > > Thanks for clarification and all the best,
> > >
> > >   Jan
> > >
> > > [1]
> > > https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> > >
> > > On 8/31/23 13:17, Xuannan Su wrote:
> > > > Hi all,
> > > >
> > > > I would like to share some updates on FLIP-327. Dong and I have had a
> > > > series of discussions and have made several refinements to the FLIP.
> > > >
> > > > The major change to the FLIP is to allow the input of the one-input
> > > > operator to be automatically sorted during backlog processing. When
> > > > combined with the state backend optimization introduced in FLIP-325 [1],
> > > > all the keyed single-input operators can achieve similar performance as
> > > > in batch mode during backlog processing without any code change to the
> > > > operator. We also implemented a POC [2] and conducted a benchmark [3]
> > > > using the KeyedStream#reduce operation. The benchmark results
> > > > demonstrate the performance gains that this FLIP can offer.
> > > >
> > > > I am looking forward to any comments or feedback you may have on this
> > > > FLIP.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
> > > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
> > > > [3]
> > > > https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java
> > > >
> > > >
> > > >
> > > >> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote:
> > > >>
> > > >> Hi Piotr,
> > > >>
> > > >> Thanks for the explanation.
> > > >>
> > > >> To recap our offline discussion, there is a concern regarding the
> > > >> capability to dynamically switch between stream and batch modes. This
> > > >> concern is around unforeseen behaviors such as bugs or performance
> > > >> regressions, which we might not be aware of yet. The reason for this
> > > >> concern is that this feature involves a fundamental impact on the Flink
> > > >> runtime's behavior.
> > > >>
> > > >> Due to the above concern, I agree it is reasonable to annotate related
> > > >> APIs as experimental. This step would provide us with the flexibility to
> > > >> modify these APIs if issues arise in the future. This annotation also
> > > >> serves as a note to users that this functionality might not perform as
> > > >> well as expected.
> > > >>
> > > >> Though I believe that we can ensure the reliability of this feature
> > > >> through good design and code reviews, comprehensive unit tests, and
> > > >> thorough integration testing, I agree that it is reasonable to be extra
> > > >> cautious in this case. Also, it should be OK to delay marking these APIs
> > > >> as non-experimental by 1-2 releases.
> > > >>
> > > >> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark APIs in these
> > > >> docs as experimental. Please let me know if you think any other API
> > > >> should also be marked as experimental.
> > > >>
> > > >> Thanks!
> > > >> Dong
> > > >>
> > > >> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Dong,
> > > >>>
> > > >>> The Operators API is unfortunately also our public-facing API, and I
> > > >>> mean that the APIs we add there should also be marked `@Experimental`,
> > > >>> IMO.
> > > >>>
> > > >>> The config options should also be marked as experimental (both
> > > >>> annotated @Experimental and noted as such in the docs, if the
> > > >>> @Experimental annotation is not automatically mentioned in the docs).
> > > >>>
> > > >>>> Alternatively, how about we add a doc for
> > > >>>> checkpointing.interval-during-backlog explaining its impact/concern as
> > > >>>> discussed above?
> > > >>>
> > > >>> We should do this independently from marking the APIs/config options as
> > > >>> `@Experimental`.
> > > >>>
> > > >>> Best,
> > > >>> Piotrek
> > > >>>
> > > >>> On Fri, Aug 11, 2023 at 14:55 Dong Lin <lindon...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Piotr,
> > > >>>>
> > > >>>> Thanks for the reply!
> > > >>>>
> > > >>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <
> > > piotr.nowoj...@gmail.com
> > > >>>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> Sorry for the long delay in responding!
> > > >>>>>
> > > >>>>>> Given that it is an optional feature that can be
> > > >>>>>> turned off by users, it might be OK to just let users try it out and
> > > >>>>>> we can fix performance issues once we detect any of them. What do you
> > > >>>>>> think?
> > > >>>>> I think it's fine. It would be best to mark this feature as
> > > >>>>> experimental, and say that the config keys or the default values might
> > > >>>>> change in the future.
> > > >>>>>
> > > >>>> In general I agree we can mark APIs that determine "whether to enable
> > > >>>> dynamic switching between stream/batch mode" as experimental.
> > > >>>>
> > > >>>> However, I am not sure we have such an API yet. The APIs added in this
> > > >>>> FLIP are intended to be used by operator developers rather than end
> > > >>>> users. End users can enable this capability by setting
> > > >>>> execution.checkpointing.interval-during-backlog = Long.MAX and using a
> > > >>>> source which might implicitly set backlog status (e.g. HybridSource). So
> > > >>>> execution.checkpointing.interval-during-backlog is the only user-facing
> > > >>>> API that can always control whether this feature can be used.
> > > >>>>
> > > >>>> However, execution.checkpointing.interval-during-backlog itself is not
> > > >>>> tied to FLIP-327.
> > > >>>>
> > > >>>> Do you mean we should set checkpointing.interval-during-backlog as
> > > >>>> experimental? Alternatively, how about we add a doc for
> > > >>>> checkpointing.interval-during-backlog explaining its impact/concern as
> > > >>>> discussed above?
> > > >>>>
> > > >>>> Best,
> > > >>>> Dong
> > > >>>>
> > > >>>>
> > > >>>>>> Maybe we can revisit the need for such a config when we
> > > >>>>>> introduce/discuss the capability to switch backlog from false to true
> > > >>>>>> in the future. What do you think?
> > > >>>>> Sure, we can do that.
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Piotrek
> > > >>>>>
> > > >>>>> On Sun, Jul 23, 2023 at 14:32 Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Hi Piotr,
> > > >>>>>>
> > > >>>>>> Thanks a lot for the explanation. Please see my reply inline.
> > > >>>>>>
> > > >>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <
> > > >>>>> piotr.nowoj...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Dong,
> > > >>>>>>>
> > > >>>>>>> Thanks a lot for the answers. I can now only briefly answer
> your
> > > >>> last
> > > >>>>>>> email.
> > > >>>>>>>
> > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > >>> overhead.
> > > >>>>> IMO
> > > >>>>>> it
> > > >>>>>>>> is an orthogonal issue already existing in Flink. This is
> > > >>> because a
> > > >>>>>> Flink
> > > >>>>>>>> job running batch mode might also be slower than its
> throughput
> > > >>> in
> > > >>>>>> stream
> > > >>>>>>>> mode due to the same reason.
> > > >>>>>>> Yes, I know, but the thing that worries me is that previously only a
> > > >>>>>>> user alone could decide whether to use batch mode or streaming, and in
> > > >>>>>>> practice one user would rarely (if ever) use both for the same
> > > >>>>>>> problem/job/query. If his intention was to eventually process live
> > > >>>>>>> data, he was using streaming even if there was a large backlog at the
> > > >>>>>>> start (apart from a very few power users).
> > > >>>>>>>
> > > >>>>>>> With this change, we want to introduce a mode that would be switching
> > > >>>>>>> back and forth between streaming and "batch in streaming"
> > > >>>>>>> automatically. So a potential performance regression would be much
> > > >>>>>>> more visible and painful at the same time. If a batch query runs
> > > >>>>>>> slower than it could, it's kind of fine as it will end at some point.
> > > >>>>>>> If a streaming query, during large backpressure or maybe a temporary
> > > >>>>>>> load spike, switches to batch processing, that's a bigger deal.
> > > >>>>>>> Especially if batch processing mode is not able to even handle the
> > > >>>>>>> normal load after the load spike. In that case, the job could never
> > > >>>>>>> recover from the backpressure/backlog mode.
> > > >>>>>>>
> > > >>>>>> I understand you are concerned with the risk of performance
> > > >>> regression
> > > >>>>>> introduced due to switching to batch mode.
> > > >>>>>>
> > > >>>>>> After thinking about this more, I think this existing proposal
> > meets
> > > >>>> the
> > > >>>>>> minimum requirement of "not introducing regression for existing
> > > >>> jobs".
> > > >>>>> The
> > > >>>>>> reason is that even if batch mode can be slower than stream mode
> > for
> > > >>>> some
> > > >>>>>> operators in some cases, this is an optional feature that will
> > only
> > > >>> be
> > > >>>>>> enabled if a user explicitly overrides the newly introduced
> config
> > > to
> > > >>>>>> non-default values. Existing jobs that simply upgrade their
> Flink
> > > >>>> library
> > > >>>>>> version will not suffer any performance regression.
> > > >>>>>>
> > > >>>>>> More specifically, in order to switch to batch mode, users will
> > need
> > > >>> to
> > > >>>>>> explicitly set execution.checkpointing.interval-during-backlog
> to
> > 0.
> > > >>>> And
> > > >>>>>> users can always explicitly update
> > > >>>>>> execution.checkpointing.interval-during-backlog to turn off the
> > > batch
> > > >>>>> mode
> > > >>>>>> if that incurs any performance issue.
> > > >>>>>>
> > > >>>>>> As far as I can tell, for all practical workloads we see in production
> > > >>>>>> jobs, batch mode is always faster (w.r.t. throughput) than stream mode
> > > >>>>>> when there is a high backlog of incoming records. Though it is still
> > > >>>>>> theoretically possible, it should be very rare (if ever) for batch mode
> > > >>>>>> to be slower in practice. Given that it is an optional feature that can
> > > >>>>>> be turned off by users, it might be OK to just let users try it out and
> > > >>>>>> we can fix performance issues once we detect any of them. What do you
> > > >>>>>> think?
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>>> execution.backlog.use-full-batch-mode-on-start (default false)
> > > >>>>>>> Oops, sorry, it was supposed to be something like:
> > > >>>>>>>
> > > >>>>>>> execution.backlog.use-batch-mode-only-on-start (default false)
> > > >>>>>>>
> > > >>>>>>> That option would disallow switching from streaming to batch.
> > Batch
> > > >>>>> mode
> > > >>>>>>> would be allowed only to get rid of the initial, present on
> > > >>> start-up
> > > >>>>>>> backlog.
> > > >>>>>>>
> > > >>>>>>> It would allow us to safely experiment with switching from streaming
> > > >>>>>>> to batch, and I would actually be more comfortable enabling "using
> > > >>>>>>> batch mode on start" by default, until we gain confidence and feedback
> > > >>>>>>> that switching back & forth is working as expected.
> > > >>>>>>>
> > > >>>>>> Now I understand what you are suggesting. I agree that it is
> > > >>> necessary
> > > >>>>> for
> > > >>>>>> users to be able to disallow switching from streaming to batch.
> > > >>>>>>
> > > >>>>>> I am not sure it is necessary to introduce an extra config just for
> > > >>>>>> this purpose. The reason is that we don't have any strategy that
> > > >>>>>> switches backlog status from false to true yet. And when we have such a
> > > >>>>>> strategy (e.g. FLIP-328) in the future, it is very likely that we will
> > > >>>>>> introduce extra config(s) for users to explicitly turn on such a
> > > >>>>>> feature. That means users should be able to turn off this feature even
> > > >>>>>> if we don't have something like
> > > >>>>>> execution.backlog.use-batch-mode-only-on-start.
> > > >>>>>>
> > > >>>>>> Maybe we can revisit the need for such a config when we
> > > >>>> introduce/discuss
> > > >>>>>> the capability to switch backlog from false to true in the
> future.
> > > >>> What
> > > >>>>> do
> > > >>>>>> you think?
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>>>> Or we could limit the scope of this FLIP to only support
> > > >>> starting
> > > >>>>> with
> > > >>>>>>>>> batch mode and switching only once to
> > > >>>>>>>>> streaming, and design a follow up with switching back and
> > forth?
> > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP
> into
> > > >>>> two
> > > >>>>>>> FLIPs
> > > >>>>>>>> so that we can make incremental progress.
> > > >>>>>>> Great, let's do that. In a follow up FLIP we could restart the
> > > >>>>> discussion
> > > >>>>>>> about
> > > >>>>>>> switching back and forth.
> > > >>>>>>>
> > > >>>>>> Cool, I added the following statement to the motivation section.
> > > >>>>>>
> > > >>>>>> "NOTE: this FLIP focuses only on the capability to switch from
> > batch
> > > >>> to
> > > >>>>>> stream mode. If there is any extra API needed to support
> switching
> > > >>> from
> > > >>>>>> stream to batch mode, we will discuss them in a follow-up FLIP."
> > > >>>>>>
> > > >>>>>> I am looking forward to reading your follow-up thoughts!
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Dong
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>> Piotrek
> > > >>>>>>>
> > > >>>>>>> On Thu, Jul 20, 2023 at 16:57 Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>>>>> Hi Piotr,
> > > >>>>>>>>
> > > >>>>>>>> Thank you for the very detailed comments! Please see my reply
> > > >>>> inline.
> > > >>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <
> > > >>>>>>> piotr.nowoj...@gmail.com>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Dong,
> > > >>>>>>>>>
> > > >>>>>>>>> I have a couple of follow up questions about switching back
> and
> > > >>>>> forth
> > > >>>>>>>>> between streaming and batching mode.
> > > >>>>>>>>> Especially around shuffle/watermark strategy, and keyed state
> > > >>>>>> backend.
> > > >>>>>>>>> First of all, it might not always be beneficial to switch into
> > > >>>>>>>>> batch mode:
> > > >>>>>>>>> - Shuffle strategy
> > > >>>>>>>>>     - Is sorting going to be purely in-memory? If not,
> > > >>> obviously
> > > >>>>>>> spilling
> > > >>>>>>>>> to disks might cause larger overheads
> > > >>>>>>>>>        compared to not sorting the records.
> > > >>>>>>>>>
> > > >>>>>>>> Sorting might require spilling data to disk depending on the
> > > >>> input
> > > >>>>>> size.
> > > >>>>>>>> The behavior of sorting w.r.t. memory/disk is expected to be
> > > >>>> exactly
> > > >>>>>> the
> > > >>>>>>>> same as the behavior of input sorting automatically performed
> by
> > > >>>>> Flink
> > > >>>>>>>> runtime in batch mode for keyed inputs.
> > > >>>>>>>>
> > > >>>>>>>> More specifically, ExternalSorter
> > > >>>>>>>> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java>
> > > >>>>>>>> is currently used to sort keyed inputs in batch mode. It is
> > > >>>>>>>> automatically used by Flink runtime in OneInputStreamTask (here
> > > >>>>>>>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114>)
> > > >>>>>>>> and in MultiInputSortingDataInput (here
> > > >>>>>>>> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>).
> > > >>>>>>>> We plan to re-use the same code/mechanism to do sorting.
> > > >>>>>>>>
> > > >>>>>>>> It is possible that spilling to disks might cause larger
> > > >>> overhead.
> > > >>>>> IMO
> > > >>>>>> it
> > > >>>>>>>> is an orthogonal issue already existing in Flink. This is
> > > >>> because a
> > > >>>>>> Flink
> > > >>>>>>>> job running batch mode might also be slower than its
> throughput
> > > >>> in
> > > >>>>>> stream
> > > >>>>>>>> mode due to the same reason. However, even though it is
> possible
> > > >>> in
> > > >>>>>>> theory,
> > > >>>>>>>> I expect that in practice the throughput of using sorting +
> > > >>>>>>>> BatchExecutionKeyedStateBackend should be much higher than
> using
> > > >>>>> other
> > > >>>>>>>> keyed statebackends when the amount of data is large. As a
> > matter
> > > >>>> of
> > > >>>>>>> fact,
> > > >>>>>>>> we have not heard of complaints of such performance regression
> > > >>>> issues
> > > >>>>>> in
> > > >>>>>>>> batch mode.
> > > >>>>>>>>
> > > >>>>>>>> The primary goal of this FLIP is to allow the operator to run
> at
> > > >>>> the
> > > >>>>>> same
> > > >>>>>>>> throughput (in stream mode when there is backlog) as it can
> > > >>>> currently
> > > >>>>>> do
> > > >>>>>>> in
> > > >>>>>>>> batch mode. And this goal is not affected by the disk overhead
> > > >>>> issue
> > > >>>>>>>> mentioned above.
> > > >>>>>>>>
> > > >>>>>>>> I am thinking maybe we can treat it as an orthogonal
> performance
> > > >>>>>>>> optimization problem instead of solving this problem in this
> > > >>> FLIP?
> > > >>>>>>>>     - If it will be at least partially in-memory, does Flink
> > have
> > > >>>>> some
> > > >>>>>>>>> mechanism to reserve optional memory that
> > > >>>>>>>>>       can be revoked if a new operator starts up? Can this
> > > >>> memory
> > > >>>>> be
> > > >>>>>>>>> redistributed? Ideally we should use as
> > > >>>>>>>>>       much as possible of the available memory to avoid
> > > >>> spilling
> > > >>>>>> costs,
> > > >>>>>>>> but
> > > >>>>>>>>> also being able to revoke that memory
> > > >>>>>>>>>
> > > >>>>>>>> This FLIP does not support dynamically revoking/redistributing
> > > >>>>> managed
> > > >>>>>>>> memory used by the ExternalSorter.
> > > >>>>>>>>
> > > >>>>>>>> For operators with isInternalSorterSupported = true, we will
> > > >>>> allocate
> > > >>>>>> to
> > > >>>>>>>> this operator execution.sorted-inputs.memory
> > > >>>>>>>> <
> > > >>>>>>>>
> > > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144
> > > >>>>>>>> amount of managed memory. This is the same as how Flink
> > allocates
> > > >>>>>> managed
> > > >>>>>>>> memory to an operator when this operator has keyed inputs in
> > > >>> batch
> > > >>>>>> mode.
> > > >>>>>>>> Note that this FLIP intends to support operators to sort
> inputs
> > > >>>>>> whenever
> > > >>>>>>>> there is backlog. And there is currently no way for an
> operator
> > > >>> to
> > > >>>>> know
> > > >>>>>>> in
> > > >>>>>>>> advance whether there will be no backlog after a given time.
> So
> > > >>> it
> > > >>>>>> seems
> > > >>>>>>>> simpler to just keep managed memory for such an operator
> > > >>> throughout
> > > >>>>> the
> > > >>>>>>>> lifecycle of this operator, for now.
> > > >>>>>>>>
> > > >>>>>>>> Besides, it seems that the lack of ability to dynamically
> > > >>>>>>>> revoke/redistribute un-used managed memory is an existing
> issue
> > > >>> in
> > > >>>>>> Flink.
> > > >>>>>>>> For example, we might have two operators sharing the same slot
> > > >>> and
> > > >>>>>> these
> > > >>>>>>>> two operators both use managed memory (e.g. to sort inputs).
> > > >>> There
> > > >>>> is
> > > >>>>>>>> currently no way for one operator to re-use the memory not
> used
> > > >>> by
> > > >>>>> the
> > > >>>>>>>> other operator.
> > > >>>>>>>>
> > > >>>>>>>> Therefore, I think we can treat this as an orthogonal
> > performance
> > > >>>>>>>> optimization problem which can be addressed separately. What
> do
> > > >>> you
> > > >>>>>>> think?
> > > >>>>>>>>
> > > >>>>>>>>>     - Sometimes sorting, even if we have memory to do that,
> > > >>> might
> > > >>>>> be
> > > >>>>>> an
> > > >>>>>>>>> unnecessary overhead.
> > > >>>>>>>>> - Watermarks
> > > >>>>>>>>>     - Is holding back watermarks always good? If we have tons
> > > >>> of
> > > >>>>> data
> > > >>>>>>>>> buffered/sorted and waiting to be processed
> > > >>>>>>>>>        with multiple windows per key and many different keys.
> > > >>>> When
> > > >>>>> we
> > > >>>>>>>>> switch back to `isBacklog=false` we
> > > >>>>>>>>>        first process all of that data before processing
> > > >>>> watermarks,
> > > >>>>>> for
> > > >>>>>>>>> operators that are not using sorted input the
> > > >>>>>>>>>        state size can explode significantly causing lots of
> > > >>>>> problems.
> > > >>>>>>>> Even
> > > >>>>>>>>> for those that can use sorting, switching to
> > > >>>>>>>>>        sorting or BatchExecutionKeyedStateBackend is not
> > > >>> always a
> > > >>>>>> good
> > > >>>>>>>>> idea, but keeping RocksDB also can be
> > > >>>>>>>>>        risky.
> > > >>>>>>>>>
> > > >>>>>>>> With the current FLIP, the proposal is to use a sorter only
> when
> > > >>>> the
> > > >>>>>>> inputs
> > > >>>>>>>> have keys. According to this practice, operators which are not
> > > >>>> using
> > > >>>>>>>> sorting should have un-keyed inputs. I believe such an
> operator
> > > >>>> will
> > > >>>>>> not
> > > >>>>>>>> even use a keyed state backend. Maybe I missed some use-case.
> > Can
> > > >>>> you
> > > >>>>>>>> provide a use-case where we will have an operator with
> un-keyed
> > > >>>>> inputs
> > > >>>>>>>> whose state size can explode due to us holding back
> watermarks?
> > > >>>>>>>>
> > > >>>>>>>> For operators with keyed inputs that use sorting, I suppose it
> > is
> > > >>>>>>> possible
> > > >>>>>>>> that sorting + BatchExecutionKeyedStateBackend can be worse
> than
> > > >>>>> using
> > > >>>>>>>> RocksDB. But I believe this is very rare (if possible at all) in
> > > >>>> almost all
> > > >>>>>>>> practical usage of Flink.
> > > >>>>>>>>
> > > >>>>>>>> Taking one step back, if this indeed causes a regression for a real
> > > >>>>>> use-case,
> > > >>>>>>>> user can set execution.checkpointing.interval-during-backlog
> to
> > > >>>>>> anything
> > > >>>>>>>> other than 0 so that this FLIP will not use
> > > >>>>>>>> sorter + BatchExecutionKeyedStateBackend even when there
> is
> > > >>>>>> backlog.
> > > >>>>>>>> I would hope we can find a way to automatically determine
> > whether
> > > >>>>> using
> > > >>>>>>>> sorting + BatchExecutionKeyedStateBackend can be better or
> worse
> > > >>>> than
> > > >>>>>>> using
> > > >>>>>>>> RocksDB alone. But I could not find a good and reliable way to
> > do
> > > >>>>> this.
> > > >>>>>>>> Maybe we can update Flink to do this when we find a good way
> to
> > > >>> do
> > > >>>>> this
> > > >>>>>>> in
> > > >>>>>>>> the future?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> - Keyed state backend
> > > >>>>>>>>>     - I think you haven't described what happens during
> > > >>> switching
> > > >>>>>> from
> > > >>>>>>>>> streaming to backlog processing.
> > > >>>>>>>>>
> > > >>>>>>>> Good point. This indeed needs to be described. I added a TODO
> in
> > > >>>> the
> > > >>>>>>>> "Behavior changes ..." section to describe what happens when
> > > >>>>> isBacklog
> > > >>>>>>>> switches from false to true, for all
> > > >>>>> watermark/checkpoint/statebackend
> > > >>>>>>> etc.
> > > >>>>>>>> Let me explain this for the state backend here for now. I will
> > > >>>> update
> > > >>>>>>> FLIP
> > > >>>>>>>> later.
> > > >>>>>>>>
> > > >>>>>>>> When isBacklog switches from false to true, operator with
> keyed
> > > >>>>> inputs
> > > >>>>>>> can
> > > >>>>>>>> optionally (as determined by its implementation) start to use an
> > > >>>>> internal
> > > >>>>>>>> sorter to sort inputs by key, without processing inputs or
> > > >>> updating
> > > >>>>>>>> statebackend, until it receives end-of-inputs or isBacklog is
> > > >>>>> switched
> > > >>>>>> to
> > > >>>>>>>> false again.
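
The switching behavior described above can be sketched roughly as follows. This is only an illustration in plain Java, not Flink code: `BacklogBufferingOperator`, `setBacklog`, `processElement` and `endInput` are hypothetical names, and an in-memory list stands in for the external sorter that the FLIP would actually use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a keyed operator; not a Flink class. */
final class BacklogBufferingOperator {
    // In-memory buffer standing in for the (spillable) sorter.
    private final List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
    // The per-record logic that would normally touch the state backend.
    private final Consumer<Map.Entry<String, Integer>> processFn;
    private boolean isBacklog = false;

    BacklogBufferingOperator(Consumer<Map.Entry<String, Integer>> processFn) {
        this.processFn = processFn;
    }

    /** Called when the backlog status of the input changes. */
    void setBacklog(boolean backlog) {
        if (isBacklog && !backlog) {
            flushSorted(); // backlog ended: process everything buffered so far
        }
        isBacklog = backlog;
    }

    void processElement(Map.Entry<String, Integer> record) {
        if (isBacklog) {
            buffer.add(record); // only buffer; no processing, no state update
        } else {
            processFn.accept(record); // regular streaming path
        }
    }

    /** Called on end-of-input. */
    void endInput() {
        flushSorted();
    }

    private void flushSorted() {
        // List.sort is stable, so records with the same key keep arrival order.
        buffer.sort(Map.Entry.comparingByKey());
        buffer.forEach(processFn);
        buffer.clear();
    }
}
```

Because records reach `processFn` grouped by key, the operator only needs to hold the state of one key at a time while it drains the buffer, which is the property that makes a single-key state backend usable during backlog.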
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>>     - Switch can be an unnecessary overhead.
> > > >>>>>>>>
> > > >>>>>>>> I agree it can cause unnecessary overhead, particularly when
> > > >>>>> isBacklog
> > > >>>>>>>> switches back and forth frequently. Whether or not this is
> > > >>>>> unnecessary
> > > >>>>>>>> likely depends on the duration/throughput of the backlog phase
> > as
> > > >>>>> well
> > > >>>>>> as
> > > >>>>>>>> the specific computation logic of the operator. I am not sure
> > > >>> there
> > > >>>>> is
> > > >>>>>> a
> > > >>>>>>>> good way for Flink to determine in advance whether switching
> is
> > > >>>>>>>> unnecessary.
> > > >>>>>>>>
> > > >>>>>>>> Note that for the existing use-case where we expect to change
> > > >>>>> isBacklog
> > > >>>>>>> to
> > > >>>>>>>> true (e.g. MySQL CDC snapshot phase, Kafka source watermark
> lag
> > > >>>> being
> > > >>>>>> too
> > > >>>>>>>> high), we don't expect the watermark to switch back and forth
> > > >>>>>> frequently.
> > > >>>>>>>> And user can disable this switch by setting
> > > >>>>>>>> execution.checkpointing.interval-during-backlog to anything
> > other
> > > >>>>> than
> > > >>>>>> 0.
> > > >>>>>>>> Therefore, I am wondering if we can also view this as a
> > > >>> performance
> > > >>>>>>>> optimization opportunity for extra use-cases in the future,
> > > >>> rather
> > > >>>>>> than a
> > > >>>>>>>> blocking issue of this FLIP for the MVP use-case (e.g.
> snapshot
> > > >>>> phase
> > > >>>>>> for
> > > >>>>>>>> any CDC source, Kafka watermark lag).
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> At the same time, in your current proposal, for
> > > >>>>>>>>> `execution.checkpointing.interval-during-backlog > 0` we
> won't
> > > >>>>>>>>> switch to "batch" mode at all. That's a bit of shame, I don't
> > > >>>>>>> understand
> > > >>>>>>>>> why those two things should be coupled
> > > >>>>>>>>> together?
> > > >>>>>>>>>
> > > >>>>>>>> We can in general classify optimizations as those that are
> > > >>>> compatible
> > > >>>>>>> with
> > > >>>>>>>> checkpointing, and those that are not compatible with
> > > >>>> checkpointing.
> > > >>>>>> For
> > > >>>>>>>> example, input sorting is currently not compatible with
> > > >>>>> checkpointing.
> > > >>>>>>> And
> > > >>>>>>>> buffering input records to reduce state backend overhead (and
> > > >>>>> probably
> > > >>>>>>>> columnar processing for mini-batch in the future) is
> compatible
> > > >>>> with
> > > >>>>>>>> checkpointing.
> > > >>>>>>>>
> > > >>>>>>>> The primary goal of FLIP-327 is to support optimizations not
> > > >>> compatible
> > > >>>>> with
> > > >>>>>>>> checkpointing. If
> > > >>> execution.checkpointing.interval-during-backlog >
> > > >>>>> 0,
> > > >>>>>>>> which means that user intends to still do checkpointing even
> > when
> > > >>>>> there
> > > >>>>>>> is
> > > >>>>>>>> backlog, then we will not be able to support such
> optimizations.
> > > >>>>>>>>
> > > >>>>>>>> For optimizations that are compatible with checkpointing, we
> can
> > > >>> do
> > > >>>>>> this
> > > >>>>>>>> even when the operator does not run in "batch mode". There are
> > > >>>> extra
> > > >>>>>>>> problems to solve in order to achieve this optimization, such
> as
> > > >>>>>>> supporting
> > > >>>>>>>> unaligned checkpointing without prolonging its sync phase. I
> > plan
> > > >>>> to
> > > >>>>>>>> explain how this can be done in FLIP-325.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> All in all, shouldn't we aim for some more clever process of
> > > >>>>>> switching
> > > >>>>>>>> back
> > > >>>>>>>>> and forth between streaming/batch modes
> > > >>>>>>>>> for watermark strategy/state backend/sorting based on some
> > > >>>> metrics?
> > > >>>>>>>> Trying
> > > >>>>>>>>> to either predict if switching might help,
> > > >>>>>>>>> or trying to estimate if the last switch was beneficial?
> Maybe
> > > >>>>>>> something
> > > >>>>>>>>> along the lines:
> > > >>>>>>>>> - sort only in memory and during sorting count the number of
> > > >>>>> distinct
> > > >>>>>>>> keys
> > > >>>>>>>>> (NDK)
> > > >>>>>>>>>     - maybe allow for spilling if so far in memory we have
> NDK
> > > >>> *
> > > >>>> 5
> > > >>>>>> =
> > > >>>>>>>>> #records
> > > >>>>>>>>> - do not allow to buffer records above a certain threshold,
> as
> > > >>>>>>> otherwise
> > > >>>>>>>>> checkpointing can explode
> > > >>>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2
> > > >>>> =
> > > >>>>>>> #records
> > > >>>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
> > > >>>>>>>>>
> > > >>>>>>>>> Or even maybe for starters something even simpler and then
> test
> > > >>>> out
> > > >>>>>>>>> something more fancy as a follow up?
> > > >>>>>>>>>
> > > >>>>>>>> I agree it is worth investigating these ideas to further
> > optimize
> > > >>>> the
> > > >>>>>>>> performance during backlog.
> > > >>>>>>>>
> > > >>>>>>>> I just think these can be done independently after this FLIP.
> > The
> > > >>>>> focus
> > > >>>>>>> of
> > > >>>>>>>> this FLIP is to re-use in stream mode the same optimization
> > which
> > > >>>> we
> > > >>>>>>>> already use in batch mode, rather than inventing or improving
> > the
> > > >>>>>>>> performance of these existing optimizations.
> > > >>>>>>>>
> > > >>>>>>>> Given that there are already a lot of new mechanisms/features
> to
> > > >>>>> discuss
> > > >>>>>>> and
> > > >>>>>>>> address in this FLIP, I am hoping we can limit the scope of
> this
> > > >>>> FLIP
> > > >>>>>> to
> > > >>>>>>>> re-use the existing optimization, and do these extra
> > optimization
> > > >>>>>>>> opportunities as future work.
> > > >>>>>>>>
> > > >>>>>>>> What do you think?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> At the same time,
> > > >>>>> `execution.checkpointing.interval-during-backlog=0`
> > > >>>>>>>> seems
> > > >>>>>>>>> a weird setting to me, that I would
> > > >>>>>>>>> not feel safe recommending to anyone. If processing of a
> > > >>> backlog
> > > >>>>>> takes
> > > >>>>>>> a
> > > >>>>>>>>> long time, a job might stop making
> > > >>>>>>>>> any progress due to some random failures. Especially
> dangerous
> > > >>>> if a
> > > >>>>>> job
> > > >>>>>>>> switches from streaming mode back to
> > > >>>>>>>>> backlog processing due to some reasons, as that could happen
> > > >>>> months
> > > >>>>>>> after
> > > >>>>>>>>> someone started a job with this
> > > >>>>>>>>> strange setting. So should we even have it? I would simply
> > > >>>> disallow
> > > >>>>>>> it. I
> > > >>>>>>>> Good point. I do agree we need further work to improve the
> > > >>>>> failover
> > > >>>>>>>> performance in case any task fails.
> > > >>>>>>>>
> > > >>>>>>>> As of the current FLIP, if any task fails during backlog and
> > > >>>>>>>> execution.checkpointing.interval-during-backlog = 0, we will
> > need
> > > >>>> to
> > > >>>>>>>> restart all operators to the last checkpointed state and
> > continue
> > > >>>>>>>> processing backlog. And this can be a lot of rollback since
> > there
> > > >>>> is
> > > >>>>> no
> > > >>>>>>>> checkpoint during backlog. And this can also be worse than
> batch
> > > >>>>> since
> > > >>>>>>> this
> > > >>>>>>>> FLIP currently does not support exporting/saving records to
> > local
> > > >>>>> disk
> > > >>>>>>> (or
> > > >>>>>>>> shuffle service) so that a failed task can re-consume the
> > records
> > > >>>>> from
> > > >>>>>>> the
> > > >>>>>>>> upstream task (or shuffle service) in the same way as how
> Flink
> > > >>>>>> fails over
> > > >>>>>>> a
> > > >>>>>>>> task in batch mode.
> > > >>>>>>>>
> > > >>>>>>>> I think we can extend this FLIP to solve this problem so that
> it
> > > >>>> can
> > > >>>>>> have
> > > >>>>>>>> at least the same behavior/performance as batch-mode job. The
> > > >>> idea
> > > >>>> is
> > > >>>>>> to
> > > >>>>>>>> also follow what batch mode does. For example, we can trigger
> a
> > > >>>>>>> checkpoint
> > > >>>>>>>> when isBacklog switches to true, and every operator should
> > buffer
> > > >>>> its
> > > >>>>>>>> output in the TM local disk (or remote shuffle service).
> > > >>> Therefore,
> > > >>>>>>> after a
> > > >>>>>>>> task fails, it can restart from the last checkpoint and
> > > >>> re-consume
> > > >>>>> data
> > > >>>>>>>> buffered in the upstream task.
> > > >>>>>>>>
> > > >>>>>>>> I will update FLIP as described above. Would this address your
> > > >>>>> concern?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> could see a power setting like:
> > > >>>>>>>>>         `execution.backlog.use-full-batch-mode-on-start
> > > >>> (default
> > > >>>>>>> false)`
> > > >>>>>>>> I am not sure I fully understand this config or its
> motivation.
> > > >>> Can
> > > >>>>> you
> > > >>>>>>>> help explain the exact semantics of this config?
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> that would override any heuristic of switching to backlog if
> > > >>>>> someone
> > > >>>>>> is
> > > >>>>>>>>> submitting a new job that starts with
> > > >>>>>>>>> `isBacklog=true`.
> > > >>>>>>>>>
> > > >>>>>>>>> Or we could limit the scope of this FLIP to only support
> > > >>> starting
> > > >>>>>> with
> > > >>>>>>>>> batch mode and switching only once to
> > > >>>>>>>>> streaming, and design a follow up with switching back and
> > > >>> forth?
> > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP
> into
> > > >>>> two
> > > >>>>>>> FLIPs
> > > >>>>>>>> so that we can make incremental progress.
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>> Dong
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>> I'm looking forwards to hearing/reading out your thoughts.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Piotrek
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Wed, Jul 12, 2023 at 12:38 Jing Ge <j...@ververica.com.invalid
> >
> > > >>>>>>>> wrote:
> > > >>>>>>>>>> Hi Dong,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for your reply!
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best regards,
> > > >>>>>>>>>> Jing
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <
> > > >>> lindon...@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>>>>> Hi Jing,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for the comments. Please see my reply inline.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge
> > > >>>>>> <j...@ververica.com.invalid
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Dong,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for the clarification. Now it is clear for me. I
> > > >>> got
> > > >>>>>>>>> additional
> > > >>>>>>>>>>> noob
> > > >>>>>>>>>>>> questions wrt the internal sorter.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 1. when to call setter to set the internalSorterSupported
> > > >>>> to
> > > >>>>> be
> > > >>>>>>>> true?
> > > >>>>>>>>>>> Developer of the operator class (i.e. those classes which
> > > >>>>>>> implement
> > > >>>>>>>>>>> `StreamOperator`) should override the
> > > >>>>> `#getOperatorAttributes()`
> > > >>>>>>> API
> > > >>>>>>>> to
> > > >>>>>>>>>> set
> > > >>>>>>>>>>> internalSorterSupported to true, if he/she decides to sort
> > > >>>>>> records
> > > >>>>>>>>>>> internally in the operator.
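
As a rough illustration of the answer above, an operator that sorts its own inputs might advertise this as shown below. All types here are hypothetical stand-ins written for this sketch (`OperatorAttributesSketch`, `StreamOperatorSketch`, `SortingCoGroupOperatorSketch`); they only mirror the shape of the `getOperatorAttributes()` API discussed in this thread and are not the actual Flink classes.

```java
/** Hypothetical mirror of the operator attributes discussed in the FLIP. */
final class OperatorAttributesSketch {
    private final boolean internalSorterSupported;

    OperatorAttributesSketch(boolean internalSorterSupported) {
        this.internalSorterSupported = internalSorterSupported;
    }

    boolean isInternalSorterSupported() {
        return internalSorterSupported;
    }
}

/** Hypothetical stand-in for the operator interface. */
interface StreamOperatorSketch {
    // Default: the operator does not sort internally.
    default OperatorAttributesSketch getOperatorAttributes() {
        return new OperatorAttributesSketch(false);
    }
}

/** An operator whose developer decided to sort records internally. */
final class SortingCoGroupOperatorSketch implements StreamOperatorSketch {
    @Override
    public OperatorAttributesSketch getOperatorAttributes() {
        // Set once by the operator developer; users of the operator never set it.
        return new OperatorAttributesSketch(true);
    }
}
```

The point of the design is that the flag is a static property of the operator implementation, so there is no separate setter for job authors to call.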
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> 2
> > > >>>>>>>>>>>> *"For those operators whose throughput can be
> > > >>> considerably
> > > >>>>>>> improved
> > > >>>>>>>>>> with
> > > >>>>>>>>>>> an
> > > >>>>>>>>>>>> internal sorter, update it to take advantage of the
> > > >>>> internal
> > > >>>>>>> sorter
> > > >>>>>>>>>> when
> > > >>>>>>>>>>>> its input has isBacklog=true.*
> > > >>>>>>>>>>>> *Typically, operators that involve aggregation operation
> > > >>>>> (e.g.
> > > >>>>>>>> join,
> > > >>>>>>>>>>>> cogroup, aggregate) on keyed inputs can benefit from
> > > >>> using
> > > >>>> an
> > > >>>>>>>>> internal
> > > >>>>>>>>>>>> sorter."*
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> *"The operator that performs CoGroup operation will
> > > >>>>> instantiate
> > > >>>>>>> two
> > > >>>>>>>>>>>> internal sorter to sorts records from its two inputs
> > > >>>>>> separately.
> > > >>>>>>>> Then
> > > >>>>>>>>>> it
> > > >>>>>>>>>>>> can pull the sorted records from these two sorters. This
> > > >>>> can
> > > >>>>> be
> > > >>>>>>>> done
> > > >>>>>>>>>>>> without wrapping input records with TaggedUnion<...>. In
> > > >>>>>>>> comparison,
> > > >>>>>>>>>> the
> > > >>>>>>>>>>>> existing DataStream#coGroup needs to wrap input records
> > > >>>> with
> > > >>>>>>>>>>>> TaggedUnion<...> before sorting them using one external
> > > >>>>> sorter,
> > > >>>>>>>> which
> > > >>>>>>>>>>>> introduces higher overhead."*
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> According to the performance test, it seems that internal
> > > >>>>>> sorter
> > > >>>>>>>> has
> > > >>>>>>>>>>> better
> > > >>>>>>>>>>>> performance than external sorter. Is it possible to make
> > > >>>>> those
> > > >>>>>>>>>> operators
> > > >>>>>>>>>>>> that can benefit from it use internal sorter by default?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>> Yes, it is possible. After this FLIP is done, users can use
> > > >>>>>>>>>>> DataStream#coGroup with EndOfStreamWindows as the window
> > > >>>>> assigner
> > > >>>>>>> to
> > > >>>>>>>>>>> co-group two streams in effectively the batch manner. An
> > > >>>>> operator
> > > >>>>>>>> that
> > > >>>>>>>>>> uses
> > > >>>>>>>>>>> an internal sorter will be used to perform the co-group
> > > >>>>>> operation.
> > > >>>>>>>>> There
> > > >>>>>>>>>> is
> > > >>>>>>>>>>> no need for users of the DataStream API to explicitly know
> > > >>> or
> > > >>>>> set
> > > >>>>>>> the
> > > >>>>>>>>>>> internal sorter in any way.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> In the future, we plan to incrementally optimize other
> > > >>>>>> aggregation
> > > >>>>>>>>>>> operation (e.g. aggregate) on the DataStream API when
> > > >>>>>>>>> EndOfStreamWindows
> > > >>>>>>>>>> is
> > > >>>>>>>>>>> used as the window assigner.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Dong
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>> Jing
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <
> > > >>>>> lindon...@gmail.com>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>>>>> Hi Jing,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thank you for the comments! Please see my reply inline.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Tue, Jul 11, 2023 at 5:41 AM Jing Ge
> > > >>>>>>>> <j...@ververica.com.invalid
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Dong,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks for the proposal! The FLIP is already in good
> > > >>>>>> shape. I
> > > >>>>>>>> got
> > > >>>>>>>>>>> some
> > > >>>>>>>>>>>>> NIT
> > > >>>>>>>>>>>>>> questions.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 1. It is a little bit weird to write the hint right
> > > >>>> after
> > > >>>>>> the
> > > >>>>>>>>>>>> motivation
> > > >>>>>>>>>>>>>> that some features have been moved to FLIP-331,
> > > >>> because
> > > >>>>> at
> > > >>>>>>> that
> > > >>>>>>>>>> time,
> > > >>>>>>>>>>>>>> readers don't know the context about what features
> > > >>> does
> > > >>>>> it
> > > >>>>>>>> mean.
> > > >>>>>>>>> I
> > > >>>>>>>>>>>> would
> > > >>>>>>>>>>>>>> suggest moving the note to the beginning of "Public
> > > >>>>>>> interfaces"
> > > >>>>>>>>>>>> sections.
> > > >>>>>>>>>>>>> Given that the reviewer who commented on this email
> > > >>>> thread
> > > >>>>>>>> before I
> > > >>>>>>>>>>>>> refactored the FLIP (i.e. Piotr) has read FLIP-331, I
> > > >>>> think
> > > >>>>> it
> > > >>>>>>> is
> > > >>>>>>>>>>> simpler
> > > >>>>>>>>>>>> to
> > > >>>>>>>>>>>>> just remove any mention of FLIP-331. I have updated the
> > > >>>>> FLIP
> > > >>>>>>>>>>> accordingly.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 2. It is also a little bit weird to describe all
> > > >>>>> behaviour
> > > >>>>>>>>> changes
> > > >>>>>>>>>> at
> > > >>>>>>>>>>>>> first
> > > >>>>>>>>>>>>>> but only focus on one single feature, i.e. how to
> > > >>>>> implement
> > > >>>>>>>>>>>>>> internalSorterSupported. TBH, I was lost while I was
> > > >>>>>> reading
> > > >>>>>>>> the
> > > >>>>>>>>>>> Public
> > > >>>>>>>>>>>>>> interfaces. Maybe change the FLIP title? Another
> > > >>> option
> > > >>>>>> could
> > > >>>>>>>> be
> > > >>>>>>>>> to
> > > >>>>>>>>>>>>> write a
> > > >>>>>>>>>>>>>> short summary of all features and point out that this
> > > >>>>> FLIP
> > > >>>>>>> will
> > > >>>>>>>>>> only
> > > >>>>>>>>>>>>> focus
> > > >>>>>>>>>>>>>> on the internalSorterSupported feature. Others could
> > > >>> be
> > > >>>>>> found
> > > >>>>>>>> in
> > > >>>>>>>>>>>>> FLIP-331.
> > > >>>>>>>>>>>>>> WDYT?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Conceptually, the purpose of this FLIP is to allow a
> > > >>>> stream
> > > >>>>>>> mode
> > > >>>>>>>>> job
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>> run
> > > >>>>>>>>>>>>> parts of the topology in batch mode so that it can
> > > >>> apply
> > > >>>>>>>>>>>>> optimizations/computations that can not be used
> > > >>> together
> > > >>>>> with
> > > >>>>>>>>>>>> checkpointing
> > > >>>>>>>>>>>>> (and thus not usable in stream mode). Although internal
> > > >>>>>> sorter
> > > >>>>>>> is
> > > >>>>>>>>> the
> > > >>>>>>>>>>>> only
> > > >>>>>>>>>>>>> optimization immediately supported in this FLIP, this
> > > >>>> FLIP
> > > >>>>>> lays
> > > >>>>>>>> the
> > > >>>>>>>>>>>>> foundation to support other optimizations in the
> > > >>> future,
> > > >>>>> such
> > > >>>>>>> as
> > > >>>>>>>>>> using
> > > >>>>>>>>>>>> GPU
> > > >>>>>>>>>>>>> to process a bounded stream of records.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Therefore, I find it better to keep the current title
> > > >>>>> rather
> > > >>>>>>> than
> > > >>>>>>>>>>>> limiting
> > > >>>>>>>>>>>>> the scope to internal sorter. What do you think?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 3. There should be a typo at 4) Checkpoint and
> > > >>> failover
> > > >>>>>>>> strategy
> > > >>>>>>>>> ->
> > > >>>>>>>>>>>> Mixed
> > > >>>>>>>>>>>>>> mode ->
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>    - If any task fails when isBacklog=false true,
> > > >>> this
> > > >>>>> task
> > > >>>>>>> is
> > > >>>>>>>>>>>> restarted
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>    re-process its input from the beginning.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thank you for catching this issue. It is fixed now.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>> Dong
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best regards
> > > >>>>>>>>>>>>>> Jing
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <
> > > >>>>>> lindon...@gmail.com
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>> Hi Piotr,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks for your comments! Please see my reply
> > > >>> inline.
> > > >>>>>>>>>>>>>>> On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <
> > > >>>>>>>>>>>>> piotr.nowoj...@gmail.com
> > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Dong,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I have a couple of questions.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Could you explain why those properties
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>     @Nullable private Boolean isOutputOnEOF =
> > > >>> null;
> > > >>>>>>>>>>>>>>>>     @Nullable private Boolean
> > > >>> isOutputOnCheckpoint
> > > >>>> =
> > > >>>>>>> null;
> > > >>>>>>>>>>>>>>>>     @Nullable private Boolean
> > > >>>>>> isInternalSorterSupported =
> > > >>>>>>>>> null;
> > > >>>>>>>>>>>>>>>> must be `@Nullable`, instead of having the
> > > >>> default
> > > >>>>>> value
> > > >>>>>>>> set
> > > >>>>>>>>> to
> > > >>>>>>>>>>>>>> `false`?
> > > >>>>>>>>>>>>>>> By initializing these private variables in
> > > >>>>>>>>>>> OperatorAttributesBuilder
> > > >>>>>>>>>>>> as
> > > >>>>>>>>>>>>>>> null, we can implement
> > > >>>>>> `OperatorAttributesBuilder#build()`
> > > >>>>>>> in
> > > >>>>>>>>>> such
> > > >>>>>>>>>>> a
> > > >>>>>>>>>>>>> way
> > > >>>>>>>>>>>>>>> that it can print DEBUG level logging to say
> > > >>>>>>>>>> "isOutputOnCheckpoint
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>> explicitly set". This can help user/SRE debug
> > > >>>>> performance
> > > >>>>>>>>> issues
> > > >>>>>>>>>>> (or
> > > >>>>>>>>>>>>> lack
> > > >>>>>>>>>>>>>>> of the expected optimization) due to operators not
> > > >>>>>>> explicitly
> > > >>>>>>>>>>> setting
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> right operator attribute.
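
A minimal sketch of the nullable-field idea, assuming a builder shaped like the one quoted above. `AttributesBuilderSketch`, its setters and the nested `Attributes` class are hypothetical names for illustration, and `java.util.logging` at `Level.FINE` stands in for DEBUG-level logging.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical builder; it only mirrors the fields quoted in this thread. */
final class AttributesBuilderSketch {
    private static final Logger LOG =
            Logger.getLogger(AttributesBuilderSketch.class.getName());

    // null means "the operator developer never set this attribute".
    private Boolean isOutputOnEOF = null;
    private Boolean isOutputOnCheckpoint = null;
    private Boolean isInternalSorterSupported = null;

    AttributesBuilderSketch setOutputOnEOF(boolean value) {
        this.isOutputOnEOF = value;
        return this;
    }

    AttributesBuilderSketch setOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    AttributesBuilderSketch setInternalSorterSupported(boolean value) {
        this.isInternalSorterSupported = value;
        return this;
    }

    /** Immutable result; every field has a concrete value. */
    static final class Attributes {
        final boolean outputOnEOF;
        final boolean outputOnCheckpoint;
        final boolean internalSorterSupported;

        Attributes(boolean eof, boolean checkpoint, boolean sorter) {
            this.outputOnEOF = eof;
            this.outputOnCheckpoint = checkpoint;
            this.internalSorterSupported = sorter;
        }
    }

    Attributes build() {
        return new Attributes(
                resolve("isOutputOnEOF", isOutputOnEOF),
                resolve("isOutputOnCheckpoint", isOutputOnCheckpoint),
                resolve("isInternalSorterSupported", isInternalSorterSupported));
    }

    // Unset attributes are logged and then fall back to false.
    private static boolean resolve(String name, Boolean value) {
        if (value == null) {
            LOG.log(Level.FINE, "{0} is not explicitly set, defaulting to false", name);
            return false;
        }
        return value;
    }
}
```

With primitive `boolean` fields defaulting to `false`, the builder could not tell "explicitly set to false" apart from "never set", so the log line would not be possible.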
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For example, we might want a job to always use the
> > > >>>>> longer
> > > >>>>>>>>>>>> checkpointing
> > > >>>>>>>>>>>>>>> interval (i.e.
> > > >>>>>>>> execution.checkpointing.interval-during-backlog)
> > > >>>>>>>>>> if
> > > >>>>>>>>>>>> all
> > > >>>>>>>>>>>>>>> running operators have isOutputOnCheckpoint==false,
> > > >>>> and
> > > >>>>>> use
> > > >>>>>>>> the
> > > >>>>>>>>>>> short
> > > >>>>>>>>>>>>>>> checkpointing interval otherwise. If a user has
> > > >>>>>> explicitly
> > > >>>>>>>>>>> configured
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> execution.checkpointing.interval-during-backlog but
> > > >>>> the
> > > >>>>>>>>> two-phase
> > > >>>>>>>>>>>>> commit
> > > >>>>>>>>>>>>>>> sink library has not been upgraded to set
> > > >>>>>>>>>>> isOutputOnCheckpoint=true,
> > > >>>>>>>>>>>>> then
> > > >>>>>>>>>>>>>>> the job will end up using the long checkpointing
> > > >>>>>> interval,
> > > >>>>>>>> and
> > > >>>>>>>>> it
> > > >>>>>>>>>>>> will
> > > >>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>> useful to figure out what is going wrong in this
> > > >>> case
> > > >>>>> by
> > > >>>>>>>>> checking
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>> log.
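
The interval selection in this example could look roughly like the sketch below. `CheckpointIntervalSketch` and `effectiveInterval` are hypothetical names, and the rule is taken directly from the paragraph above rather than from any existing Flink code.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper; not part of Flink. */
final class CheckpointIntervalSketch {
    /**
     * Picks the checkpointing interval for a job.
     *
     * @param regular the short interval (execution.checkpointing.interval)
     * @param duringBacklog the longer interval
     *     (execution.checkpointing.interval-during-backlog)
     * @param outputOnCheckpointFlags isOutputOnCheckpoint of each running operator
     */
    static Duration effectiveInterval(
            Duration regular, Duration duringBacklog, List<Boolean> outputOnCheckpointFlags) {
        // Any operator that emits results on checkpoint needs the short interval.
        boolean anyOutputOnCheckpoint = outputOnCheckpointFlags.contains(Boolean.TRUE);
        return anyOutputOnCheckpoint ? regular : duringBacklog;
    }
}
```

In the failure scenario described above, a two-phase commit sink that never sets the attribute contributes `false` to the list, so the long interval is chosen silently. The DEBUG log about the unset attribute is what would point to the cause.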
> > > >>>>>>>>>>>>>>> Note that the default value of these fields of the
> > > >>>>>>>>>>> OperatorAttributes
> > > >>>>>>>>>>>>>>> instance built by OperatorAttributesBuilder will
> > > >>>> still
> > > >>>>> be
> > > >>>>>>>>> false.
> > > >>>>>>>>>>> The
> > > >>>>>>>>>>>>>>> following is mentioned in the Java doc of
> > > >>>>>>>>>>>>>>> `OperatorAttributesBuilder#build()`:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> /**
> > > >>>>>>>>>>>>>>>   * If any operator attribute is null, we will log
> > > >>> it
> > > >>>>> at
> > > >>>>>>>> DEBUG
> > > >>>>>>>>>>> level
> > > >>>>>>
