How do you feel about proceeding without an additional vote? I agree that we added quite a bit in there, but none of the added content changes what has been voted on so far. A big part of the added content can be considered as clarifying the implementation. I'd be inclined to proceed without a vote. WDYT?
Best,

Dawid

On 22/07/2021 14:55, Yun Gao wrote:
> Hi Till,
>
> Many thanks for the comments and tips! We will update the FLIP with the new details, and we should need a new vote since a large part of the FLIP is updated. I think we would start the vote right after we also solve the remaining UnionListState problem and double-check that we have no other missing points in the design and the FLIP.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From: Till Rohrmann <trohrm...@apache.org>
> Send Time: 2021 Jul. 22 (Thu.) 17:57
> To: Yun Gao <yungao...@aliyun.com>
> Cc: Piotr Nowojski <pnowoj...@apache.org>; dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Thanks everyone for this discussion. I think this is very helpful.
>
> I do agree with Piotr's proposal to separate state and the lifecycle of a StreamOperator. That way the finished state can be used to recover a StreamOperator or to rescale the topology. I also believe that this will make the implementation a lot easier since we don't have to remember which StreamOperator has actually finished. Moreover, it solves the problem of rescaling.
>
> I also agree with the proposed solution for checkpoints after finish() has been called. I guess the first checkpoint that was triggered after finish() and sends notifyCheckpointComplete should be good enough to close the StreamTask.
>
> Shall we update the FLIP with these details? Do we need another vote for it or shall we continue w/o it?
>
> Cheers,
> Till
>
> On Thu, Jul 22, 2021 at 10:45 AM Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Piotr,
>
> Many thanks for the explanation, and sorry that I initially misunderstood the problem Dawid raised.
>
>> And what should we do now? We can of course commit all transactions until checkpoint 43.
>> But should we keep waiting for `notifyCheckpointComplete(44)`?
>> What if in the meantime another checkpoint is triggered? We could end up waiting indefinitely.
>> Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
>> a) ideally there should be no new pending transactions opened after checkpoint 42
>> b) even if operator/function is opening some transactions for checkpoint 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty
>
> Now I understand this issue better: currently, operators like TwoPhaseCommitSinkFunction create a new transaction when snapshotting state, so each checkpoint has one corresponding transaction. Thus, after finish() and before notifyCheckpointComplete(), there might be some empty transactions.
>
> I also agree with the proposal: we could finish the task after we receive the first checkpoint complete notification, and checkpoints with larger checkpoint ids should still be allowed to finish. The checkpoint complete notification should be able to commit all the non-empty transactions. Some of the empty transactions would also get committed, but that should cause no harm. The other empty transactions would be aborted in close(), which should also cause no harm.
>
>> If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.
>
> From another point of view, I think it might also be acceptable to have some requirements on the pattern of the operators, e.g. that the sink is able to skip creating new transactions after finish() is called.
> Perhaps we may treat the process as a protocol between the framework and the operators, and the operators might need to follow the protocol. Of course it would be better if the framework could handle all the cases elegantly and put fewer implicit limitations on the operators, and at least we should guarantee that changing the process causes no compatibility problems.
>
> Many thanks for the careful checks on the whole process.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From: Piotr Nowojski <pnowoj...@apache.org>
> Send Time: 2021 Jul. 22 (Thu.) 15:33
> To: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> Cc: Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Hi Guowei,
>
>> Thank Dawid and Piotr for sharing the problem. +1 to EndInput/Finish can be called repeatedly.
>
> Just to clarify. It's not about calling `finish()` and `endInput()` repeatedly, but about (from the perspective of operator's state)
> 1. seeing `finish()`
> 2. checkpoint X triggered and completed
> 3. failure + recovery from X
> 4. potentially processing more records
> 5. another `finish()`
>
> But from the context of the remaining part of your message Guowei I presume that you have already got that point :)
>
> Yun:
>
>> For this issue perhaps we could explicitly require the task to wait for a checkpoint triggered after the finish() method is called for all the operators? We could achieve this by maintaining some state inside the task.
>
> Isn't this exactly the "WAITING_FOR_FINAL_CP" from the FLIP document? That we always need to wait for a checkpoint triggered after `finish()` to complete, before shutting down a task?
>
> What Dawid was describing is a scenario where:
> 1. task/operator received `finish()`
> 2. checkpoint 42 triggered (not yet completed)
> 3. checkpoint 43 triggered (not yet completed)
> 4. checkpoint 44 triggered (not yet completed)
> 5. notifyCheckpointComplete(43)
>
> And what should we do now? We can of course commit all transactions until checkpoint 43. But should we keep waiting for `notifyCheckpointComplete(44)`? What if in the meantime another checkpoint is triggered? We could end up waiting indefinitely.
>
> Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
> a) ideally there should be no new pending transactions opened after checkpoint 42
> b) even if operator/function is opening some transactions for checkpoint 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty
>
> Hence comment from Dawid
>> We must make sure those checkpoints do not leave lingering external resources.
>
> After seeing 5. (notifyCheckpointComplete(43)) it should be good enough to:
> - commit transactions from checkpoint 42 (and 43 if they were created, depends on the user code)
> - close operator, aborting any pending transactions (for checkpoint 44 if they were opened, depends on the user code)
>
> If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.
>
> Best,
> Piotrek
>
> On Thu, 22 Jul 2021 at 09:00, Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
> Hi Dawid, Piotr, Steven,
>
> Many thanks for pointing out these issues and for the discussion!
>
> Failure before notifyCheckpointComplete()
>
> For this issue I would agree with what Piotr has proposed.
> I tried to use some operators like sink / window as examples, and so far I have not found explicit scenarios that might cause problems if records are processed by a task that is assigned state snapshotted after finish() was called. For future cases it seems users should be able to implement their target logic by explicitly adding a flag recording that the operator has finished, and perhaps have different logic if this part of the state is referred to. Besides, this case would only happen on rescaling or topology change, which embeds some kind of user knowledge in the action. Thus it looks acceptable that we still split the operator's state from the task lifecycle, and do not treat checkpoints after finish() differently.
>
> Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint
>
> For this issue perhaps we could explicitly require the task to wait for a checkpoint triggered after the finish() method is called for all the operators? We could achieve this by maintaining some state inside the task.
>
> Checkpointing from a single subtask / UnionListState case
>
> This should indeed cause problems, and I also agree that we could focus on this issue in https://issues.apache.org/jira/browse/FLINK-21080.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From: Piotr Nowojski <pnowoj...@apache.org>
> Send Time: 2021 Jul. 22 (Thu.) 02:46
> To: dev <dev@flink.apache.org>
> Cc: Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Hi Steven,
>
> > I probably missed sth here. isn't this the case today already? Why is it a concern for the proposed change?
>
> The problem is with the newly added `finish()` method and the already existing `endInput()` call.
> Currently on master there are no issues, because we are not checkpointing any operators after some operators have finished. The purpose of this FLIP-147 is exactly to enable this, and this opens a new problem described by Dawid.
>
> To paraphrase and to give a concrete example. Assume we have an operator with parallelism of two. Subtask 0 and subtask 1.
>
> 1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1 hasn't (yet).
> 2. Checkpoint 42 is triggered, and it completes.
> 3. Job fails and is restarted, but at the same time it's rescaled. User has chosen to scale down his operator to 1.
>
> Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has been processed or not, so while recovering to checkpoint 42, we have to recover both finished subtask 0 (#1) state and not yet finished subtask 1's (#1). But at the same time they are scaled down, so we only have a single subtask 0 (#2) that has a combined state from both of the previous instances. The potentially confusing issue is that the state from subtask 0 (#1) was checkpointed AFTER `endInput()` and `finish()` calls, but it's recovered to an operator that has still some records to process. In step 1. a user for example could store on the operator's state a bit of information "end input has been already called!", that after recovery would no longer be true.
>
> Hence the question about `finish()` and `endInput()` semantics. Should it be tied down to state, or just to an operator instance/execution attempt?
>
> Piotrek
>
> On Wed, 21 Jul 2021 at 19:00, Steven Wu <stevenz...@gmail.com> wrote:
>
> > if a failure happens after sequence of finish() -> snapshotState(), but before notifyCheckpointComplete(), we will restore such a state and we might end up sending some more records to such an operator.
>
> I probably missed sth here. isn't this the case today already? Why is it a concern for the proposed change?
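
The scale-down scenario Piotr walks through in this message (subtask 0 finished, subtask 1 not, both restored into a single subtask) can be illustrated with a toy model. This is only a sketch: `ToySubtaskState`, `RescaleSketch`, and the merge rule (concatenate buffered records, OR the flags) are hypothetical and are not Flink classes or Flink's actual state redistribution logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-subtask operator state; not a Flink class.
final class ToySubtaskState {
    final List<String> bufferedRecords; // stands in for unfinished work
    final boolean endInputSeen;         // user-stored "endInput() was called" flag

    ToySubtaskState(List<String> bufferedRecords, boolean endInputSeen) {
        this.bufferedRecords = bufferedRecords;
        this.endInputSeen = endInputSeen;
    }
}

final class RescaleSketch {
    // Assumed merge rule for scaling down to one subtask: concatenate the
    // buffered records and OR the flags. Real redistribution depends on the
    // state type and is not modelled here.
    static ToySubtaskState mergeForScaleDown(List<ToySubtaskState> oldSubtasks) {
        List<String> merged = new ArrayList<>();
        boolean anyEndInputSeen = false;
        for (ToySubtaskState s : oldSubtasks) {
            merged.addAll(s.bufferedRecords);
            anyEndInputSeen = anyEndInputSeen || s.endInputSeen;
        }
        return new ToySubtaskState(merged, anyEndInputSeen);
    }

    public static void main(String[] args) {
        // Subtask 0 (#1) snapshotted after endInput()/finish(); subtask 1 (#1) still running.
        ToySubtaskState finished = new ToySubtaskState(List.of(), true);
        ToySubtaskState running = new ToySubtaskState(List.of("r1", "r2"), false);

        // Subtask 0 (#2) after recovery with parallelism 1.
        ToySubtaskState restored = mergeForScaleDown(List.of(finished, running));
        System.out.println("endInputSeen=" + restored.endInputSeen
                + ", buffered=" + restored.bufferedRecords);
    }
}
```

The restored subtask carries the "end of input seen" flag and still has work to do, which is the inconsistency the message describes. Tying `endInput()`/`finish()` to the execution attempt rather than to state sidesteps having to define such a merge rule at all.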
>
> On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi Dawid,
> >
> > Thanks for writing down those concerns.
> >
> > I think the first issue boils down to what should be the contract of lifecycle methods like open(), close(), initializeState() etc. and especially the new additions like finish() and endInput(). And what should be their relation with the operator state (regardless of its type: keyed, non-keyed, union, ...). Should those methods be tied to state or not? After thinking about it for a while (and discussing it offline with Dawid), I think the answer might be no, they shouldn't. I mean maybe we should just openly say that all of those methods relate to this single particular instance and execution of the operator. And if a job is recovered/rescaled, we would be allowed to freely resume consumption, ignoring the fact that maybe some parts of the state have previously seen `endInput()`. Why?
> >
> > 0. Yes, it might be confusing. Especially with `endInput()`. We call `endInput()`, we store something in a state and later after recovery combined with rescaling that state can see more records? Indeed weird,
> > 1. I haven't come up yet with a counterexample that would break and make it impossible to implement a real life use case. Theoretically yes, the user can store `endInput()` on state, and after rescaling this state would be inconsistent with what is actually happening with the operator, but I haven't found a use case that would break because of that.
> > 2. Otherwise, implementation would be very difficult.
> > 3. It's difficult to access keyed state from within `endInput()`/`finish()` calls, as they do not have key context.
> > 4. After all, openly defining `endInput()` and `finish()` to be tied with its operator execution instance lifecycle is not that strange and quite simple to explain.
> > Sure, it can lead to a bit of confusion (0.), but that doesn't sound that bad in comparison with the alternatives that I'm aware of. Also currently methods like `open()` and `close()` are also tied to the operator execution instance, not to the state. Operators can be opened and closed multiple times, it doesn't mean that the state is lost after closing an operator.
> >
> > For the UnionListState problem I have posted my proposal in the ticket [1], so maybe let's move that particular discussion there?
> >
> > Piotrek
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21080
> >
> > On Wed, 21 Jul 2021 at 12:39, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >
> > > Hey all,
> > >
> > > To make the issues that were found transparent to the community, I want to post an update:
> > >
> > > *1. Committing side-effects*
> > > We do want to make sure that all side effects are committed before bringing tasks down. Side effects are committed when calling notifyCheckpointComplete. For the final checkpoint we introduced the method finish(). This notifies the operator that we have consumed all incoming records and we are preparing to close the Task. In turn we should flush any pending buffered records and prepare to commit last transactions. The goal is that after a successful sequence of finish() -> snapshotState() -> notifyCheckpointComplete(), the remaining state can be considered empty/finished and may be discarded.
> > >
> > > *Failure before notifyCheckpointComplete()*
> > >
> > > The question is what is the contract of the endInput()/finish() methods and how does calling these methods affect the operator's keyed, non-keyed state and external state. Is it allowed to restore state snapshot taken after calling endInput()/finish() and process more records?
> > > Or do we assume that after a restore from such a state taken after finish() we should not call any of the lifecycle methods or at least make sure those methods do not emit records/interact with mailbox etc.
> > >
> > > Currently it is possible that if a failure happens after sequence of finish() -> snapshotState(), but before notifyCheckpointComplete(), we will restore such a state and we might end up sending some more records to such an operator. It is possible if we rescale and this state is merged with a state of a subtask that has not called finish() yet. It can also happen if we rescale the upstream operator and the subtask of interest becomes connected to a newly added non finished subtask.
> > >
> > > *Snapshotting StreamTasks that finish() has been called*
> > >
> > > We thought about putting a flag into the snapshot of a subtask produced after the finish() method. This would make it possible to skip execution of certain lifecycle methods. Unfortunately this creates problems for rescaling. How do we deal with a situation that subtask states with both the feature flag set and unset end up in a single StreamTask. Additional problem is that we merge those states into a single OperatorSubtaskState on CheckpointCoordinator.
> > >
> > > *Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint*
> > >
> > > We need to wait for a checkpoint to complete, that started after the finish() method. However, we support concurrent checkpoints therefore, there might be later checkpoints that completed, but the notification has not arrived. We must make sure those checkpoints do not leave lingering external resources.
> > >
> > > *Checkpointing from a single subtask / UnionListState case*
> > > There are operators that checkpoint from a single subtask only. Usually from the subtask index=0.
> > > If we let those subtasks finish, subsequent checkpoints will miss this information.
> > > Esp. Legacy sources problem: https://issues.apache.org/jira/browse/FLINK-21080
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 19/07/2021 15:10, Yun Gao wrote:
> > >
> > > Hi Till, Dawid,
> > >
> > > Many thanks for the comments and discussion! Glad that we seem to have converged, and I also agree that we could leave the optimization out of the first version.
> > >
> > > Best
> > > Yun
> > >
> > > ------------------------------------------------------------------
> > > From: Dawid Wysakowicz <dwysakow...@apache.org>
> > > Send Time: 2021 Jul. 19 (Mon.) 17:51
> > > To: dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
> > > Cc: Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
> > > Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > >
> > > Small correction. I meant we need to adjust the EndOfInputEvent of course.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> > > > Hey Till,
> > > >
> > > > Yes, you're right we will have to adjust the current state of EndOfPartitionEvent and move the moment when we emit it to have what we're discussing here. We are aware of that.
> > > >
> > > > As for the MAX_WATERMARK vs finish(). My take is that we should always emit MAX_WATERMARK before calling finish() on an operator. At the same time finish() should not leave behind anything in state, as the intention is that we never restore from the taken savepoint/checkpoint (savepoint w drain or bounded data consumed).
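
The ordering suggested in this message (MAX_WATERMARK first, then finish(), then the final snapshot and its completion notification) can be written down as a small driver. This is a sketch under assumptions: `RecordingOperator` and `DrainSequenceSketch` are hypothetical, the method names only mirror those used in the thread, and the real StreamTask logic involves mailbox processing and checkpoint barriers that are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified operator that records the lifecycle calls it sees.
final class RecordingOperator {
    final List<String> calls = new ArrayList<>();

    void processWatermark(long timestamp) {
        calls.add(timestamp == Long.MAX_VALUE ? "MAX_WATERMARK" : "watermark");
    }
    void finish() { calls.add("finish"); }
    void snapshotState(long checkpointId) { calls.add("snapshotState(" + checkpointId + ")"); }
    void notifyCheckpointComplete(long checkpointId) {
        calls.add("notifyCheckpointComplete(" + checkpointId + ")");
    }
    void close() { calls.add("close"); }
}

final class DrainSequenceSketch {
    // Assumed order for a bounded input or a savepoint with drain.
    static void drain(RecordingOperator op, long finalCheckpointId) {
        op.processWatermark(Long.MAX_VALUE);            // fire event-time timers first
        op.finish();                                    // flush whatever is still buffered
        op.snapshotState(finalCheckpointId);            // final checkpoint or savepoint
        op.notifyCheckpointComplete(finalCheckpointId); // commit side effects
        op.close();                                     // release resources
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        drain(op, 42L);
        // Prints: [MAX_WATERMARK, finish, snapshotState(42), notifyCheckpointComplete(42), close]
        System.out.println(op.calls);
    }
}
```

Keeping the watermark ahead of finish() means time-based buffers are already flushed by the time finish() runs, so finish() only has to handle non-time-based buffering.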
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 19/07/2021 11:33, Till Rohrmann wrote:
> > > >> Hi Yun and Dawid,
> > > >>
> > > >> Thanks for your comments. I do agree with your comments that finish() can do more than MAX_WATERMARK. I guess we should then explain how MAX_WATERMARK and finish() play together and what kind of order guarantees we provide.
> > > >>
> > > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it would work in its current state because we send this event when the Task is about to shut down if I am not mistaken. What we want to have is to bring the StreamTasks into a state so that they shut down on the next checkpoint. For this we need to keep the StreamTask running. In general, I am a fan of making things explicit if possible. I think this helps maintenance and evolvability of code. That's why I think sending an EndOfInputEvent, which is a StreamTask level event and which says that there won't be any other records coming, only control events, could make sense.
> > > >>
> > > >> I would leave the proposed optimization out of the first version. We can still add it at a later point in time.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > >>
> > > >>> Personally I don't find this optimization important and I'd rather leave it out not to complicate the codebase further. I doubt we save much there. I don't have a strong opinion though.
> > > >>>
> > > >>> Best,
> > > >>>
> > > >>> Dawid
> > > >>>
> > > >>> On 19/07/2021 10:31, Yun Gao wrote:
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> Many thanks, Dawid, for the thoughts!
> > > >>>
> > > >>> Currently I also do not have different opinions regarding this part. But I have one more issue to confirm: during the previous discussion we discussed that for the final checkpoint case, we might have an optimization: if a task does not have operators using 2-pc, we might skip waiting for the final checkpoint (but we could not skip the savepoint). To allow users to express this logic, we proposed to add one more method to StreamOperator & CheckpointListener:
> > > >>>
> > > >>> interface StreamOperator {
> > > >>>     default boolean requiresFinalCheckpoint() {
> > > >>>         return true;
> > > >>>     }
> > > >>> }
> > > >>>
> > > >>> interface CheckpointListener {
> > > >>>     default boolean requiresFinalCheckpoint() {
> > > >>>         return true;
> > > >>>     }
> > > >>> }
> > > >>>
> > > >>> class AbstractUdfStreamOperator {
> > > >>>     @Override
> > > >>>     boolean requiresFinalCheckpoint() {
> > > >>>         return userFunction instanceof CheckpointListener &&
> > > >>>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > > >>>     }
> > > >>> }
> > > >>>
> > > >>> I think we should still keep the change?
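
As a rough illustration of how a task might consume the proposed method, the sketch below waits for the final checkpoint if any operator in the chain asks for it. `FinalCheckpointAware` and `FinalCheckpointDecision` are hypothetical stand-ins, not Flink interfaces, and the thread goes on to leave this optimization out of the first version.

```java
import java.util.List;

// Hypothetical stand-in for the proposed addition; not Flink's real StreamOperator.
interface FinalCheckpointAware {
    default boolean requiresFinalCheckpoint() {
        return true; // conservative default, as in the proposal above
    }
}

final class FinalCheckpointDecision {
    // The task must wait if at least one chained operator requires the final checkpoint.
    static boolean taskMustWait(List<? extends FinalCheckpointAware> chain) {
        for (FinalCheckpointAware operator : chain) {
            if (operator.requiresFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // An operator using two-phase commit keeps the default (true).
        FinalCheckpointAware twoPhaseSink = new FinalCheckpointAware() {};
        // An operator without external side effects opts out.
        FinalCheckpointAware statelessMap = new FinalCheckpointAware() {
            @Override
            public boolean requiresFinalCheckpoint() {
                return false;
            }
        };

        System.out.println(taskMustWait(List.of(statelessMap)));               // false
        System.out.println(taskMustWait(List.of(statelessMap, twoPhaseSink))); // true
    }
}
```

Defaulting to `true` keeps existing operators safe: only operators that explicitly opt out can let a task skip the wait.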
> > > >>>
> > > >>> Best,
> > > >>> Yun
> > > >>>
> > > >>> ------------------Original Mail ------------------
> > > >>> *Sender:* Dawid Wysakowicz <dwysakow...@apache.org>
> > > >>> *Send Date:* Sun Jul 18 18:44:50 2021
> > > >>> *Recipients:* Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
> > > >>> *Subject:* Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > > >>>
> > > >>>> I think we're all really close to the same solution.
> > > >>>>
> > > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time based buffering, but it does not solve flushing other operations such as e.g. count windows or batching requests in Sinks. I'd prefer to treat the finish() as a message for Operator to "flush all records". The MAX_WATERMARK in my opinion is mostly for backwards compatibility. I don't think operators need to get a signal "stop-processing" if they don't need to flush records. The "WHEN" records are emitted, should be in control of the StreamTask, by firing timers or by processing a next record from upstream.
> > > >>>>
> > > >>>> The only difference of my previous proposal compared to Yun's is that I did not want to send the EndOfUserRecords event in case of stop w/o drain. My thinking was that we could directly go from RUNNING to WAITING_FOR_FINAL_CP on EndOfPartitionEvent.
> > > >>>> I agree we could emit EndOfUserRecordsEvent with an additional flag and e.g. stop firing timers and processing events (without calling finish() on Operator). In my initial suggestion I thought we don't care about some events potentially being emitted after the savepoint was taken, as they would anyway belong to the next after FINAL, which would be discarded. I think though the proposal to suspend records processing and timers is a sensible thing to do and would go with the version that Yun put into the FLIP Wiki.
> > > >>>>
> > > >>>> What do you think Till?
> > > >>>>
> > > >>>> Best,
> > > >>>>
> > > >>>> Dawid
> > > >>>>
> > > >>>> On 16/07/2021 10:03, Yun Gao wrote:
> > > >>>>
> > > >>>>> Hi Till, Piotr
> > > >>>>> Many thanks for the comments!
> > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > >>>>> I also agree with Piotr that currently they are independent mechanisms, and they are basically the same for the event time.
> > > >>>>> For more details, first there are some differences among the three scenarios regarding the finish: for normal finish and stop-with-savepoint --drain, the job would not be expected to be restarted, and for stop-with-savepoint the job would be expected to restart later.
> > > >>>>> Then for finish / stop-with-savepoint --drain, currently Flink would emit MAX_WATERMARK before the EndOfPartition.
> > > >>>>> Besides, as we have discussed before [1], endOfInput / finish() should also only be called for finish / stop-with-savepoint --drain. Thus currently they always occur at the same time. After the change, we could emit MAX_WATERMARK before the endOfInput event for the finish / stop-with-savepoint --drain cases.
> > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > > >>>>>> WindowOperator close all windows and emit the results upon calling
> > > >>>>>> finish, for example?
> > > >>>>> As discussed above, for stop-with-savepoint we would always keep the windows as is, and restore them after restart.
> > > >>>>> Then for the finish / stop-with-savepoint --drain, I think perhaps it depends on the Triggers. For event-time triggers / process time triggers, it would be reasonable to flush all the windows since logically the time would always elapse and the window would always get triggered in a logical future. But for triggers like CountTrigger, no matter how much time passes logically, the windows would not trigger, thus we may not flush these windows. If there are requirements we may provide additional triggers.
> > > >>>>>> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at the same time, and for single input operators `endInput()` and `finish()` are indeed the same thing. During the last discussion we mentioned this issue, and at that time we thought that we might deprecate `endInput()` in the future; then we would only have endInput(int input) and finish().
> > > >>>>> Best,
> > > >>>>> Yun
> > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
> > > >>>>> ------------------------------------------------------------------
> > > >>>>> From: Piotr Nowojski
> > > >>>>> Send Time: 2021 Jul. 16 (Fri.) 13:48
> > > >>>>> To: dev
> > > >>>>> Cc: Yun Gao
> > > >>>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > > >>>>> Hi Till,
> > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
> > > >>>>> 1) currently they are independent but parallel mechanisms. With event time, they are basically the same.
> > > >>>>> 2) it probably should for the sake of processing time windows.
> > > >>>>> Here you are touching the bit of the current design that I like the least. We basically have now three different ways of conveying very similar things:
> > > >>>>> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about processing time?)
> > > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush its internal state
> > > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
> > > >>>>> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > > >>>>> Piotrek
> > > >>>>> On Thu, 15 Jul 2021 at 16:47, Till Rohrmann wrote:
> > > >>>>> Thanks for updating the FLIP. Based on the new section about stop-with-savepoint [--drain] I got two other questions:
> > > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > >>>>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
> > > >>>>> Cheers,
> > > >>>>> Till
> > > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> > > >>>>>> Thanks a lot for your answers and clarifications Yun.
> > > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a problem.
> > > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
> > > >>>>>> Cheers,
> > > >>>>>> Till
> > > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> > > >>>>>>> Hi Till,
> > > >>>>>>> Many thanks for the review and comments!
> > > >>>>>>> 1) First, I think we could in fact do the computation outside of the main thread; the current implementation is mainly due to the computation being fast in general and us initially wanting a simplified first version.
> The main requirement here is to have a constant view of the state of
> the tasks. Otherwise, for example, if we have A -> B and A is running
> when we check whether we need to trigger A, we will mark A as to be
> triggered; but if A becomes finished by the time we check B, we will
> also mark B as to be triggered. B would then receive both the RPC
> trigger and the checkpoint barrier, which would break some assumptions
> on the task side and complicate the implementation.
>
> But to cope with this issue, we could in fact first take a snapshot of
> the tasks' state and then do the computation; neither of the two steps
> needs to be in the main thread.
>
> 2) For the computation logic, we currently benefit a lot from some
> shortcuts on all-to-all edges and on job vertices with all tasks
> running. These shortcuts can do checks on the job vertex level first
> and skip some job vertices as a whole. With this optimization we have
> an O(V) algorithm, and the current running time of the worst case for a
> job with 320,000 tasks is less than 100ms. For daily graph sizes the
> time would be further reduced linearly.
>
> If we do the computation based on the last triggered tasks, we may not
> easily encode this information into the shortcuts on the job vertex
> level.
> And since the time seems to be short, perhaps it is enough to do the
> re-computation from scratch, considering the tradeoff between the
> performance and the complexity?
>
> 3) We are going to emit the EndOfInput event exactly after the finish()
> method and before the last snapshotState() method so that we could shut
> down the whole topology with a single final checkpoint. Very sorry for
> not including enough details for this part; I'll complement the FLIP
> with the details on the process of the final checkpoint / savepoint.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Till Rohrmann
> Send Time:2021 Jul. 14 (Wed.) 22:05
> To:dev
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Hi everyone,
>
> I am a bit late to the voting party but let me ask three questions:
>
> 1) Why do we execute the trigger plan computation in the main thread if
> we cannot guarantee that all tasks are still running when triggering
> the checkpoint? Couldn't we do the computation in a different thread in
> order to relieve the main thread a bit?
>
> 2) The implementation of the DefaultCheckpointPlanCalculator seems to go
> over the whole topology for every calculation.
> Wouldn't it be more efficient to maintain the set of current tasks to
> trigger and check whether anything has changed and if so check the
> succeeding tasks until we have found the current checkpoint trigger
> frontier?
>
> 3) When are we going to send the endOfInput events to a downstream
> task? If this happens after we call finish on the upstream operator but
> before snapshotState then it would be possible to shut down the whole
> topology with a single final checkpoint. I think this part could
> benefit from a bit more detailed description in the FLIP.
>
> Cheers,
> Till
>
> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
>
> Hi there,
>
> Since the voting time of FLIP-147[1] has passed, I'm closing the vote
> now.
>
> There were seven +1 votes (6 / 7 are bindings) and no -1 votes:
>
> - Dawid Wysakowicz (binding)
> - Piotr Nowojski (binding)
> - Jiangang Liu (binding)
> - Arvid Heise (binding)
> - Jing Zhang (binding)
> - Leonard Xu (non-binding)
> - Guowei Ma (binding)
>
> Thus I'm happy to announce that the update to the FLIP-147 is accepted.
>
> Very thanks everyone!
>
> Best,
> Yun
>
> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
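
Editor's sketch for point 1) of the trigger plan discussion in the thread: the A -> B race goes away if the plan is computed on an immutable snapshot of the task states rather than on the live states. The code below is only an illustration under stated assumptions. `TriggerPlanSketch`, `State`, and `tasksToTrigger` are hypothetical names, not Flink classes, and the rule used (trigger every running task that has no running upstream task) is a simplification of what the thread describes, without the job-vertex-level shortcuts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration, not the DefaultCheckpointPlanCalculator.
class TriggerPlanSketch {
    enum State { RUNNING, FINISHED }

    /**
     * Returns the running tasks that have no running upstream task.
     * All lookups go through the same map, so passing an immutable
     * snapshot gives the whole computation one fixed view of the tasks.
     */
    static Set<String> tasksToTrigger(Map<String, State> states,
                                      Map<String, List<String>> upstream) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, State> entry : states.entrySet()) {
            if (entry.getValue() != State.RUNNING) {
                continue; // finished tasks are never triggered
            }
            boolean hasRunningUpstream = false;
            for (String up : upstream.getOrDefault(entry.getKey(), List.of())) {
                if (states.get(up) == State.RUNNING) {
                    hasRunningUpstream = true; // the barrier will arrive from upstream
                    break;
                }
            }
            if (!hasRunningUpstream) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Topology A -> B: B has A as its only upstream task.
        Map<String, List<String>> upstream = Map.of("B", List.of("A"));

        Map<String, State> live = new HashMap<>();
        live.put("A", State.RUNNING);
        live.put("B", State.RUNNING);

        // Step 1: take the snapshot. Step 2: compute on the snapshot only.
        Map<String, State> snapshot = Map.copyOf(live);

        // A finishes concurrently; the snapshot is unaffected.
        live.put("A", State.FINISHED);

        System.out.println("plan from snapshot: " + tasksToTrigger(snapshot, upstream));
        System.out.println("plan from a later snapshot: "
                + tasksToTrigger(Map.copyOf(live), upstream));
    }
}
```

With the snapshot, the plan never names both A and B for the same checkpoint, so B is not asked to handle an RPC trigger and a barrier at once. Neither step touches shared mutable state after the copy, which is why, as the thread notes, neither needs to run in the main thread.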