Thanks for addressing my comments. LGTM
Best,
Xintong

On Mon, Sep 18, 2023 at 3:10 PM Dong Lin <lindon...@gmail.com> wrote:

> Hi Xintong,
> Thank you for all the comments. Please see my reply inline.
> On Mon, Sep 18, 2023 at 11:31 AM Xintong Song <tonysong...@gmail.com> wrote:
> > Thanks for addressing my comments, Dong.
> > > The expected behavior of checkpointing and failover depends on whether there is any operator currently running in the job with all its inputs' isBacklog=true. If there exists such an operator and interval-during-backlog = 0, then checkpointing will be disabled and the operator will have to fail over in a way similar to batch mode.
> > This makes sense to me. Shall we also put this into the FLIP? Or maybe you already did that and I overlooked it? The current description in "4) Checkpoint and failover strategy" -> "Mixed mode" is a bit unclear to me. It says "At the point when isBacklog switches to false, source operator ...", which sounds like upon any source operator switching to isBacklog = false.
> I think it is kind of mentioned in the doc of execution.checkpointing.interval-during-backlog, which says "if it is not null and any source reports isProcessingBacklog=true, it is the interval...".
> Based on this doc, we can derive that if there is one operator reporting isBacklog=true, then the checkpointing interval is determined by interval-during-backlog, which in this case has the value 0, indicating that checkpoint triggering is disabled.
> Given that other readers might also have this question, I have updated FLIP-327 with the following statement to make it more explicit: "For jobs with multiple sources and execution.checkpointing.interval-during-backlog = 0, checkpoint triggering is enabled if and only if all sources have isBacklog=false".
> > > I am not sure what is the concern with having `flink-streaming-java` depend on `flink-runtime`. Can you clarify the exact concern?
> > The concern here is that an API module should not depend on a runtime module. Currently, we have the "user codes -> flink-streaming-java -> flink-runtime" dependency chain, which makes binary compatibility impossible, because any runtime change can break compatibility with a user jar (which bundles flink-streaming-java) compiled for an older version. Ideally, we want the runtime module to depend on the API module, rather than the other way around. This is one of the issues we are trying to resolve with the programmatic API refactor. However, the way we are trying to resolve it is to introduce another API module and gradually replace the current DataStream API / flink-streaming-java, which means flink-streaming-java will stay dependent on flink-runtime for a while anyway. So the concern here is minor, only that we might need more effort when reworking this with the new API.
> Thanks for the detailed explanation. Given that we plan to avoid having flink-streaming-java depend on flink-runtime, I agree it is preferable to avoid introducing more dependencies like this.
> I have updated the FLIP to let RecordAttributes extend StreamElement.
> Best,
> Dong
> > The rest of your replies make sense to me.
> > Best,
> > Xintong
> > On Fri, Sep 15, 2023 at 10:05 PM Dong Lin <lindon...@gmail.com> wrote:
> > > Hi Xintong,
> > > Thanks for your comments! Please see my reply inline.
> > > On Thu, Sep 14, 2023 at 4:58 PM Xintong Song <tonysong...@gmail.com> wrote:
> > > > Sorry to join the discussion late.
> > > > Overall, I think it's a good idea to support dynamically switching the operator algorithms between Streaming (optimized towards low latency + checkpointing support) and Batch (optimized towards throughput). This is indeed a big and complex topic, and I really appreciate the previous discussions that narrowed the scope of this FLIP down to only considering switching from Batch to Streaming as a first step.
> > > > I have several questions.
> > > > 1. The FLIP discusses various behaviors under 4 scenarios: streaming mode, batch mode, mixed mode with checkpoint interval > 0, and mixed mode with checkpoint interval = 0. IIUC, this is because many batch optimizations cannot be supported together with checkpointing. This justifies that in mixed mode with interval > 0, most behaviors are the same as in streaming mode. However, mixed mode with checkpoint interval = 0 does not necessarily mean we should always apply such optimizations. It is possible that in some cases (likely with small data amounts) the cost of such optimizations is higher than the benefit. Therefore, I'd suggest decoupling the concept of applying these optimizations (i.e., the batch execution phase in the mixed mode) from whether checkpointing is enabled or not. In particular, I'd suggest removing the scenario "mixed mode with e.c.interval-during-backlog > 0", changing the scenario "mixed mode with e.c.interval-during-backlog = 0" to simply "mixed mode", and saying that we can have different strategies for deciding whether to enable the mixed mode, and that as the first step the strategy is to enable it when e.c.interval-during-backlog = 0.
> > > Thanks for the detailed explanation!
> > > I have updated the "Behavior changes when switching from batch mode to stream mode" section with the following changes.
> > > 1) Remove the description of "mixed mode with interval-during-backlog > 0" and add a statement saying that "after this FLIP, the behavior of Flink runtime with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog > 0 will be the same as the stream mode prior to this FLIP".
> > > 2) Add a statement saying that "Mixed mode refers to the behavior of Flink runtime after this FLIP with execution.runtime-mode = streaming AND execution.checkpointing.interval-during-backlog = 0".
> > > 3) Add a statement saying that "It is possible for mixed mode to be slower than stream mode, particularly when there is only a small amount of input records and the overhead of buffering/sorting inputs outweighs its benefit. This is similar to how a merge join might be slower than a hash join. This FLIP focuses on optimizing the Flink throughput when there is a high number of input records. In the future, we might introduce more strategies to turn on mixed mode in a smart way to avoid performance regression".
> > > Would this address your concern?
> > > > 2. According to the FLIP, before isBacklog = false, the timer service only keeps timers for the current key. It also says that upon the end of each key, it fires timers of the key up to the last watermark. IIUC, that means not all timers are guaranteed to be fired. It is possible that some timers are left to be triggered after isBacklog switches to false. If the timer service only keeps timers for the current key, those not-yet-fired timers may get lost when switching to a new key.
> > > Thanks for catching this. You are right that all timers should be fired as long as the corresponding firing condition (either processing-time or event-time) is satisfied.
> > > I have updated the "Timer Service" part of the "Behavior changes when switching from batch mode to stream mode" section accordingly. Can you see if it addresses your concern?
> > > > 3. Is it possible that some sources / operators in the job switch to isBacklog = false, while others are still isBacklog = true? In that case, what is the expected behavior for checkpointing and failover?
> > > Yes, it is possible. And in this case, Flink runtime will handle this operator as if all the operator's inputs have isBacklog=false. In particular, Flink runtime will not automatically sort the inputs of this operator.
> > > I added the following statement in the FLIP to clarify the behavior: "For an operator with 2+ inputs, where some inputs have isBacklog=true and some other inputs have isBacklog=false, Flink runtime will handle this operator as if all its inputs have isBacklog=false".
> > > The expected behavior of checkpointing and failover depends on whether there is any operator currently running in the job with all its inputs' isBacklog=true. If there exists such an operator and interval-during-backlog = 0, then checkpointing will be disabled and the operator will have to fail over in a way similar to batch mode.
> > > > 4. Do we require RecordAttributes to be properly handled by all operators? Or do we consider it as a hint that operators may benefit from looking into but should not run into any problems ignoring? I'm asking because, if they are required to be properly handled, we probably need a way to enforce operators to deal with it. `processRecordAttributes(RecordAttributes)` might not be a good fit because we don't know whether the operator has looked into all necessary fields of `RecordAttributes`.
> > > As of this FLIP, we would not require RecordAttributes to be handled by any operator in order to achieve correctness. So it is more like a hint. More specifically, the isBacklog attribute provides a hint for an operator to optionally delay the processing of its inputs if doing so can improve its throughput.
> > > I think it would be useful to avoid requiring operators to explicitly handle attributes contained in RecordAttributes. This is because we want the features added in this FLIP (and future FLIPs) to be backward compatible without breaking the correctness of existing jobs.
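To make the "hint" semantics concrete, here is a minimal sketch of an operator that uses isBacklog only to trade latency for throughput. It is written against the `RecordAttributes` / `processRecordAttributes()` interfaces proposed in this FLIP, so the names follow the FLIP text and may change, and `PreAggregatingSum` itself is a hypothetical example. Ignoring the hint must never affect correctness:

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Hypothetical operator: sums longs and normally emits the updated sum per record.
    // While isBacklog=true it only accumulates, and it flushes once the backlog ends.
    public class PreAggregatingSum extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<Long, Long> {

        private boolean backlog = false;
        private long sum = 0L;

        // Hook proposed in FLIP-327; RecordAttributes is the class proposed there
        // (its package and final shape are not fixed at the time of this discussion).
        public void processRecordAttributes(RecordAttributes recordAttributes) {
            backlog = recordAttributes.isBacklog();
            if (!backlog) {
                output.collect(new StreamRecord<>(sum)); // flush the backlog-phase result
            }
        }

        @Override
        public void processElement(StreamRecord<Long> element) {
            sum += element.getValue();
            if (!backlog) {
                output.collect(new StreamRecord<>(sum)); // low-latency per-record path
            }
        }
    }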
> > > Suppose we really need to add a record attribute that should be explicitly handled by every operator; I believe we can always find a way to enforce this requirement (e.g. fail job compilation with a proper error) in the future. For example, we could add a method such as handleXXXRecordAttribute() to the operator interface without a default implementation.
> > > > 5. I wonder if there's any strong reason behind choosing `RuntimeEvent` over `StreamElement` for `RecordAttributes` to extend? My concern is that the current approach introduces one more dependency from `flink-streaming-java` (operators that use `RecordAttributes`) on `flink-runtime` (where `RuntimeEvent` comes from), which seems to be unnecessary.
> > > There is no strong reason to choose `RuntimeEvent` over `StreamElement`. I think the main (and minor) reason for doing so is the simplicity of implementation. For example, we don't need to add methods such as StreamElement#isRecordAttributes and StreamElement#asRecordAttributes.
> > > I am not sure what is the concern with having `flink-streaming-java` depend on `flink-runtime`. Can you clarify the exact concern?
> > > In any case, I don't have a strong preference between `RuntimeEvent` and `StreamElement`. We can update the FLIP to use `StreamElement` as long as there is a well-defined non-trivial reason for making this choice.
> > > > 6. The FLIP says it leverages state backend optimizations introduced in FLIP-325. Just for clarification, does this mean this FLIP depends on FLIP-325, and probably should not be voted on / accepted until FLIP-325 is accepted?
> > > Yes, the proposed implementation of this FLIP depends on FLIP-325. We can start the voting thread for FLIP-327 after FLIP-325 is accepted. Maybe we can continue to discuss this FLIP in the meantime (before FLIP-325 is accepted).
> > > Thanks again for the very detailed and helpful review! Looking forward to your follow-up comments.
> > > Best,
> > > Dong
> > > > Best,
> > > > Xintong
> > > > On Fri, Sep 1, 2023 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote:
> > > > > Hi,
> > > > > Some keywords in this triggered my attention, so sorry for jumping in late, but I'd like to comprehend the nature of the proposal.
> > > > > I'll try to summarize my understanding:
> > > > > The goal of the FLIP is to support automatic switching between streaming and batch processing, leveraging the fact that batch processing is more computationally efficient. This makes perfect sense.
> > > > > Looking at the streaming vs. batch semantics, switching from streaming to batch means the following:
> > > > > a) Generally, watermarks are not propagated in batch; the watermark moves from -inf to +inf in one step, at the end of the batch input. This might (and probably will) skip many invocations of timers.
> > > > > b) Grouping by key (and window) can be done efficiently, because it can be done by sort-group and ideally parallelized by window (with some caveats).
> > > > > The switch also has some conditions, namely:
> > > > > i) Batch mode does not do checkpoints; inputs must be accessible repeatedly (forever).
> > > > > ii) Due to failures in batch mode, inputs might be reprocessed, and thus they must be immutable, or all sub-results computed in all branches of the computation (even those possibly unaffected by the failure) have to be discarded and recomputed from scratch.
> > > > > Obviously, in case of the switch from batch to streaming, property a) has to be modified so that the watermark does not move to +inf, but to min(streaming watermark). Given these properties, it should be possible to exchange batch and streaming processing without any cooperation from the application logic itself. Is my understanding correct?
> > > > > If so, there is still one open question regarding efficiency, though. The streaming operator _might_ need sorting by timestamp (e.g. processing time-series data, or even sequential data). In that case simply switching streaming semantics to batch processing does not yield efficient processing, because the operator still needs to buffer and manually sort all the input data (batch data is always unordered). On the other hand, the batch runner already does sorting (for grouping by key), so adding an additional sorting criterion is very cheap. In Apache Beam, we introduced a property of a stateful PTransform (DoFn) called @RequiresTimeSortedInput [1], which can then be implemented efficiently by batch engines.
> > > > > Does the FLIP somehow work with conditions i) and ii)? I can imagine, for instance, that if data is read from, say, Kafka, then if the backlog gets sufficiently large, even the batch processing can take substantial time, and if it fails after long processing, some of the original data might already be rolled out of the Kafka topic.
> > > > > In the FLIP there are some proposed changes to sources to emit metadata about whether the records come from backlog. What is the line of thought behind why this is needed? In my point of view, streaming engines are _always_ processing backlog; the only question is "how delayed are the currently processed events after HEAD", or more specifically in this case "how many elements can we expect to process if the source would immediately stop receiving more data?". This should be configurable using a simple option defining the difference between the current processing-time (JM) and the watermark of the source, or am I missing something?
> > > > > Thanks for clarification and all the best,
> > > > > Jan
> > > > > [1] https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
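Jan's idea of deriving the backlog signal from watermark lag (the direction later explored in FLIP-328, which is mentioned further down in this thread) fits in a few lines. The following is only an illustration of the rule, not an existing Flink API:

    // Illustration only: treat the source as processing backlog whenever its
    // watermark trails the current processing time by more than a configured threshold.
    static boolean isProcessingBacklog(long latestWatermarkMillis, long lagThresholdMillis) {
        long watermarkLag = System.currentTimeMillis() - latestWatermarkMillis;
        return watermarkLag > lagThresholdMillis;
    }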
> > > > > On 8/31/23 13:17, Xuannan Su wrote:
> > > > > > Hi all,
> > > > > > I would like to share some updates on FLIP-327. Dong and I have had a series of discussions and have made several refinements to the FLIP.
> > > > > > The major change to the FLIP is to allow the input of a one-input operator to be automatically sorted during backlog processing. When combined with the state backend optimization introduced in FLIP-325 [1], all the keyed single-input operators can achieve similar performance as in batch mode during backlog processing, without any code change to the operator. We also implemented a POC [2] and conducted a benchmark [3] using the KeyedStream#reduce operation. The benchmark results demonstrate the performance gains that this FLIP can offer.
> > > > > > I am looking forward to any comments or feedback you may have on this FLIP.
> > > > > > Best,
> > > > > > Xuannan
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Introduce+LRU+cache+to+accelerate+state+backend+access
> > > > > > [2] https://github.com/Sxnan/flink/tree/FLIP-327-demo
> > > > > > [3] https://github.com/Sxnan/flink/blob/d77d0d3fb268de0a1939944ea4796a112e2d68c0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/backlog/ReduceBacklogBenchmark.java
> > > > > >> On Aug 18, 2023, at 21:28, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >> Hi Piotr,
> > > > > >> Thanks for the explanation.
> > > > > >> To recap our offline discussion, there is a concern regarding the capability to dynamically switch between stream and batch modes. This concern is around unforeseen behaviors, such as bugs or performance regressions, which we might not be aware of yet. The reason for this concern is that this feature has a fundamental impact on the Flink runtime's behavior.
> > > > > >> Due to the above concern, I agree it is reasonable to annotate the related APIs as experimental. This step would provide us with the flexibility to modify these APIs if issues arise in the future. This annotation also serves as a note to users that this functionality might not perform as well as expected.
> > > > > >> Though I believe we can ensure the reliability of this feature through good design and code reviews, comprehensive unit tests, and thorough integration testing, I agree that it is reasonable to be extra cautious in this case. Also, it should be OK to delay making these APIs non-experimental by 1-2 releases.
> > > > > >> I have updated FLIP-327, FLIP-328, and FLIP-331 to mark the APIs in these docs as experimental. Please let me know if you think any other API should also be marked as experimental.
> > > > > >> Thanks!
> > > > > >> Dong
> > > > > >> On Wed, Aug 16, 2023 at 10:39 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > >>> Hi Dong,
> > > > > >>> The operators API is unfortunately also a public-facing API, and I mean that the APIs we will add there should also be marked `@Experimental` IMO.
> > > > > >>> The config options should also be marked as experimental (both annotated @Experimental and noted as such in the docs, if the @Experimental annotation is not automatically mentioned in the docs).
> > > > > >>>> Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?
> > > > > >>> We should do this independently from marking the APIs/config options as `@Experimental`.
> > > > > >>> Best,
> > > > > >>> Piotrek
> > > > > >>> pt., 11 sie 2023 o 14:55 Dong Lin <lindon...@gmail.com> napisał(a):
> > > > > >>>> Hi Piotr,
> > > > > >>>> Thanks for the reply!
> > > > > >>>> On Fri, Aug 11, 2023 at 4:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > >>>>> Hi,
> > > > > >>>>> Sorry for the long delay in responding!
> > > > > >>>>>> Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?
> > > > > >>>>> I think it's fine. It would be best to mark this feature as experimental, and we say that the config keys or the default values might change in the future.
> > > > > >>>> In general I agree we can mark APIs that determine "whether to enable dynamic switching between stream/batch mode" as experimental.
> > > > > >>>> However, I am not sure we have such an API yet. The APIs added in this FLIP are intended to be used by operator developers rather than end users. End users can enable this capability by setting execution.checkpointing.interval-during-backlog = Long.MAX and using a source which might implicitly set the backlog status (e.g. HybridSource). So execution.checkpointing.interval-during-backlog is the only user-facing API that can always control whether this feature can be used.
> > > > > >>>> However, execution.checkpointing.interval-during-backlog itself is not tied to FLIP-327.
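For concreteness, a minimal sketch of the user-facing knob being discussed. The interval-during-backlog key is the one proposed in this discussion; it is set as a plain string below because a typed ConfigOption for it may not exist in a released Flink version:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BacklogConfigExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Regular checkpointing interval, used while all sources report isBacklog=false.
            conf.setString("execution.checkpointing.interval", "3 min");
            // "0 ms" disables checkpoint triggering while any source reports
            // isProcessingBacklog=true, which is also what permits the batch-style
            // optimizations of FLIP-327; a large value would merely reduce the
            // checkpointing frequency during backlog instead.
            conf.setString("execution.checkpointing.interval-during-backlog", "0 ms");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build and execute the job as usual ...
        }
    }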
> > > > > >>>> Do you mean we should set checkpointing.interval-during-backlog as experimental? Alternatively, how about we add a doc for checkpointing.interval-during-backlog explaining its impact/concern as discussed above?
> > > > > >>>> Best,
> > > > > >>>> Dong
> > > > > >>>>>> Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?
> > > > > >>>>> Sure, we can do that.
> > > > > >>>>> Best,
> > > > > >>>>> Piotrek
> > > > > >>>>> niedz., 23 lip 2023 o 14:32 Dong Lin <lindon...@gmail.com> napisał(a):
> > > > > >>>>>> Hi Piotr,
> > > > > >>>>>> Thanks a lot for the explanation. Please see my reply inline.
> > > > > >>>>>> On Fri, Jul 21, 2023 at 10:49 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > >>>>>>> Hi Dong,
> > > > > >>>>>>> Thanks a lot for the answers. I can now only briefly answer your last email.
> > > > > >>>>>>>> It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running in batch mode might also be slower than its throughput in stream mode due to the same reason.
> > > > > >>>>>>> Yes, I know, but the thing that worries me is that previously only the user alone could decide whether to use batch mode or streaming, and in practice one user would rarely (if ever) use both for the same problem/job/query. If his intention was to eventually process live data, he was using streaming even if there was a large backlog at the start (apart from a very few power users).
> > > > > >>>>>>> With this change, we want to introduce a mode that would switch back and forth between streaming and "batch in streaming" automatically. So a potential performance regression would be much more visible and painful at the same time. If a batch query runs slower than it could, that's kind of fine, as it will end at some point. If a streaming query under large backpressure, maybe a temporary load spike, switches to batch processing, that's a bigger deal. Especially if the batch processing mode will not actually be able to even handle the normal load after the load spike. In that case, the job could never recover from the backpressure/backlog mode.
> > > > > >>>>>> I understand you are concerned with the risk of performance regression introduced by switching to batch mode.
> > > > > >>>>>> After thinking about this more, I think the existing proposal meets the minimum requirement of "not introducing regression for existing jobs". The reason is that even if batch mode can be slower than stream mode for some operators in some cases, this is an optional feature that will only be enabled if a user explicitly overrides the newly introduced config to non-default values. Existing jobs that simply upgrade their Flink library version will not suffer any performance regression.
> > > > > >>>>>> More specifically, in order to switch to batch mode, users will need to explicitly set execution.checkpointing.interval-during-backlog to 0. And users can always explicitly update execution.checkpointing.interval-during-backlog to turn off the batch mode if that incurs any performance issue.
> > > > > >>>>>> As far as I can tell, for all practical workloads we see in production jobs, batch mode is always faster (w.r.t. throughput) than stream mode when there is a high backlog of incoming records. Though it is still theoretically possible, it should be very rare (if it happens at all) for batch mode to be slower in practice. Given that it is an optional feature that can be turned off by users, it might be OK to just let users try it out and we can fix performance issues once we detect any of them. What do you think?
> > > > > >>>>>>>> execution.backlog.use-full-batch-mode-on-start (default false)
> > > > > >>>>>>> Oops, sorry, it was supposed to be something like:
> > > > > >>>>>>> execution.backlog.use-batch-mode-only-on-start (default false)
> > > > > >>>>>>> That option would disallow switching from streaming to batch. Batch mode would be allowed only to get rid of the initial backlog present on start-up.
> > > > > >>>>>>> It would allow us to safely experiment with switching from streaming to batch, and I would actually be more fine with enabling "using batch mode on start" by default, until we gain confidence and feedback that switching back & forth is working as expected.
> > > > > >>>>>> Now I understand what you are suggesting. I agree that it is necessary for users to be able to disallow switching from streaming to batch.
> > > > > >>>>>> I am not sure it is necessary to introduce an extra config just for this purpose. The reason is that we don't have any strategy that switches the backlog status from false to true yet. And when we have such a strategy (e.g. FLIP-328) in the future, it is very likely that we will introduce extra config(s) for users to explicitly turn on such a feature. That means users should be able to turn off this feature even if we don't have something like execution.backlog.use-batch-mode-only-on-start.
> > > > > >>>>>> Maybe we can revisit the need for such a config when we introduce/discuss the capability to switch backlog from false to true in the future. What do you think?
> > > > > >>>>>>>>> Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow-up with switching back and forth?
> > > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.
> > > > > >>>>>>> Great, let's do that. In a follow-up FLIP we could restart the discussion about switching back and forth.
> > > > > >>>>>> Cool, I added the following statement to the motivation section.
> > > > > >>>>>> "NOTE: this FLIP focuses only on the capability to switch from batch to stream mode. If there is any extra API needed to support switching from stream to batch mode, we will discuss it in a follow-up FLIP."
> > > > > >>>>>> I am looking forward to reading your follow-up thoughts!
> > > > > >>>>>> Best,
> > > > > >>>>>> Dong
> > > > > >>>>>>> Piotrek
> > > > > >>>>>>> czw., 20 lip 2023 o 16:57 Dong Lin <lindon...@gmail.com> napisał(a):
> > > > > >>>>>>>> Hi Piotr,
> > > > > >>>>>>>> Thank you for the very detailed comments! Please see my reply inline.
> > > > > >>>>>>>> On Thu, Jul 20, 2023 at 12:24 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:
> > > > > >>>>>>>>> Hi Dong,
> > > > > >>>>>>>>> I have a couple of follow-up questions about switching back and forth between streaming and batching mode, especially around the shuffle/watermark strategy and the keyed state backend.
> > > > > >>>>>>>>> First of all, it might not always be beneficial to switch into the batch mode:
> > > > > >>>>>>>>> - Shuffle strategy
> > > > > >>>>>>>>>   - Is sorting going to be purely in-memory? If not, obviously spilling to disks might cause larger overheads compared to not sorting the records.
> > > > > >>>>>>>> Sorting might require spilling data to disk, depending on the input size. The behavior of sorting w.r.t. memory/disk is expected to be exactly the same as the behavior of the input sorting automatically performed by Flink runtime in batch mode for keyed inputs.
> > > > > >>>>>>>> More specifically, ExternalSorter <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ExternalSorter.java> is currently used to sort keyed inputs in batch mode. It is automatically used by Flink runtime in OneInputStreamTask <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java#L114> and in MultiInputSortingDataInput <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/MultiInputSortingDataInput.java#L188>. We plan to re-use the same code/mechanism to do the sorting.
> > > > > >>>>>>>> It is possible that spilling to disks might cause larger overhead. IMO it is an orthogonal issue already existing in Flink. This is because a Flink job running in batch mode might also be slower than its throughput in stream mode due to the same reason. However, even though it is possible in theory, I expect that in practice the throughput of using sorting + BatchExecutionKeyedStateBackend should be much higher than using other keyed state backends when the amount of data is large. As a matter of fact, we have not heard complaints about such performance regression issues in batch mode.
> > > > > >>>>>>>> The primary goal of this FLIP is to allow the operator to run at the same throughput (in stream mode when there is backlog) as it can currently achieve in batch mode. And this goal is not affected by the disk overhead issue mentioned above.
> > > > > >>>>>>>> I am thinking maybe we can treat it as an orthogonal performance optimization problem instead of solving it in this FLIP?
Can > this > > > > > >>> memory > > > > > >>>>> be > > > > > >>>>>>>>> redistributed? Ideally we should use as > > > > > >>>>>>>>> much as possible of the available memory to avoid > > > > > >>> spilling > > > > > >>>>>> costs, > > > > > >>>>>>>> but > > > > > >>>>>>>>> also being able to revoke that memory > > > > > >>>>>>>>> > > > > > >>>>>>>> This FLIP does not support dynamically > > revoking/redistribuitng > > > > > >>>>> managed > > > > > >>>>>>>> memory used by the ExternalSorter. > > > > > >>>>>>>> > > > > > >>>>>>>> For operators with isInternalSorterSupported = true, we > will > > > > > >>>> allocate > > > > > >>>>>> to > > > > > >>>>>>>> this operator execution.sorted-inputs.memory > > > > > >>>>>>>> < > > > > > >>>>>>>> > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java#L144 > > > > > >>>>>>>> amount of managed memory. This is the same as how Flink > > > > allocates > > > > > >>>>>> managed > > > > > >>>>>>>> memory to an operator when this operator has keyed inputs > in > > > > > >>> batch > > > > > >>>>>> mode. > > > > > >>>>>>>> Note that this FLIP intends to support operators to sort > > > inputs > > > > > >>>>>> whenever > > > > > >>>>>>>> there is backlog. And there is currently no way for an > > > operator > > > > > >>> to > > > > > >>>>> know > > > > > >>>>>>> in > > > > > >>>>>>>> advance whether there will be no backlog after a given > time. > > > So > > > > > >>> it > > > > > >>>>>> seems > > > > > >>>>>>>> simpler to just keep managed memory for such an operator > > > > > >>> throughout > > > > > >>>>> the > > > > > >>>>>>>> lifecycle of this operator, for now. > > > > > >>>>>>>> > > > > > >>>>>>>> Besides, it seems that the lack of ability to dynamically > > > > > >>>>>>>> revoke/redistribute un-used managed memory is an existing > > > issue > > > > > >>> in > > > > > >>>>>> Flink. > > > > > >>>>>>>> For example, we might have two operators sharing the same > > slot > > > > > >>> and > > > > > >>>>>> these > > > > > >>>>>>>> two operators both use managed memory (e.g. to sort > inputs). > > > > > >>> There > > > > > >>>> is > > > > > >>>>>>>> currently no way for one operator to re-use the memory not > > > used > > > > > >>> by > > > > > >>>>> the > > > > > >>>>>>>> other operator. > > > > > >>>>>>>> > > > > > >>>>>>>> Therefore, I think we can treat this as an orthogonal > > > > performance > > > > > >>>>>>>> optimization problem which can be addressed separately. > What > > > do > > > > > >>> you > > > > > >>>>>>> think? > > > > > >>>>>>>> > > > > > >>>>>>>>> - Sometimes sorting, even if we have memory to do > that, > > > > > >>> might > > > > > >>>>> be > > > > > >>>>>> an > > > > > >>>>>>>>> unnecessary overhead. > > > > > >>>>>>>>> - Watermarks > > > > > >>>>>>>>> - Is holding back watermarks always good? If we have > > tons > > > > > >>> of > > > > > >>>>> data > > > > > >>>>>>>>> buffered/sorted and waiting to be processed > > > > > >>>>>>>>> with multiple windows per key and many different > > keys. > > > > > >>>> When > > > > > >>>>> we > > > > > >>>>>>>>> switch back to `isBacklog=false` we > > > > > >>>>>>>>> first process all of that data before processing > > > > > >>>> watermarks, > > > > > >>>>>> for > > > > > >>>>>>>>> operators that are not using sorted input the > > > > > >>>>>>>>> state size can explode significantly causing lots > of > > > > > >>>>> problems. 
> > > > > >>>>>>>>>     Even for those that can use sorting, switching to sorting or BatchExecutionKeyedStateBackend is not always a good idea, but keeping RocksDB can also be risky.
> > > > > >>>>>>>> With the current FLIP, the proposal is to use a sorter only when the inputs have keys. According to this practice, operators which are not using sorting should have un-keyed inputs. I believe such an operator will not even use a keyed state backend. Maybe I missed some use-case. Can you provide a use-case where we would have an operator with un-keyed inputs whose state size can explode due to us holding back watermarks?
> > > > > >>>>>>>> For operators with keyed inputs that use sorting, I suppose it is possible that sorting + BatchExecutionKeyedStateBackend can be worse than using RocksDB. But I believe this is very, very rare (if it happens at all) in practical usage of Flink.
> > > > > >>>>>>>> Taking one step back, if this indeed causes a regression for a real use-case, the user can set execution.checkpointing.interval-during-backlog to anything other than 0 so that this FLIP will not use the sorter + BatchExecutionKeyedStateBackend even when there is backlog. I would hope we can find a way to automatically determine whether using sorting + BatchExecutionKeyedStateBackend is better or worse than using RocksDB alone, but I could not find a good and reliable way to do this. Maybe we can update Flink to do this once we find a good way in the future?
> > > > > >>>>>>>>> - Keyed state backend
> > > > > >>>>>>>>>   - I think you haven't described what happens during switching from streaming to backlog processing.
> > > > > >>>>>>>> Good point. This indeed needs to be described. I added a TODO in the "Behavior changes ..." section to describe what happens when isBacklog switches from false to true, for watermarks/checkpoints/statebackend etc. Let me explain this for the state backend here for now; I will update the FLIP later.
> > > > > >>>>>>>> When isBacklog switches from false to true, an operator with keyed inputs can optionally (as determined by its implementation) start to use an internal sorter to sort its inputs by key, without processing inputs or updating the state backend, until it receives end-of-inputs or isBacklog switches to false again.
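A deliberately simplified sketch of the false-to-true switch described above, again assuming the `processRecordAttributes()` hook proposed in this FLIP. A real implementation would buffer into Flink's ExternalSorter (as batch-mode input sorting already does) rather than into an in-memory list, and would apply its normal keyed processing while draining; the class and helper names below are illustrative:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Input records are (key, value) pairs; emits one output per processed record.
    public class BacklogSortingOperator extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<Tuple2<Long, Long>, Long> {

        private boolean backlog = false;
        // In-memory stand-in for ExternalSorter: holds records while isBacklog=true.
        private final List<Tuple2<Long, Long>> buffered = new ArrayList<>();

        // Hook proposed in FLIP-327 (name assumed from the FLIP text).
        public void processRecordAttributes(RecordAttributes recordAttributes) {
            if (!backlog && recordAttributes.isBacklog()) {
                backlog = true; // from here on: buffer only, no state-backend updates
            } else if (backlog && !recordAttributes.isBacklog()) {
                // Sort by key so that each key's records are contiguous, then process
                // them the way batch mode would, before resuming normal streaming.
                buffered.sort(Comparator.comparingLong(record -> record.f0));
                for (Tuple2<Long, Long> record : buffered) {
                    processNow(record);
                }
                buffered.clear();
                backlog = false;
            }
        }

        @Override
        public void processElement(StreamRecord<Tuple2<Long, Long>> element) {
            if (backlog) {
                buffered.add(element.getValue());
            } else {
                processNow(element.getValue());
            }
        }

        private void processNow(Tuple2<Long, Long> record) {
            output.collect(new StreamRecord<>(record.f1)); // placeholder processing
        }
    }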
> > > > > >>>>>>>>> - The switch can be an unnecessary overhead.
> > > > > >>>>>>>> I agree it can cause unnecessary overhead, particularly when isBacklog switches back and forth frequently. Whether or not it is unnecessary likely depends on the duration/throughput of the backlog phase as well as the specific computation logic of the operator. I am not sure there is a good way for Flink to determine in advance whether switching is unnecessary.
> > > > > >>>>>>>> Note that for the existing use-cases where we expect to change isBacklog to true (e.g. MySQL CDC snapshot phase, Kafka source watermark lag being too high), we don't expect the backlog status to switch back and forth frequently. And the user can disable this switch by setting execution.checkpointing.interval-during-backlog to anything other than 0. Therefore, I am wondering if we can also view this as a performance optimization opportunity for extra use-cases in the future, rather than a blocking issue of this FLIP for the MVP use-cases (e.g. the snapshot phase of any CDC source, Kafka watermark lag).
> > > > > >>>>>>>>> At the same time, in your current proposal, for `execution.checkpointing.interval-during-backlog > 0` we won't switch to "batch" mode at all. That's a bit of a shame; I don't understand why those two things should be coupled together?
> > > > > >>>>>>>> We can in general classify optimizations as those that are compatible with checkpointing and those that are not. For example, input sorting is currently not compatible with checkpointing, while buffering input records to reduce state backend overhead (and probably columnar processing for mini-batching in the future) is compatible with checkpointing.
> > > > > >>>>>>>> The primary goal of FLIP-327 is to support optimizations that are not compatible with checkpointing.
> > > > > >>>>>>>> If execution.checkpointing.interval-during-backlog > 0, which means the user intends to keep checkpointing even when there is backlog, then we will not be able to support such optimizations.
> > > > > >>>>>>>> For optimizations that are compatible with checkpointing, we can apply them even when the operator does not run in "batch mode". There are extra problems to solve in order to achieve this, such as supporting unaligned checkpointing without prolonging its sync phase. I plan to explain how this can be done in FLIP-325.
> > > > > >>>>>>>>> All in all, shouldn't we aim for some more clever process of switching back and forth between streaming/batch modes for the watermark strategy/state backend/sorting based on some metrics? Trying to either predict if switching might help, or trying to estimate if the last switch was beneficial? Maybe something along these lines:
> > > > > >>>>>>>>> - sort only in memory and during sorting count the number of distinct keys (NDK)
> > > > > >>>>>>>>>   - maybe allow for spilling if so far in memory we have NDK * 5 = #records
> > > > > >>>>>>>>> - do not allow buffering records above a certain threshold, as otherwise checkpointing can explode
> > > > > >>>>>>>>> - switch to `BatchExecutionKeyedStateBackend` only if NDK * 2 = #records
> > > > > >>>>>>>>> - do not sort if last NDKs (or EMA of NDK?) 1.5 <= #records
> > > > > >>>>>>>>> Or maybe for starters something even simpler, and then test out something more fancy as a follow-up?
> > > > > >>>>>>>> I agree it is worth investigating these ideas to further optimize the performance during backlog.
> > > > > >>>>>>>> I just think these can be done independently after this FLIP. The focus of this FLIP is to re-use in stream mode the same optimizations which we already use in batch mode, rather than inventing new optimizations or improving the performance of the existing ones.
> > > > > >>>>>>>> Given that there are already a lot of new mechanisms/features to discuss and address in this FLIP, I am hoping we can limit its scope to re-using the existing optimizations, and pursue these extra optimization opportunities as future work.
> > > > > >>>>>>>> What do you think?
> > > > > >>>>>>>>> At the same time, `execution.checkpointing.interval-during-backlog=0` seems a weird setting to me, one that I would not feel safe recommending to anyone. If processing a backlog takes a long time, a job might stop making any progress due to some random failures. This is especially dangerous if a job switches from streaming mode back to backlog processing for some reason, as that could happen months after someone started a job with this strange setting. So should we even have it? I would simply disallow it.
> > > > > >>>>>>>> Good point. I do agree we need further work to improve the failover performance in case any task fails.
> > > > > >>>>>>>> As of the current FLIP, if any task fails during backlog and execution.checkpointing.interval-during-backlog = 0, we will need to restart all operators from the last checkpointed state and continue processing the backlog. This can be a lot of rollback, since there is no checkpoint during backlog. And it can also be worse than batch mode, since this FLIP currently does not support exporting/saving records to local disk (or a shuffle service) so that a failed task can re-consume the records from the upstream task (or shuffle service) in the same way Flink fails over a task in batch mode.
> > > > > >>>>>>>> I think we can extend this FLIP to solve this problem so that it has at least the same behavior/performance as a batch-mode job. The idea is to also follow what batch mode does. For example, we can trigger a checkpoint when isBacklog switches to true, and every operator should buffer its output on the TM local disk (or a remote shuffle service). Therefore, after a task fails, it can restart from the last checkpoint and re-consume the data buffered in the upstream task.
> > > > > >>>>>>>> I will update the FLIP as described above. Would this address your concern?
> > > > > >>>>>>>>> I could see a power setting like: `execution.backlog.use-full-batch-mode-on-start` (default false)
> > > > > >>>>>>>> I am not sure I fully understand this config or its motivation. Can you help explain the exact semantics of this config?
> > > > > >>>>>>>>> that would override any heuristic of switching to backlog if someone is submitting a new job that starts with `isBacklog=true`.
> > > > > >>>>>>>>> Or we could limit the scope of this FLIP to only support starting with batch mode and switching only once to streaming, and design a follow-up with switching back and forth?
> > > > > >>>>>>>> Sure, that sounds good to me. I am happy to split this FLIP into two FLIPs so that we can make incremental progress.
> > > > > >>>>>>>> Best,
> > > > > >>>>>>>> Dong
> > > > > >>>>>>>>> I'm looking forward to hearing/reading your thoughts.
> > > > > >>>>>>>>> Best,
> > > > > >>>>>>>>> Piotrek
> > > > > >>>>>>>>> śr., 12 lip 2023 o 12:38 Jing Ge <j...@ververica.com.invalid> napisał(a):
> > > > > >>>>>>>>>> Hi Dong,
> > > > > >>>>>>>>>> Thanks for your reply!
> > > > > >>>>>>>>>> Best regards,
> > > > > >>>>>>>>>> Jing
> > > > > >>>>>>>>>> On Wed, Jul 12, 2023 at 3:25 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > > >>>>>>>>>>> Hi Jing,
> > > > > >>>>>>>>>>> Thanks for the comments. Please see my reply inline.
> > > > > >>>>>>>>>>> On Wed, Jul 12, 2023 at 5:04 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > > > >>>>>>>>>>>> Hi Dong,
> > > > > >>>>>>>>>>>> Thanks for the clarification. Now it is clear to me. I have additional noob questions wrt the internal sorter.
> > > > > >>>>>>>>>>>> 1. When to call the setter to set internalSorterSupported to true?
> > > > > >>>>>>>>>>> The developer of the operator class (i.e. those classes which implement `StreamOperator`) should override the `#getOperatorAttributes()` API to set internalSorterSupported to true, if he/she decides to sort records internally in the operator.
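For illustration, a sketch of what that override could look like, using the `OperatorAttributes` / `OperatorAttributesBuilder` names from the FLIP's public-interface section (the exact builder API is assumed here):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class MySortingOperator extends AbstractStreamOperator<Long>
            implements OneInputStreamOperator<Long, Long> {

        // Declares that this operator sorts its input internally during backlog, so
        // the runtime must not add its own input sorting. Names follow the FLIP.
        public OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder()
                    .setInternalSorterSupported(true)
                    .build();
        }

        @Override
        public void processElement(StreamRecord<Long> element) {
            // ... buffer into an internal sorter while isBacklog=true (elided) ...
        }
    }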
> 2.
> "For those operators whose throughput can be considerably improved with
> an internal sorter, update it to take advantage of the internal sorter
> when its input has isBacklog=true. Typically, operators that involve an
> aggregation operation (e.g. join, cogroup, aggregate) on keyed inputs can
> benefit from using an internal sorter."
>
> "The operator that performs the CoGroup operation will instantiate two
> internal sorters to sort records from its two inputs separately. Then it
> can pull the sorted records from these two sorters. This can be done
> without wrapping input records with TaggedUnion<...>. In comparison, the
> existing DataStream#coGroup needs to wrap input records with
> TaggedUnion<...> before sorting them using one external sorter, which
> introduces higher overhead."
>
> According to the performance test, it seems that the internal sorter has
> better performance than the external sorter. Is it possible to make those
> operators that can benefit from it use the internal sorter by default?

Yes, it is possible. After this FLIP is done, users can use
DataStream#coGroup with EndOfStreamWindows as the window assigner to
co-group two streams in effectively the batch manner. An operator that
uses an internal sorter will be used to perform the co-group operation.
There is no need for users of the DataStream API to explicitly know or set
the internal sorter in any way.

In the future, we plan to incrementally optimize other aggregation
operations (e.g. aggregate) on the DataStream API when EndOfStreamWindows
is used as the window assigner.
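The user-facing pattern would look roughly like this (a sketch:
EndOfStreamWindows is the assigner proposed in this FLIP series and is not
released yet; the streams and functions are made up for illustration):

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> left = env.fromElements("a", "b");
    DataStream<String> right = env.fromElements("a", "c");

    left.coGroup(right)
        .where(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) { return value; }
        })
        .equalTo(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) { return value; }
        })
        // Results are emitted only at the end of input, which lets the
        // runtime execute this co-group with the internal sorter.
        .window(EndOfStreamWindows.get())
        .apply(new CoGroupFunction<String, String, String>() {
            @Override
            public void coGroup(
                    Iterable<String> l, Iterable<String> r, Collector<String> out) {
                // All records for this key arrive here together once both
                // inputs have ended.
                out.collect(l + " x " + r);
            }
        })
        .print();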
Best,
Dong

> Best regards,
> Jing
>
> On Tue, Jul 11, 2023 at 2:58 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Jing,

Thank you for the comments! Please see my reply inline.

On Tue, Jul 11, 2023 at 5:41 AM Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Dong,
>
> Thanks for the proposal! The FLIP is already in good shape. I got some
> NIT questions.
>
> 1. It is a little bit weird to write the hint right after the motivation
> that some features have been moved to FLIP-331, because at that time,
> readers don't know the context about what features it means. I would
> suggest moving the note to the beginning of the "Public interfaces"
> section.

Given that the reviewer who commented on this email thread before I
refactored the FLIP (i.e. Piotr) has read FLIP-331, I think it is simpler
to just remove any mention of FLIP-331. I have updated the FLIP
accordingly.

> 2. It is also a little bit weird to describe all behaviour changes at
> first but only focus on one single feature, i.e. how to implement
> internalSorterSupported. TBH, I was lost while I was reading the Public
> interfaces. Maybe change the FLIP title? Another option could be to
> write a short summary of all features and point out that this FLIP will
> only focus on the internalSorterSupported feature. Others could be found
> in FLIP-331. WDYT?

Conceptually, the purpose of this FLIP is to allow a stream-mode job to
run parts of the topology in batch mode so that it can apply
optimizations/computations that cannot be used together with checkpointing
(and thus are not usable in stream mode). Although the internal sorter is
the only optimization immediately supported in this FLIP, it lays the
foundation to support other optimizations in the future, such as using
GPUs to process a bounded stream of records.

Therefore, I find it better to keep the current title rather than limiting
the scope to the internal sorter. What do you think?
> 3. There should be a typo at "4) Checkpoint and failover strategy" ->
> "Mixed mode" ->
>
> - If any task fails when isBacklog=false true, this task is restarted
> to re-process its input from the beginning.

Thank you for catching this issue. It is fixed now.

Best,
Dong

> Best regards,
> Jing
>
> On Thu, Jul 6, 2023 at 1:24 PM Dong Lin <lindon...@gmail.com> wrote:

Hi Piotr,

Thanks for your comments! Please see my reply inline.

On Wed, Jul 5, 2023 at 11:44 PM Piotr Nowojski <piotr.nowoj...@gmail.com>
wrote:

> Hi Dong,
>
> I have a couple of questions.
>
> Could you explain why those properties
>
>     @Nullable private Boolean isOutputOnEOF = null;
>     @Nullable private Boolean isOutputOnCheckpoint = null;
>     @Nullable private Boolean isInternalSorterSupported = null;
>
> must be `@Nullable`, instead of having the default value set to `false`?

By initializing these private variables in OperatorAttributesBuilder as
null, we can implement `OperatorAttributesBuilder#build()` in such a way
that it prints DEBUG-level logging such as "isOutputOnCheckpoint is not
explicitly set". This can help users/SREs debug performance issues (or the
lack of an expected optimization) caused by operators not explicitly
setting the right operator attribute.
For example, we might want a job to always use the longer checkpointing
interval (i.e. execution.checkpointing.interval-during-backlog) if all
running operators have isOutputOnCheckpoint==false, and use the short
checkpointing interval otherwise. If a user has explicitly configured
execution.checkpointing.interval-during-backlog but the two-phase commit
sink library has not been upgraded to set isOutputOnCheckpoint=true, then
the job will end up using the long checkpointing interval, and it will be
useful to figure out what is going wrong in this case by checking the log.

Note that the default values of these fields in the OperatorAttributes
instance built by OperatorAttributesBuilder will still be false. The
following is mentioned in the Java doc of
`OperatorAttributesBuilder#build()`:

    /**
     * If any operator attribute is null, we will log it at DEBUG level
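Put together, the builder behavior described above would look roughly like
the following (a sketch: the class and field names are the ones quoted
above, while the setter/constructor shapes and the log message are
illustrative):

    import javax.annotation.Nullable;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class OperatorAttributesBuilder {
        private static final Logger LOG =
                LoggerFactory.getLogger(OperatorAttributesBuilder.class);

        @Nullable private Boolean isOutputOnEOF = null;
        @Nullable private Boolean isOutputOnCheckpoint = null;
        @Nullable private Boolean isInternalSorterSupported = null;

        public OperatorAttributesBuilder setInternalSorterSupported(boolean supported) {
            this.isInternalSorterSupported = supported;
            return this;
        }

        // ... setters for the other attributes ...

        public OperatorAttributes build() {
            // null means the operator developer has not made an explicit
            // choice; surface that at DEBUG level, then fall back to false.
            // (The same check applies to the other nullable attributes.)
            if (isOutputOnCheckpoint == null) {
                LOG.debug("isOutputOnCheckpoint is not explicitly set.");
            }
            return new OperatorAttributes(
                    isOutputOnEOF != null && isOutputOnEOF,
                    isOutputOnCheckpoint != null && isOutputOnCheckpoint,
                    isInternalSorterSupported != null && isInternalSorterSupported);
        }
    }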