How do you feel about proceeding without an additional vote?

I agree, we added quite a bit in there, but none of the added content
changes what has been voted so far. A big part of the added content can
be considered as clarifying the implementation. I'd be inclined to
proceed without a vote. WDYT?

Best,

Dawid


On 22/07/2021 14:55, Yun Gao wrote:
> Hi Till,
>
> Many thanks for the comments and tips! We will update the FLIP with the new
> details. Since a large part of the FLIP has changed, I think we need a new
> vote. We would start the vote right after we also solve the remaining
> UnionListState problem and double-check that the design and the FLIP have
> no other missing points.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Till Rohrmann <trohrm...@apache.org>
> Send Time:2021 Jul. 22 (Thu.) 17:57
> To:Yun Gao <yungao...@aliyun.com>
> Cc:Piotr Nowojski <pnowoj...@apache.org>; dev <dev@flink.apache.org>; Yun Gao 
> <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Thanks everyone for this discussion. I think this is very helpful.
>
> I do agree with Piotr's proposal to separate state and the lifecycle of a 
> StreamOperator. That way the finished state can be used to recover a 
> StreamOperator or to rescale the topology. I also believe that this will make 
> the implementation a lot easier since we don't have to remember which 
> StreamOperator has actually finished. Moreover, it solves the problem of 
> rescaling.
>
> I also agree with the proposed solution for checkpoints after finish() has 
> been called. I guess the first checkpoint that was triggered after finish() 
> and sends notifyCheckpointComplete should be good enough to close the 
> StreamTask.
>
> Shall we update the FLIP with these details? Do we need another vote for it 
> or shall we continue w/o it?
>
> Cheers,
> Till
> On Thu, Jul 22, 2021 at 10:45 AM Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Piotr,
>
> Many thanks for the explanation! And sorry that I initially misunderstood
> the problem Dawid raised.
>
>> And what should we do now? We can of course commit all transactions until 
>> checkpoint 43. 
>> But should we keep waiting for `notifyCheckpointComplete(44)`? 
>> What if in the meantime another checkpoint is triggered? We could end up 
>> waiting indefinitely. 
>> Our proposal is to shutdown the task immediately after seeing first 
>> `notifyCheckpointComplete(X)`, 
>> where X is any triggered checkpoint AFTER `finish()`. This should be fine, 
>> as:
>> a) ideally there should be no new pending transactions opened after 
>> checkpoint 42
>> b) even if operator/function is opening some transactions for checkpoint 43 
>> and 
>> checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 
>> should be empty
> Now I understand this issue better: currently, operators like
> TwoPhaseCommitSinkFunction create a new transaction when snapshotting state,
> so each checkpoint has one corresponding transaction. Therefore, after
> finish() and before notifyCheckpointComplete(), there might be some empty
> transactions.
>
> I also agree with the proposal: we could finish the task after receiving the
> first checkpoint complete notification, and checkpoints with larger
> checkpoint ids should still be allowed to complete. The checkpoint complete
> notification should be able to commit all the non-empty transactions. Some of
> the empty transactions would also get committed, but that should cause no
> harm. The other empty transactions would be aborted in close(), which should
> also cause no harm.
>
>> If checkpoint 44 completes afterwards, it will still be valid. Ideally we
>> would recommend that after seeing `finish()` operators/functions should not
>> be opening any new transactions, but that shouldn't be required.
>
> And from another point of view, I think it might also be acceptable to place
> some requirements on the operators, for example that a sink skips creating
> new transactions after finish() is called. We could treat the process as a
> protocol between the framework and the operators, which the operators would
> need to follow. Of course it would be better if the framework could handle
> all the cases elegantly and put fewer implicit limitations on the operators,
> and at the least we should guarantee that changing the process causes no
> compatibility problems.
>
> Many thanks for the careful checks of the whole process.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Piotr Nowojski <pnowoj...@apache.org>
> Send Time:2021 Jul. 22 (Thu.) 15:33
> To:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> Cc:Till Rohrmann <trohrm...@apache.org>; Yun Gao 
> <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Hi Guowei,
>
>> Thank Dawid and Piotr for sharing the problem. +1 to EndInput/Finish can 
>> be called repeatedly.
> Just to clarify. It's not about calling `finish()` and `endInput()` 
> repeatedly, but about (from the perspective of operator's state) 
> 1. seeing `finish()`
> 2. checkpoint X triggered and completed
> 3. failure + recovery from X
> 4. potentially processing more records
> 5. another `finish()`
>
> But from the context of the remaining part of your message Guowei I presume 
> that you have already got that point :)
>
> Yun:
>
>> For this issue perhaps we could explicitly require the task to wait for a
>> checkpoint triggered after the finish() method is called for all the
>> operators? We could achieve this by maintaining some state inside the task.
> Isn't this exactly the "WAITING_FOR_FINAL_CP" from the FLIP document? That we 
> always need to wait for a checkpoint triggered after `finish()` to complete, 
> before shutting down a task?
>
> What Dawid was describing is a scenario where:
> 1. task/operator received `finish()`
> 2. checkpoint 42 triggered (not yet completed)
> 3. checkpoint 43 triggered (not yet completed)
> 4. checkpoint 44 triggered (not yet completed)
> 5. notifyCheckpointComplete(43)
>
> And what should we do now? We can of course commit all transactions until 
> checkpoint 43. But should we keep waiting for 
> `notifyCheckpointComplete(44)`? What if in the meantime another checkpoint 
> is triggered? We could end up waiting indefinitely. 
>
> Our proposal is to shutdown the task immediately after seeing first 
> `notifyCheckpointComplete(X)`, where X is any triggered checkpoint AFTER 
> `finish()`. This should be fine, as:
> a) ideally there should be no new pending transactions opened after 
> checkpoint 42
> b) even if operator/function is opening some transactions for checkpoint 43 
> and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 
> 42 should be empty
>
> Hence comment from Dawid
>> We must make sure those checkpoints do not leave lingering external 
>> resources.
> After seeing 5. (notifyCheckpointComplete(43)) It should be good enough to:
> - commit transactions from checkpoint 42, (and 43 if they were created, 
> depends on the user code)
> - close operator, aborting any pending transactions (for checkpoint 44 if 
> they were opened, depends on the user code)
>
> If checkpoint 44 completes afterwards, it will still be valid. Ideally we 
> would recommend that after seeing `finish()` operators/functions should not 
> be opening any new transactions, but that shouldn't be required.
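
The shutdown rule proposed above can be sketched in Java as follows. This is only an illustration under stated assumptions: `FinalCheckpointTracker` and its `committed` / `aborted` lists are hypothetical names, not Flink classes, and real transactions are reduced to checkpoint ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical helper (not a Flink class) that decides when a finished task may shut down. */
class FinalCheckpointTracker {
    private final TreeSet<Long> triggeredAfterFinish = new TreeSet<>();
    final List<Long> committed = new ArrayList<>();
    final List<Long> aborted = new ArrayList<>();
    private boolean finished = false;
    private boolean shutDown = false;

    /** Called once all input has been consumed and the operators have flushed. */
    void finish() {
        finished = true;
    }

    /** Records a triggered checkpoint; only those triggered after finish() matter here. */
    void triggerCheckpoint(long checkpointId) {
        if (finished && !shutDown) {
            triggeredAfterFinish.add(checkpointId);
        }
    }

    /** Returns true once the task may shut down. */
    boolean notifyCheckpointComplete(long checkpointId) {
        if (shutDown) {
            return true;
        }
        if (!triggeredAfterFinish.contains(checkpointId)) {
            return false; // triggered before finish(): keep waiting
        }
        // Commit transactions of every tracked checkpoint up to and including this one.
        committed.addAll(triggeredAfterFinish.headSet(checkpointId, true));
        // Transactions of later checkpoints are expected to be empty; abort them on close.
        aborted.addAll(triggeredAfterFinish.tailSet(checkpointId, false));
        shutDown = true;
        return true;
    }
}

class FinalCheckpointTrackerDemo {
    public static void main(String[] args) {
        FinalCheckpointTracker tracker = new FinalCheckpointTracker();
        tracker.finish();
        tracker.triggerCheckpoint(42);
        tracker.triggerCheckpoint(43);
        tracker.triggerCheckpoint(44);
        boolean mayShutDown = tracker.notifyCheckpointComplete(43);
        System.out.println(mayShutDown + " committed=" + tracker.committed
                + " aborted=" + tracker.aborted);
    }
}
```

In the scenario from the message (checkpoints 42, 43 and 44 triggered after `finish()`, then `notifyCheckpointComplete(43)`), the sketch commits 42 and 43 and aborts 44 without waiting for further notifications.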
>
> Best,
> Piotrek
> On Thu, 22 Jul 2021 at 09:00, Yun Gao <yungao...@aliyun.com.invalid> wrote:
> Hi Dawid, Piotr, Steven,
>
>  Many thanks for pointing out these issues, and for the discussion!
>
>  Failure before notifyCheckpointComplete()
>
>  For this issue I agree with what Piotr has proposed. I tried some operators
>  like sink / window as examples, and so far I have also not found concrete
>  scenarios that would cause problems if records are processed by a task that
>  was assigned state snapshotted after finish() had been called. For future
>  cases, it seems users should be able to implement their target logic by
>  explicitly adding a "finished" flag to the state, and perhaps applying
>  different logic when that part of the state is restored. Besides, this case
>  would only happen on rescaling or a topology change, which already involves
>  some user knowledge. Thus it looks acceptable to separate the operators'
>  state from the task lifecycle, and to not treat checkpoints after finish()
>  differently.
>
>  Finishing upon receiving notifyCheckpointComplete() of not the latest 
> checkpoint
>
>  For this issue perhaps we could explicitly require the task to wait for a
>  checkpoint triggered after the finish() method is called for all the
>  operators? We could achieve this by maintaining some state inside the task.
>
>  Checkpointing from a single subtask / UnionListState case
>
>  This should indeed cause problems, and I also agree that we could focus the
>  discussion of it in https://issues.apache.org/jira/browse/FLINK-21080.
>
>  Best,
>  Yun
>
>
>  ------------------------------------------------------------------
>  From:Piotr Nowojski <pnowoj...@apache.org>
>  Send Time:2021 Jul. 22 (Thu.) 02:46
>  To:dev <dev@flink.apache.org>
>  Cc:Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>; Yun 
> Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
>  Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
>  Hi Steven,
>
>  > I probably missed sth here. isn't this the case today already? Why is it a 
> concern for the proposed change?
>
>  The problem is with the newly added `finish()` method and the already 
> existing `endInput()` call. Currently on master there are no issues, because 
> we are not checkpointing any operators after some operators have finished. 
> The purpose of this FLIP-147 is to exactly enable this and this opens a new 
> problem described by Dawid.
>
>  To paraphrase and to give a concrete example.  Assume we have an operator 
> with parallelism of two. Subtask 0 and subtask 1.
>
>  1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1 hasn't 
> (yet).
>  2. Checkpoint 42 is triggered, and it completes.
>  3. Job fails and is restarted, but at the same time it's rescaled. The user
> has chosen to scale the operator down to a parallelism of 1.
>
>  Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has 
> been processed or not, so while recovering to checkpoint 42, we have to 
> recover both finished subtask 0 (#1) state and not yet finished subtask 1's 
> (#1). But at the same time they are scaled down, so we only have a single 
> subtask 0 (#2) that has a combined state from both of the previous instances. 
> The potentially confusing issue is that the state from subtask 0 (#1) was 
> checkpointed AFTER `endInput()` and `finish()` calls, but it's recovered to 
> an operator that still has some records to process. In step 1, a user could
> for example store in the operator's state a bit of information, "end input
> has already been called!", that would no longer be true after recovery.
>
>  Hence the question about `finish()` and `endInput()` semantics. Should it be 
> tied down to state, or just to an operator instance/execution attempt?
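
One reading of "tied to an operator instance/execution attempt" can be sketched as below. `RestoredOperator` is a hypothetical class, not Flink API: it receives the flags that user code may have stored in state on both old subtasks, but tracks `endInput()` only for the current attempt.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical operator (not a Flink class) restored from the merged state of two subtasks. */
class RestoredOperator {
    // One "endInput() was called" flag per old subtask, as user code might have stored it.
    private final List<Boolean> restoredEndInputFlags;
    // Lifecycle of this execution attempt only; deliberately not derived from state.
    private boolean endInputCalledInThisAttempt = false;
    private int processedRecords = 0;

    RestoredOperator(List<Boolean> restoredEndInputFlags) {
        this.restoredEndInputFlags = restoredEndInputFlags;
    }

    /** True if any restored piece of state still claims that input had ended. */
    boolean restoredStateClaimsEndInput() {
        return restoredEndInputFlags.contains(Boolean.TRUE);
    }

    void processElement(String record) {
        if (endInputCalledInThisAttempt) {
            throw new IllegalStateException("record after endInput() in the same attempt");
        }
        processedRecords++;
    }

    void endInput() {
        endInputCalledInThisAttempt = true;
    }

    int processedRecords() {
        return processedRecords;
    }
}

class RescaleDemo {
    public static void main(String[] args) {
        // Checkpoint 42: subtask 0 had finished (flag true), subtask 1 had not (flag false).
        RestoredOperator merged = new RestoredOperator(Arrays.asList(true, false));
        merged.processElement("record from the not yet finished input");
        System.out.println(merged.restoredStateClaimsEndInput() + " " + merged.processedRecords());
        merged.endInput();
    }
}
```

The restored flag is stale, yet the operator keeps processing records until `endInput()` is called again in the new attempt, which is the behaviour the question is about.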
>
>  Piotrek
>  On Wed, 21 Jul 2021 at 19:00, Steven Wu <stevenz...@gmail.com> wrote:
>  > if a failure happens after sequence of finish() -> snapshotState(), but
>   before notifyCheckpointComplete(), we will restore such a state and we
>   might end up sending some more records to such an operator.
>
>   I probably missed sth here. isn't this the case today already? Why is it a
>   concern for the proposed change?
>
>   On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>   > Hi Dawid,
>   >
>   > Thanks for writing down those concerns.
>   >
>   > I think the first issue boils down to what should be the contract of
>   > lifecycle methods like open(), close(), initializeState() etc. and
>   > especially the new additions like finish() and endInput(). And what should
>   > be their relation with the operator state (regardless of its type: keyed,
>   > non-keyed, union, ...). Should those methods be tied to state or not?
>   > After thinking about it
>   > for a while (and discussing it offline with Dawid), I think the answer
>   > might be no, they shouldn't. I mean maybe we should just openly say that
>   > all of those methods relate to this single particular instance and
>   > execution of the operator. And if a job is recovered/rescaled, we would be
>   > allowed to freely resume consumption, ignoring a fact that maybe some 
> parts
>   > of the state have previously seen `endInput()`. Why?
>   >
>   > 0. Yes, it might be confusing. Especially with `endInput()`. We call
>   > `endInput()`, we store something in a state and later after recovery
>   > combined with rescaling that state can see more records? Indeed weird,
>   > 1. I haven't come up yet with a counterexample that would break and make
>   > it impossible to implement a real life use case. Theoretically yes, the user
>   > can store `endInput()` on state, and after rescaling this state would be
>   > inconsistent with what is actually happening with the operator, but I
>   > haven't found a use case that would break because of that.
>   > 2. Otherwise, implementation would be very difficult.
>   > 3. It's difficult to access keyed state from within 
> `endInput()`/`finish()`
>   > calls, as they do not have key context.
>   > 4. After all, openly defining `endInput()` and `finish()` to be tied to
>   > the operator execution instance lifecycle is not that strange and quite
>   > simple to explain. Sure, it can lead to a bit of confusion (0.), but that
>   > doesn't sound that bad in comparison with the alternatives that I'm aware
>   > of. Also currently methods like `open()` and `close()` are also tied to 
> the
>   > operator execution instance, not to the state. Operators can be opened and
>   > closed multiple times, it doesn't mean that the state is lost after 
> closing
>   > an operator.
>   >
>   > For the UnionListState problem I have posted my proposal in the ticket 
> [1],
>   > so maybe let's move that particular discussion there?
>   >
>   > Piotrek
>   >
>   > [1] https://issues.apache.org/jira/browse/FLINK-21080
>   >
>   > On Wed, 21 Jul 2021 at 12:39, Dawid Wysakowicz <dwysakow...@apache.org>
>   > wrote:
>   >
>   > > Hey all,
>   > >
>   > > To make the issues that were found transparent to the community, I want
>   > to
>   > > post an update:
>   > >
>   > > *1. Committing side-effects*
>   > > We do want to make sure that all side effects are committed before
>   > > bringing tasks down. Side effects are committed when calling
>   > > notifyCheckpointComplete. For the final checkpoint we introduced the
>   > method
>   > > finish(). This notifies the operator that we have consumed all incoming
>   > > records and we are preparing to close the Task. In turn we should flush
>   > any
>   > > pending buffered records and prepare to commit last transactions. The
>   > goal
>   > > is that after a successful sequence of finish() -> snapshotState() ->
>   > > notifyCheckpointComplete(), the remaining state can be considered
>   > > empty/finished and may be discarded.
>   > >
>   > > *Failure before notifyCheckpointComplete()*
>   > >
>   > > The question is what is the contract of the endInput()/finish() methods
>   > > and how does calling these methods affect the operator's keyed, non-keyed
>   > > state and external state. Is it allowed to restore a state snapshot taken
>   > > after calling endInput()/finish() and process more records? Or do we
>   > assume
>   > > that after a restore from such a state taken after finish() we should 
> not
>   > > call any of the lifecycle methods or at least make sure those methods do
>   > > not emit records/interact with mailbox etc.
>   > >
>   > > Currently it is possible that if a failure happens after sequence of
>   > > finish() -> snapshotState(), but before notifyCheckpointComplete(), we
>   > will
>   > > restore such a state and we might end up sending some more records to
>   > such
>   > > an operator. It is possible if we rescale and this state is merged with 
> a
>   > > state of a subtask that has not called finish() yet. It can also happen
>   > if
>   > > we rescale the upstream operator and the subtask of interest becomes
>   > > connected to a newly added non finished subtask.
>   > >
>   > > *Snapshotting StreamTasks on which finish() has been called*
>   > >
>   > >
>   > > We thought about putting a flag into the snapshot of a subtask produced
>   > > after the finish() method. This would make it possible to skip execution
>   > of
>   > > certain lifecycle methods. Unfortunately this creates problems for
>   > > rescaling. How do we deal with a situation that subtask states with both
>   > > the feature flag set and unset end up in a single StreamTask. Additional
>   > > problem is that we merge those states into a single OperatorSubtaskState
>   > on
>   > > CheckpointCoordinator.
>   > >
>   > > *Finishing upon receiving notifyCheckpointComplete() of not the latest
>   > > checkpoint*
>   > >
>   > > We need to wait for a checkpoint to complete, that started after the
>   > > finish() method. However, we support concurrent checkpoints therefore,
>   > > there might be later checkpoints that completed, but the notification 
> has
>   > > not arrived. We must make sure those checkpoints do not leave lingering
>   > > external resources.
>   > >
>   > > *Checkpointing from a single subtask / UnionListState case*
>   > > There are operators that checkpoint from a single subtask only. Usually
>   > > from the subtask index=0. If we let those subtasks finish, subsequent
>   > > checkpoints will miss this information.
>   > > Esp. Legacy sources problem:
>   > > https://issues.apache.org/jira/browse/FLINK-21080
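
The single-subtask problem can be illustrated with a toy model. It assumes, as a simplification that is not taken from the FLIP, that a checkpoint only collects state from subtasks that are still running; `UnionStateDemo` and `snapshotUnion` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class UnionStateDemo {
    /** Naive assumption: only subtasks that are still running contribute to the snapshot. */
    static List<String> snapshotUnion(
            Map<Integer, List<String>> stateBySubtask, Set<Integer> finishedSubtasks) {
        List<String> union = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : stateBySubtask.entrySet()) {
            if (!finishedSubtasks.contains(entry.getKey())) {
                union.addAll(entry.getValue());
            }
        }
        return union;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> state = new TreeMap<>();
        state.put(0, Arrays.asList("split-a", "split-b")); // only subtask 0 writes state
        state.put(1, Collections.<String>emptyList());

        // While subtask 0 runs, the union contains its entries.
        System.out.println(snapshotUnion(state, Collections.<Integer>emptySet()));
        // Once subtask 0 has finished, the next checkpoint no longer sees them.
        System.out.println(snapshotUnion(state, new HashSet<>(Collections.singletonList(0))));
    }
}
```

Under that assumption the second snapshot is empty, which is the information loss described above.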
>   > >
>   > > Best,
>   > >
>   > > Dawid
>   > > On 19/07/2021 15:10, Yun Gao wrote:
>   > >
>   > > Hi Till, Dawid,
>   > >
>   > > Many thanks for the comments and discussion! Glad that we seem to have
>   > > converged, and I also agree that we could leave the optimization out of
>   > > the first version.
>   > >
>   > > Best
>   > > Yun
>   > >
>   > > ------------------------------------------------------------------
>   > > From:Dawid Wysakowicz <dwysakow...@apache.org>
>   > > Send Time:2021 Jul. 19 (Mon.) 17:51
>   > > To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
>   > > Cc:Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
>   > > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>   > > Finished
>   > >
>   > > Small correction. I meant we need to adjust the EndOfInputEvent of
>   > course.
>   > >
>   > > Best,
>   > >
>   > > Dawid
>   > >
>   > > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
>   > > > Hey Till,
>   > > >
>   > > > Yes, you're right we will have to adjust the current state of
>   > > > EndOfPartitionEvent and move the moment when we emit it to have what
>   > > > we're discussing here. We are aware of that.
>   > > >
>   > > > As for the MAX_WATERMARK vs finish(). My take is that we should always
>   > > > emit MAX_WATERMARK before calling finish() on an operator. At the same
>   > > > time finish() should not leave behind anything in state, as the
>   > > > intention is that we never restore from the taken savepoint/checkpoint
>   > > > (savepoint w drain or bounded data consumed).
>   > > >
>   > > > Best,
>   > > >
>   > > > Dawid
>   > > >
>   > > > On 19/07/2021 11:33, Till Rohrmann wrote:
>   > > >> Hi Yun and Dawid,
>   > > >>
>   > >
>   > > >> Thanks for your comments. I do agree with your comments that finish()
>   > > >> can do more than MAX_WATERMARK. I guess we should then explain how
>   > > >> MAX_WATERMARK and finish() play together and what kind of
>   > > >> order guarantees we provide.
>   > > >>
>   > > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it
>   > > >> would work in its current state because we send this event when the
>   > > >> Task is about to shut down if I am not mistaken. What we want to have
>   > > >> is to bring the StreamTasks into a state so that they shut down on the
>   > > >> next checkpoint. For this we need to keep the StreamTask running. In
>   > > >> general, I am a fan of making things explicit if possible. I think this
>   > > >> helps maintenance and evolvability of code. That's why I think sending
>   > > >> an EndOfInputEvent which is a StreamTask level event and which says
>   > > >> that there won't be any other records coming, only control events,
>   > > >> could make sense.
>   > > >>
>   > > >> I would leave the proposed optimization out of the first version. We
>   > can
>   > > >> still add it at a later point in time.
>   > > >>
>   > > >> Cheers,
>   > > >> Till
>   > > >>
>   > > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz
>   > > >> <dwysakow...@apache.org> wrote:
>   > > >>
>   > >
>   > > >>> Personally I don't find this optimization important and I'd rather
>   > > >>> leave it out not to complicate the codebase further. I doubt we save
>   > > >>> much there. I don't have a strong opinion though.
>   > > >>>
>   > > >>> Best,
>   > > >>>
>   > > >>> Dawid
>   > > >>> On 19/07/2021 10:31, Yun Gao wrote:
>   > > >>>
>   > > >>> Hi,
>   > > >>>
>   > > >>> Very thanks Dawid for the thoughts!
>   > > >>>
>   > > >>> Currently I also do not have a different opinion regarding this part.
>   > > >>> But I have one more issue to confirm: in the previous discussion we
>   > > >>> discussed that for the final checkpoint case, we might have an
>   > > >>> optimization: if a task does not have operators using 2PC, we might
>   > > >>> skip waiting for the final checkpoint (but we could not skip the
>   > > >>> savepoint). To allow users to express this logic, we proposed adding
>   > > >>> one more method to StreamOperator & CheckpointListener:
>   > > >>>
>   > > >>> interface StreamOperator {
>   > > >>>     default boolean requiresFinalCheckpoint() {
>   > > >>>         return true;
>   > > >>>     }
>   > > >>> }
>   > > >>>
>   > > >>> interface CheckpointListener {
>   > > >>>
>   > > >>>     default boolean requiresFinalCheckpoint() {
>   > > >>>         return true;
>   > > >>>     }
>   > > >>> }
>   > > >>>
>   > > >>> class AbstractUdfStreamOperator {
>   > > >>>
>   > > >>>     @Override
>   > > >>>     boolean requiresFinalCheckpoint() {
>   > > >>>         return userFunction instanceof CheckpointListener &&
>   > > >>>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>   > > >>>     }
>   > > >>> }
>   > > >>>
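
For illustration, the proposal quoted above can be turned into a self-contained sketch. The interfaces below are local stand-ins that reuse the names from the message; they are not the real Flink interfaces, and `requiresFinalCheckpoint()` is the method proposed in this thread, not something assumed to exist in Flink. `UdfStreamOperator` and `FinalCheckpointDecision` are hypothetical helpers.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-ins that mirror the names used in the proposal; not the real Flink interfaces.
interface CheckpointListener {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

/** Hypothetical counterpart of AbstractUdfStreamOperator from the proposal. */
class UdfStreamOperator implements StreamOperator {
    private final Object userFunction;

    UdfStreamOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public boolean requiresFinalCheckpoint() {
        // Only user functions that listen to checkpoint notifications can need the final one.
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}

class FinalCheckpointDecision {
    /** Assumed rule: a task waits for the final checkpoint if any of its operators asks for it. */
    static boolean taskRequiresFinalCheckpoint(List<? extends StreamOperator> operators) {
        for (StreamOperator operator : operators) {
            if (operator.requiresFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        StreamOperator plainMap = new UdfStreamOperator(new Object());
        StreamOperator twoPhaseSink = new UdfStreamOperator(new CheckpointListener() {});
        System.out.println(taskRequiresFinalCheckpoint(Arrays.asList(plainMap)));
        System.out.println(taskRequiresFinalCheckpoint(Arrays.asList(plainMap, twoPhaseSink)));
    }
}
```

A task that only contains the plain function would skip waiting, while adding a function that implements `CheckpointListener` makes the task wait for the final checkpoint.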
>   > > >>>
>   > > >>> I think we should still keep the change ?
>   > > >>>
>   > > >>> Best,
>   > > >>> Yun
>   > > >>>
>   > > >>> ------------------Original Mail ------------------
>   > > >>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
>   > > >>> *Send Date:*Sun Jul 18 18:44:50 2021
>   > > >>> *Recipients:*Flink Dev <dev@flink.apache.org>, Yun Gao
>   > > >>> <yungao...@aliyun.com.INVALID>
>   > > >>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>   > > >>> Finished
>   > > >>>
>   > > >>>> I think we're all really close to the same solution.
>   > > >>>>
>   > > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time based
>   > > >>>> buffering, but it does not solve flushing other operations such as
>   > > >>>> e.g. count windows or batching requests in Sinks. I'd prefer to treat
>   > > >>>> the finish() as a message for Operator to "flush all records". The
>   > > >>>> MAX_WATERMARK in my opinion is mostly for backwards compatibility. I
>   > > >>>> don't think operators need to get a signal "stop-processing" if they
>   > > >>>> don't need to flush records. The "WHEN" records are emitted, should
>   > > >>>> be in control of the StreamTask, by firing timers or by processing a
>   > > >>>> next record from upstream.
>   > > >>>>
>   > > >>>> The only difference of my previous proposal compared to Yun's is
>   > > >>>> that I did not want to send the EndOfUserRecords event in case of
>   > > >>>> stop w/o drain. My thinking was that we could directly go from
>   > > >>>> RUNNING to WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we
>   > > >>>> could emit EndOfUserRecordsEvent with an additional flag and e.g.
>   > > >>>> stop firing timers and processing events (without calling finish()
>   > > >>>> on Operator). In my initial suggestion I thought we don't care about
>   > > >>>> some events potentially being emitted after the savepoint was taken,
>   > > >>>> as they would anyway belong to the next after FINAL, which would be
>   > > >>>> discarded. I think though the proposal to suspend records processing
>   > > >>>> and timers is a sensible thing to do and would go with the version
>   > > >>>> that Yun put into the FLIP Wiki.
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>> What do you think Till?
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>> Best,
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>> Dawid
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>>
>   > > >>>> On 16/07/2021 10:03, Yun Gao wrote:
>   > > >>>>
>   > > >>>>> Hi Till, Piotr
>   > > >>>>> Very thanks for the comments!
>   > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>   > >
>   > > >>>>> I also agree with Piotr that currently they are independent
>   > > >>>>> mechanisms, and they are basically the same for event time.
>   > > >>>>> In more detail, first there are some differences among the three
>   > > >>>>> scenarios regarding finishing:
>   > > >>>>> For normal finish and stop-with-savepoint --drain, the job is not
>   > > >>>>> expected to be restarted, while for stop-with-savepoint the job is
>   > > >>>>> expected to be restarted later.
>   > > >>>>> Then for finish / stop-with-savepoint --drain, currently Flink
>   > > >>>>> emits MAX_WATERMARK before the EndOfPartition. Besides, as we have
>   > > >>>>> discussed before [1], endOfInput / finish() should also only be
>   > > >>>>> called for finish / stop-with-savepoint --drain. Thus currently they
>   > > >>>>> always occur at the same time. After the change, we could emit
>   > > >>>>> MAX_WATERMARK before the endOfInput event for the finish /
>   > > >>>>> stop-with-savepoint --drain cases.
>   > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>   > > >>>>>> WindowOperator close all windows and emit the results upon calling
>   > > >>>>>> finish, for example?
>   > > >>>>> As discussed above, for stop-with-savepoint we would always keep
>   > > >>>>> the windows as they are, and restore them after restart.
>   > > >>>>> Then for finish / stop-with-savepoint --drain, I think it depends
>   > > >>>>> on the triggers. For event-time / processing-time triggers, it
>   > > >>>>> would be reasonable to flush all the windows, since logically time
>   > > >>>>> always elapses and the windows would always get triggered at some
>   > > >>>>> point in the logical future. But for triggers like CountTrigger, no
>   > > >>>>> matter how much time passes logically, the windows would not
>   > > >>>>> trigger, thus we may not flush these windows. If there are
>   > > >>>>> requirements we may provide additional triggers.
>   > > >>>>
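
The difference between time-based and count-based triggers at the end of input can be shown with a small sketch. `WindowFlushDemo` and its two predicates are hypothetical simplifications of trigger logic, not Flink code; the only fact borrowed from Flink is that the final watermark carries `Long.MAX_VALUE`.

```java
class WindowFlushDemo {
    // The final watermark carries the largest possible timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Simplified event-time rule: fire once the watermark reaches the window's last timestamp. */
    static boolean eventTimeWindowFires(long windowMaxTimestamp, long watermark) {
        return watermark >= windowMaxTimestamp;
    }

    /** Simplified count rule: fire only when enough elements arrived; watermarks play no role. */
    static boolean countWindowFires(long elementsInWindow, long threshold) {
        return elementsInWindow >= threshold;
    }

    public static void main(String[] args) {
        // Any event-time window fires once MAX_WATERMARK arrives.
        System.out.println(eventTimeWindowFires(9_999L, MAX_WATERMARK));
        // A count window holding 3 of 5 required elements stays open regardless of time.
        System.out.println(countWindowFires(3, 5));
    }
}
```

This is why MAX_WATERMARK alone is enough to flush time-based windows, while a partially filled count window would need a different trigger to be flushed.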
>   > >
>   > > >>>>>> It's a bit messy and I'm not sure if this should be straightened
>   > > >>>>>> out? Each one of those has a little bit different semantic/meaning,
>   > > >>>>>> but at the same time they are very similar. For single input
>   > > >>>>>> operators `endInput()` and `finish()` are actually the very same
>   > > >>>>>> thing.
>   > > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at
>   > > >>>>> the same time, and for single input operators `endInput()` and
>   > > >>>>> `finish()` are indeed the same thing. During the last discussion we
>   > > >>>>> mentioned this issue, and at the time we thought that we might
>   > > >>>>> deprecate `endInput()` in the future, so that we would only have
>   > > >>>>> endInput(int input) and finish().
>   > > >>>>
>   > > >>>>> Best,
>   > > >>>>> Yun
>   > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
>   > > >>>>> ------------------------------------------------------------------
>   > > >>>>> From:Piotr Nowojski
>   > > >>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
>   > > >>>>> To:dev
>   > > >>>>> Cc:Yun Gao
>   > > >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After 
> Tasks
>   > > >>>> Finished
>   > > >>>>
>   > > >>>>> Hi Till,
>   > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>   > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>   > > >>>>>> WindowOperator close all windows and emit the results upon calling
>   > > >>>>>> finish, for example?
>   > > >>>>> 1) currently they are independent but parallel mechanisms. With
>   > > >>>>> event time, they are basically the same.
>   > > >>>>> 2) it probably should for the sake of processing time windows.
>   > > >>>>> Here you are touching the bit of the current design that I like the
>   > > >>>>> least. We basically have now three different ways of conveying very
>   > > >>>>> similar things:
>   > > >>>>> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what
>   > > >>>>> about processing time?)
>   > > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush its
>   > > >>>>> internal state
>   > > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
>   > > >>>>> It's a bit messy and I'm not sure if this should be straightened
>   > > >>>>> out? Each one of those has a little bit different semantic/meaning,
>   > > >>>>> but at the same time they are very similar. For single input
>   > > >>>>> operators `endInput()` and `finish()` are actually the very same
>   > > >>>>> thing.
>   > > >>>>
>   > > >>>>> Piotrek
>   > > >>>>> On Thu, 15 Jul 2021 at 16:47, Till Rohrmann wrote:
>   > > >>>>> Thanks for updating the FLIP. Based on the new section about
>   > > >>>>> stop-with-savepoint [--drain] I got two other questions:
>   > > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>   > > >>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>   > > >>>>> WindowOperator close all windows and emit the results upon calling
>   > > >>>>> finish, for example?
>   > > >>>>> Cheers,
>   > > >>>>> Till
>   > > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>   > > >>>>>> Thanks a lot for your answers and clarifications Yun.
>   > > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a problem.
>   > > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
>   > > >>>>>> Cheers,
>   > > >>>>>> Till
>   > > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
>   > > >>>>>>> Hi Till,
>   > > >>>>>>> Many thanks for the review and comments!
>   > > >>>>>>> 1) First, I think we could in fact do the computation outside of the
>   > > >>>>>>> main thread. The current implementation is this way mainly because the
>   > > >>>>>>> computation is in general fast and we initially wanted a simplified
>   > > >>>>>>> first version.
>   > > >>>>>>> The main requirement here is to have a consistent view of the state of
>   > > >>>>>>> the tasks. Otherwise, for example, if we have A -> B and A is running
>   > > >>>>>>> when we check whether we need to trigger A, we will mark A as to be
>   > > >>>>>>> triggered; but if A becomes finished by the time we check B, we will
>   > > >>>>>>> also mark B as to be triggered. Then B will receive both the RPC
>   > > >>>>>>> trigger and the checkpoint barrier, which would break some assumptions
>   > > >>>>>>> on the task side and complicate the implementation.
>   > > >>>>>>> But to cope with this issue, we could in fact first take a snapshot of
>   > > >>>>>>> the tasks' state and then do the computation; neither of the two steps
>   > > >>>>>>> needs to be in the main thread.
>   > > >>>>>>> 2) For the computation logic, we currently benefit a lot from some
>   > > >>>>>>> shortcuts on all-to-all edges and on job vertices with all tasks
>   > > >>>>>>> running. These shortcuts can do checks on the job vertex level first
>   > > >>>>>>> and skip some job vertices as a whole. With this optimization we have
>   > > >>>>>>> an O(V) algorithm, and the current worst-case running time for a job
>   > > >>>>>>> with 320,000 tasks is less than 100ms. For everyday graph sizes the
>   > > >>>>>>> time would be further reduced linearly.
>   > > >>>>>>> If we do the computation based on the last triggered tasks, we may not
>   > > >>>>>>> easily encode this information into the shortcuts on the job vertex
>   > > >>>>>>> level. And since the time seems to be short, perhaps it is enough to
>   > > >>>>>>> recompute from scratch, considering the tradeoff between performance
>   > > >>>>>>> and complexity?
>   > > >>>>>>> 3) We are going to emit the EndOfInput event exactly after the finish()
>   > > >>>>>>> method and before the last snapshotState() method so that we can shut
>   > > >>>>>>> down the whole topology with a single final checkpoint.
>   > > >>>>>>> Very sorry for not including enough details for this part; I'll
>   > > >>>>>>> complement the FLIP with the details on the process of the final
>   > > >>>>>>> checkpoint / savepoint.
>   > > >>>>>>> Best,
>   > > >>>>>>> Yun
>   > > >>>>>>> ------------------------------------------------------------------
>   > > >>>>>>> From:Till Rohrmann
>   > > >>>>>>> Send Time:2021 Jul. 14 (Wed.) 22:05
>   > > >>>>>>> To:dev
>   > > >>>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>   > > >>>>>>> Hi everyone,
>   > > >>>>>>> I am a bit late to the voting party but let me ask three questions:
>   > > >>>>>>> 1) Why do we execute the trigger plan computation in the main thread if we
>   > > >>>>>>> cannot guarantee that all tasks are still running when triggering the
>   > > >>>>>>> checkpoint? Couldn't we do the computation in a different thread in order
>   > > >>>>>>> to relieve the main thread a bit?
>   > > >>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems to go
>   > > >>>>>>> over the whole topology for every calculation. Wouldn't it be more
>   > > >>>>>>> efficient to maintain the set of current tasks to trigger and check
>   > > >>>>>>> whether anything has changed and if so check the succeeding tasks until we
>   > > >>>>>>> have found the current checkpoint trigger frontier?
>   > > >>>>>>> 3) When are we going to send the endOfInput events to a downstream task? If
>   > > >>>>>>> this happens after we call finish on the upstream operator but before
>   > > >>>>>>> snapshotState then it would be possible to shut down the whole topology
>   > > >>>>>>> with a single final checkpoint. I think this part could benefit from a bit
>   > > >>>>>>> more detailed description in the FLIP.
>   > > >>>>>>> Cheers,
>   > > >>>>>>> Till
>   > > >>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
>   > > >>>>>>>> Hi there,
>   > > >>>>>>>> Since the voting time of FLIP-147[1] has passed, I'm closing the vote now.
>   > > >>>>>>>> There were seven +1 votes (6 of 7 are binding) and no -1 votes:
>   > > >>>>>>>> - Dawid Wysakowicz (binding)
>   > > >>>>>>>> - Piotr Nowojski(binding)
>   > > >>>>>>>> - Jiangang Liu (binding)
>   > > >>>>>>>> - Arvid Heise (binding)
>   > > >>>>>>>> - Jing Zhang (binding)
>   > > >>>>>>>> - Leonard Xu (non-binding)
>   > > >>>>>>>> - Guowei Ma (binding)
>   > > >>>>>>>> Thus I'm happy to announce that the update to FLIP-147 is accepted.
>   > > >>>>>>>> Many thanks, everyone!
>   > > >>>>>>>> Best,
>   > > >>>>>>>> Yun
>   > > >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>   > > >>>>
>   > > >>>>
>   > >
>   > >
>   >
>
>
>
