Hi, Till
Thank you very much for your careful consideration

*1. Emit records in `NotifyCheckpointComplete`.*
    Sorry for making you misunderstanding because of my expression. I just
want to say the current interface does not prevent users from doing it.
>From the perspective of the new sink api, we might not depend on emitting
records in `NotifyCheckpointComplete`, like using `OperatorCoordinator`
instead.


*2. What does the FLIP-147 guarantee?*I think initially this FLIP want to
achieve two targets:
1. Tasks/Operators exit correctly (as you mentioned the lifecycle of a
Task/StreamTask/StreamOperator.).
2. Continue to trigger checkpoint after some tasks for mixed jobs.

I think the first thing is related to the discussion in FLINK-21133. If I
understand correctly, in addition to supporting the tasks / operators to
exit correctly, now we also want to unify the process of the tasks and
operators for savepoint / finish.
I think the second thing is orthogonal to the FLINK-21133 because there are
topologies that have both the bounded and unbounded input.

*3. How to unify the operator exit process of FLIP-147 with
stop-with-savepoint?*
I  am not very sure about how to do it yet. But if I understand the
discussion in the jira correctly it needs to introduce some logic into
`CheckpointCoordinator`, which responses for triggering “the unified
operator exit process”.  Am I correct?

Best,
Guowei


On Tue, Feb 23, 2021 at 5:10 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for the explanation Yun and Guowei. I have to admit that I do not
> fully understand why this is strictly required but I think that we are
> touching two very important aspects which might have far fetching
> consequences for how Flink works:
>
> 1) Do we want to allow that multiple checkpoints are required to
> materialize results?
> 2) Do we want to allow to emit records in notifyCheckpointComplete?
>
> For 1) I am not sure whether this has been discussed within the community
> sufficiently. Requiring multiple checkpoints to materialize a result
> because of multi level committers has the consequence that we increase the
> latency from checkpoint interval to #levels * checkpoint interval.
> Moreover, having to drain the pipeline in multiple steps, would break the
> stop-with-savepoint --drain because which savepoint do you report to the
> user?
>
> For 2) allowing to send records after the final notifyCheckpointComplete
> will effectively mean that we need to shut down a topology in multiple
> steps (in the worst case one operator per checkpoint). This would be a
> strong argument for not allowing this to me. The fact that users can send
> records after the notifyCheckpointComplete is more by accident than by
> design. I think we should make this a very deliberate decision and in doubt
> I would be in favour of a more restrictive model unless there is a very
> good reason why this should be supported.
>
> Taking also the discussion in FLINK-21133 [1] into account, it seems to me
> that we haven't really understood what kind of guarantees we want to give
> to our users and how the final checkpoint should exactly work. I understand
> that this is not included in the first scope of FLIP-147 but I think this
> is so important that we should figure this out asap. Also because the exact
> shut down behaviour will have to be aligned with the lifecycle of a
> Task/StreamTask/StreamOperator. And last but not least because other
> features such as the new sink API start building upon a shut down model
> which has not been fully understood/agreed upon.
>
> [1] https://issues.apache.org/jira/browse/FLINK-21133
>
> Cheers,
> Till
>
> On Tue, Feb 16, 2021 at 9:45 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Thanks Yun for the detailed explanation.
> > A simple supplementary explanation about the sink case: Maybe we could
> use
> > `OperatorCoordinator` to avoid sending the element to the downstream
> > operator.
> > But I agree we could not limit the users not to emit records in the
> > `notiyCheckpointComplete`.
> >
> > Best,
> > Guowei
> >
> >
> > On Tue, Feb 16, 2021 at 2:06 PM Yun Gao <yungao...@aliyun.com.invalid>
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to first detail the issue with emitting records in
> > > notifyCheckpointComplete for context. For specific usage,
> > > an example would be for sink, it might want to write some metadata
> after
> > > all the transactions are committed
> > > (like write a marker file _SUCCESS to the output directory). This case
> is
> > > currently supported via the two level
> > > committers of the new sink API: when received endOfInput(), the
> Committer
> > > wait for another checkpoint to
> > > commits all the pending transactions and emit the list of files to the
> > > GlobalCommitter. The GlobalCommitter
> > > would wait for another checkpoint to also write the metadata with 2pc
> > > (Although sometimes 2pc is not needed
> > > for writing metadata, it should be only an optimization and still
> > requires
> > > the Committer do commit before
> > > notifying the global Committer. Also another note is GlobalCommitter is
> > > also added for some other cases
> > > like some sinks want an commiter with dop = 1, like IceBergSink).
> > >
> > > However, a more general issue to me is that currently we do not limit
> > > users to not emit records in
> > > notifyCheckpointComplete in the API level. The sink case could be
> viewed
> > > as a special case, but in addition
> > > to this one, logically users could also implement their own cases that
> > > emits records in notifyCheckpointComplete.
> > >
> > > Best,
> > > Yun
> > >
> > >  ------------------Original Mail ------------------
> > > Sender:Arvid Heise <ar...@apache.org>
> > > Send Date:Fri Feb 12 20:46:04 2021
> > > Recipients:dev <dev@flink.apache.org>
> > > CC:Yun Gao <yungao...@aliyun.com>
> > > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> Finished
> > > Hi Piotr,
> > >
> > >
> > >
> > > Thank you for raising your concern. Unfortunately, I do not have a
> better
> > >
> > > idea than doing closing of operators intermittently with checkpoints (=
> > >
> > > multiple last checkpoints).
> > >
> > >
> > >
> > > However, two ideas on how to improve the overall user experience:
> > >
> > > 1. If an operator is not relying on notifyCheckpointComplete, we can
> > close
> > >
> > > it faster (without waiting for a checkpoint). In general, I'd assume
> that
> > >
> > > almost all non-sinks behave that way.
> > >
> > > 2. We may increase the checkpointing frequency for the last
> checkpoints.
> > We
> > >
> > > need to avoid overloading checkpoint storages and task managers, but I
> > >
> > > assume the more operators are closed, the lower the checkpointing
> > interval
> > >
> > > can be.
> > >
> > >
> > >
> > > For 1, I'd propose to add (name TBD):
> > >
> > >
> > >
> > > default boolean StreamOperator#requiresFinalCheckpoint() {
> > >
> > >  return true;
> > >
> > > }
> > >
> > >
> > >
> > > This means all operators are conservatively (=slowly) closed. For most
> > >
> > > operators, we can then define their behavior by overriding in
> > >
> > > AbstractUdfStreamOperator
> > >
> > >
> > >
> > > @Override
> > >
> > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
> > >
> > >  return userFunction instanceof CheckpointListener;
> > >
> > > }
> > >
> > >
> > >
> > > This idea can be further refined in also adding requiresFinalCheckpoint
> > to
> > >
> > > CheckpointListener to exclude all operators with UDFs that implement
> > >
> > > CheckpointListener but do not need it for 2pc.
> > >
> > >
> > >
> > > @Override
> > >
> > > boolean AbstractUdfStreamOperator#requiresFinalCheckpoint() {
> > >
> > >  return userFunction instanceof CheckpointListener &&
> > >
> > >  ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > >
> > > }
> > >
> > >
> > >
> > > That approach would also work for statebackends/snapshot strategies
> that
> > >
> > > require some 2pc.
> > >
> > >
> > >
> > > If we can contain it to the @PublicEvolving StreamOperator, it would be
> > >
> > > better of course.
> > >
> > >
> > >
> > > Best,
> > >
> > >
> > >
> > > Arvid
> > >
> > >
> > >
> > > On Fri, Feb 12, 2021 at 11:36 AM Piotr Nowojski
> > >
> > > wrote:
> > >
> > >
> > >
> > > > Hey,
> > >
> > > >
> > >
> > > > I would like to raise a concern about implementation of the final
> > >
> > > > checkpoints taking into account operators/functions that are
> > implementing
> > >
> > > > two phase commit (2pc) protocol for exactly-once processing with some
> > >
> > > > external state (kept outside of the Flink). Primarily exactly-once
> > sinks.
> > >
> > > >
> > >
> > > > First of all, as I understand it, this is not planned in the first
> > > version
> > >
> > > > of this FLIP. I'm fine with that, however I would strongly emphasize
> > this
> > >
> > > > in every place we will be mentioning FLIP-147 efforts. This is
> because
> > > me,
> > >
> > > > as a user, upon hearing "Flink supports checkpointing with bounded
> > > inputs"
> > >
> > > > I would expect 2pc to work properly and to commit the external side
> > > effects
> > >
> > > > upon finishing. As it is now, I (as a user) would be surprised with a
> > >
> > > > silent data loss (of not committed trailing data). This is just a
> > remark,
> > >
> > > > that we need to attach this warning to every blog
> > post/documentation/user
> > >
> > > > mailing list response related to "Support Checkpoints After Tasks
> > >
> > > > Finished". Also I would suggest to prioritize the follow up of
> > supporting
> > >
> > > > 2pc.
> > >
> > > >
> > >
> > > > Secondly, I think we are missing how difficult and problematic will
> be
> > > 2pc
> > >
> > > > support with the final checkpoint.
> > >
> > > >
> > >
> > > > For starters, keep in mind that currently 2pc can be implemented by
> > users
> > >
> > > > using both `@Public` APIs as functions and `@PublicEvolving`
> operators
> > in
> > >
> > > > any place in the job graph. It's not limited to only the sinks. For
> > >
> > > > example users could easily implement the `AsynFunction` (for
> > >
> > > > `AsyncWaitOperator`) that is using 2pc based on the
> > `CheckpointListener`
> > >
> > > > interface. I'm not saying it's common, probably just a tiny minority
> of
> > >
> > > > users are doing that (if any at all), but nevertheless that's
> possible
> > > and
> > >
> > > > currently (implicitly?) supported in Flink.
> > >
> > > >
> > >
> > > > Next complication is the support of bounded streams
> (`BoundedOneInput`
> > or
> > >
> > > > `BoundedMultiInput` interfaces) and the closing/shutdown procedure of
> > the
> > >
> > > > operators. Currently it works as follows:
> > >
> > > > 0. Task receives EndOfPartitionEvent (or source finishes)
> > >
> > > > 1. `endOfInput` is called on the first operator in the chain
> > >
> > > > 2. We quiesce the processing timers
> > >
> > > > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the
> > > first
> > >
> > > > operator, so no new timers will be triggered
> > >
> > > > 3. We wait for the already fired timers to finish executing (spinning
> > >
> > > > mailbox loop)
> > >
> > > > 4. We are closing the first operator
> > >
> > > > 5. We go to the next (second) operator in the chain and repeat the
> > steps
> > > 1.
> > >
> > > > to 5.
> > >
> > > >
> > >
> > > > This is because operators can emit data after processing
> `endOfInput`,
> > > from
> > >
> > > > timers, async mailbox actions and inside the `close` method itself.
> > >
> > > >
> > >
> > > > Now the problem is to support the final checkpoint with 2pc, we need
> > >
> > > > trigger `snapshotState` and `notifyCheckpointComplete` call at the
> very
> > >
> > > > least only after `endOfInput` call on the operator. Probably the best
> > > place
> > >
> > > > would be in between steps 3. and 4. However that means, we would be
> > > forced
> > >
> > > > to wait for steps 1. to 3. to finish, then wait for a next checkpoint
> > to
> > >
> > > > trigger AND complete, before finally closing the head operator, and
> > only
> > >
> > > > then we can start closing the next operator in the chain:
> > >
> > > >
> > >
> > > > 0. Task receives EndOfPartitionEvent (or source finishes)
> > >
> > > > 1. `endOfInput` is called on the first operator in the chain
> > >
> > > > 2. We quiesce the processing timers
> > >
> > > > (`StreamOperatorWrapper#quiesceTimeServiceAndCloseOperator`) for the
> > > first
> > >
> > > > operator, so no new timers will be triggered
> > >
> > > > 3. We wait for the already fired timers to finish executing (spinning
> > >
> > > > mailbox loop)
> > >
> > > > *3b. We wait for one more checkpoint to trigger and for the
> > >
> > > > `notifyCheckpointComplete` RPC.*
> > >
> > > > 4. We are closing the first operator
> > >
> > > > 5. We go to the next (second) operator in the chain and repeat the
> > steps
> > > 1.
> > >
> > > > to 5.
> > >
> > > >
> > >
> > > > That means, we can close one operator per successful checkpoint. To
> > close
> > >
> > > > 10 operators, we would need 10 successful checkpoints.
> > >
> > > >
> > >
> > > > I was thinking about different approaches to this problem, and I
> > couldn't
> > >
> > > > find any viable ones. All I could think of would break the current
> > >
> > > > `@Public` API and/or would be ugly/confusing for the users.
> > >
> > > >
> > >
> > > > For example a relatively simple solution, to introduce a `preClose`
> or
> > >
> > > > `flush` method to the operators, with a contract that after
> > >
> > > > `flush`, operators would be forbidden from emitting more records, so
> > that
> > >
> > > > we can replace step 4. with this `flush` call, and then having a
> single
> > >
> > > > checkpoint to finish 2pc for all of the operators inside the chain,
> > > doesn't
> > >
> > > > work. Sheer fact of adding this `flush` method and changing the
> > contract
> > >
> > > > would break the current API and Yun Gao has pointed out to me, that
> we
> > >
> > > > either already support, or want to support operators that are
> emitting
> > >
> > > > records from within the `notifyCheckpointComplete` call:
> > >
> > > >
> > >
> > > > > Yun Gao:
> > >
> > > > > like with the new sink api there might be writer -> committer ->
> > global
> > >
> > > > committer, the committer would need to wait for the last checkpoint
> to
> > >
> > > > commit
> > >
> > > > > the last piece of data, and after that it also need to emit the
> list
> > of
> > >
> > > > transactions get committed to global committer to do some
> finalization
> > >
> > > > logic.
> > >
> > > >
> > >
> > > > So it wouldn't solve the problem (at least not fully).
> > >
> > > >
> > >
> > > > I don't know if anyone has any better ideas how to solve this
> problem?
> > >
> > > >
> > >
> > > > Piotrek
> > >
> > > >
> > >
> > > > pt., 15 sty 2021 o 14:57 Yun Gao
> > >
> > > > napisał(a):
> > >
> > > >
> > >
> > > > > Hi Aljoscha,
> > >
> > > > >
> > >
> > > > > I think so since we seems to do not have other divergence and new
> > >
> > > > > objections now. I'll open the vote then. Very thanks!
> > >
> > > > >
> > >
> > > > > Best,
> > >
> > > > > Yun
> > >
> > > > >
> > >
> > > > >
> > >
> > > > > ------------------------------------------------------------------
> > >
> > > > > From:Aljoscha Krettek
> > >
> > > > > Send Time:2021 Jan. 15 (Fri.) 21:24
> > >
> > > > > To:dev
> > >
> > > > > Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
> > > Finished
> > >
> > > > >
> > >
> > > > > Thanks for the summary! I think we can now move towards a [VOTE]
> > > thread,
> > >
> > > > > right?
> > >
> > > > >
> > >
> > > > > On 2021/01/15 13:43, Yun Gao wrote:
> > >
> > > > > >1) For the problem that the "new" root task coincidently finished
> > >
> > > > > >before getting triggered successfully, we have listed two options
> in
> > >
> > > > > >the FLIP-147[1], for the first version, now we are not tend to go
> > with
> > >
> > > > > >the first option that JM would re-compute and re-trigger new
> sources
> > >
> > > > > >when it realized some tasks are not triggered successfully. This
> > > option
> > >
> > > > > >would avoid the complexity of adding new PRC and duplicating task
> > >
> > > > > >states, and in average case it would not cause too much overhead.
> > >
> > > > >
> > >
> > > > > You wrote "we are *not* tend to go with the first option", but I
> > think
> > >
> > > > > you meant wo write "we tend to *now* go with the first option",
> > right?
> > >
> > > > > That's also how it is in the FLIP, I just wanted to clarify for the
> > >
> > > > > mailing list.
> > >
> > > > >
> > >
> > > > >
> > >
> > > >
> > >
> > >
> >
>

Reply via email to