Hi Martijn,

Thanks for your feedback!

Yes, we propose to support speculative execution for SinkFunction.
1. From the perspective of compatibility: SinkFunction is the oldest sink
interface. There are lots of implementations based on SinkFunction, not
only in the official Flink codebase but also in users' private codebases,
so compatibility is an even bigger concern here than for Sink V1. Of course
we hope users will migrate their legacy implementations to the new
interface. However, migration is always hard.
2. From the perspective of cost: we don't need to do much extra work to
support speculative execution for SinkFunction. All we need to do is check
whether the SinkFunction implementation implements
SupportsConcurrentExecutionAttempts. The rest of the work is the same as
for Sink V2.
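The check described in point 2 can be modelled in a few lines. This is a minimal Python sketch, not Flink code: the classes below are hypothetical stand-ins that only mirror the names SinkFunction and SupportsConcurrentExecutionAttempts, and the sketch assumes that a sink which does not implement the marker interface simply runs without speculative attempts.

```python
class SinkFunction:
    """Hypothetical stand-in for the legacy SinkFunction interface."""

    def invoke(self, value):
        raise NotImplementedError


class SupportsConcurrentExecutionAttempts:
    """Hypothetical stand-in for the marker interface: it has no methods and
    only signals that concurrent attempts of one subtask are safe."""


class LegacyPrintSink(SinkFunction):
    # Does not opt in: running several attempts concurrently may be unsafe.
    def invoke(self, value):
        print(value)


class AttemptAwareFileSink(SinkFunction, SupportsConcurrentExecutionAttempts):
    # Opts in: assumed to isolate the data of each attempt by itself.
    def invoke(self, value):
        pass


def allows_speculative_execution(sink: SinkFunction) -> bool:
    # The only SinkFunction-specific work in this sketch: a type check.
    return isinstance(sink, SupportsConcurrentExecutionAttempts)
```

In this sketch the marker interface keeps the opt-in explicit: an existing SinkFunction keeps running without speculative attempts until its author declares it safe.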

To summarize, supporting speculative execution for SinkFunction is cheap,
and it would allow more existing use cases to run with speculative execution.

Thanks,
Biao /'bɪ.aʊ/



On Wed, 11 Jan 2023 at 21:22, Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Biao,
>
> Apologies for jumping in late. My only question is about SinkFunction:
> does this imply that we want to add support for this to the SinkFunction?
> If so, I would not be in favour of that, since we would like to deprecate
> the SinkFunction in favour of SinkV2 (I actually thought that was already
> the case).
>
> Besides that, I have no other comments.
>
> Best regards,
>
> Martijn
>
> On Wed, Jan 4, 2023 at 7:28 AM Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi Biao,
> >
> > Thanks for the explanation.
> >
> > +1 for the proposal.
> >
> > Best,
> > Jing Zhang
> >
> > Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, 4 Jan 2023 at 12:11:
> >
> > > Hi Biao,
> > >
> > > Thanks for the explanation of how SinkV2 knows the right subtask
> > > attempt. I have no more questions, +1 for the proposal.
> > >
> > > Best,
> > > Lijie
> > >
> > > Biao Liu <mmyy1...@gmail.com> wrote on Wed, 28 Dec 2022 at 17:22:
> > >
> > > > Thanks for all your feedback!
> > > >
> > > > To @Yuxia,
> > > >
> > > > > What does the sink need to do to isolate data produced by speculative
> > > > > executions? IIUC, if a task fails over, it also generates a new
> > > > > attempt. Does that make a difference in isolating the produced data?
> > > >
> > > >
> > > > Yes, it is different from the task failover scenario. The attempt
> > > > number matters more for speculative execution than for failover,
> > > > because in the failover scenario only one instance of a subtask runs
> > > > at any given time.
> > > >
> > > > Let's take FileSystemOutputFormat as an example. For the failover
> > > > scenario, the temporary directory to store produced data can be
> > > > something like "$root_dir/task-$taskNumber/". In the initialization
> > > > phase, the subtask deletes and re-creates the temporary directory.
> > > >
> > > > However, this does not work for speculative execution, because several
> > > > attempts of the same subtask might be running at the same time. These
> > > > attempts might delete, re-create and write to the same temporary
> > > > directory concurrently. The temporary directory should instead be
> > > > something like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So
> > > > it's necessary to expose the attempt number to the Sink implementation
> > > > to isolate the data.
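To make the difference concrete, here is a minimal Python sketch that simulates two overlapping attempts of subtask 3 against an in-memory dict standing in for the file system. The root path, the function names and the part-file naming are hypothetical; only the two directory layouts come from the text above. Each attempt deletes and re-creates its temporary directory on initialization, then writes one file.

```python
from pathlib import PurePosixPath

ROOT = PurePosixPath("/tmp/sink-output")  # hypothetical $root_dir


def failover_temp_dir(task_number: int, attempt_number: int) -> PurePosixPath:
    # "$root_dir/task-$taskNumber/": the attempt number is not used.
    return ROOT / f"task-{task_number}"


def speculative_temp_dir(task_number: int, attempt_number: int) -> PurePosixPath:
    # "$root_dir/task-$taskNumber/attempt-$attemptNumber"
    return ROOT / f"task-{task_number}" / f"attempt-{attempt_number}"


def run_attempts(temp_dir_of, task_number, attempt_numbers):
    """Simulate attempts of one subtask that start while earlier attempts
    are still running. Each attempt initializes its temp dir and writes one
    file; nothing is committed or cleaned up afterwards."""
    fs = {}  # in-memory stand-in for the file system: dir -> list of files
    for attempt in attempt_numbers:
        temp_dir = temp_dir_of(task_number, attempt)
        fs[temp_dir] = []  # initialization: delete and re-create the directory
        fs[temp_dir].append(f"part-{task_number}-{attempt}")
    return fs


shared = run_attempts(failover_temp_dir, 3, [0, 1])
isolated = run_attempts(speculative_temp_dir, 3, [0, 1])
```

With the failover layout, `shared` ends up as `{PurePosixPath('/tmp/sink-output/task-3'): ['part-3-1']}`: attempt 1 wiped the output of attempt 0, which may well be the attempt that finishes first. With the per-attempt layout, `isolated` keeps one directory per attempt, so whichever attempt finishes can be committed and the others discarded.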
> > > >
> > > >
> > > > To @Lijie,
> > > >
> > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > >
> > > >
> > > > Actually, yes.
> > > >
> > > > > Should we/users do it in the committer? If yes, how does the committer
> > > > > know which one is the right subtask attempt?
> > > >
> > > >
> > > > Yes, we/users should do it in the committer.
> > > >
> > > > In the current design, the Committer of Sink V2 should get the "which
> > > > one is the right subtask attempt" information from the "committable
> > > > data" produced by the SinkWriter. Take the FileSink as an example: the
> > > > "committable data" sent to the Committer contains the full paths of the
> > > > files produced by the SinkWriter. Users could also pass the attempt
> > > > number from the SinkWriter to the Committer through the "committable
> > > > data".
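A minimal Python sketch of that idea follows. The FileCommittable type, its fields and plan_commit are hypothetical illustrations, not the Sink V2 API. The sketch assumes that all committables reaching the Committer for a given subtask come from a single attempt, and that the Committer can list which attempt directories exist on storage; under those assumptions the attempt number in the committable tells the Committer which files to publish and which attempt directories hold leaked data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileCommittable:
    """Hypothetical committable sent from a SinkWriter to the Committer."""

    subtask_index: int
    attempt_number: int
    pending_file: str  # full path of a file written by this attempt


def plan_commit(committables, attempts_on_storage):
    """Return (files to publish, sorted (subtask, attempt) pairs that leaked).

    attempts_on_storage maps a subtask index to the attempt numbers whose
    temporary directories were found on storage.
    """
    winning_attempt = {}
    for c in committables:
        previous = winning_attempt.setdefault(c.subtask_index, c.attempt_number)
        if previous != c.attempt_number:
            # Violates the assumption that one attempt per subtask is committed.
            raise ValueError(f"subtask {c.subtask_index} has committables from "
                             f"attempts {previous} and {c.attempt_number}")
    to_publish = sorted(c.pending_file for c in committables)
    # A subtask without committables has all of its attempt dirs flagged.
    leaked = sorted(
        (subtask, attempt)
        for subtask, attempts in attempts_on_storage.items()
        for attempt in attempts
        if attempt != winning_attempt.get(subtask)
    )
    return to_publish, leaked


publish, leaked = plan_commit(
    [
        FileCommittable(0, 1, "/out/task-0/attempt-1/part-0"),
        FileCommittable(1, 0, "/out/task-1/attempt-0/part-0"),
    ],
    {0: [0, 1], 1: [0, 2]},
)
```

Here `leaked` is `[(0, 0), (1, 2)]`: attempt 0 of subtask 0 and attempt 2 of subtask 1 did not produce the committed data, so their directories can be cleaned up.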
> > > >
> > > > In the "Rejected Alternatives -> Introduce a way to clean leaked data of
> > > > Sink V2" section of the FLIP document, we discussed some of the reasons
> > > > why we didn't provide an API like the one for OutputFormat.
> > > >
> > > > To @Jing Zhang,
> > > >
> > > > > I have a question about this: "Speculative execution of Committer will
> > > > > be disabled."
> > > > >
> > > > > I agree with your point, and I have seen similar requirements to
> > > > > disable speculative execution for specified operators.
> > > > >
> > > > > However, this requirement is not supported currently. I think there
> > > > > should be some place to describe how to support it.
> > > >
> > > >
> > > > In this FLIP's design, Flink always disables speculative execution for
> > > > the Committer of Sink V2. It's not optional, and users cannot change it.
> > > > And as you said, "disable speculative execution for specified
> > > > operators" is not supported in this FLIP, because it is a bit out of
> > > > the scope of "Sink Supports Speculative Execution For Batch Job". I
> > > > think it's better to start another FLIP to discuss it, for example
> > > > titled "Fine-grained control of enabling speculative execution for
> > > > operators". There we can discuss how to enable or disable speculative
> > > > execution for specified operators, including the Committer and the
> > > > pre/post-committer of Sink V2.
> > > >
> > > > What do you think?
> > > >
> > > > Thanks,
> > > > Biao /'bɪ.aʊ/
> > > >
> > > >
> > > >
> > > > On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:
> > > >
> > > > > Hi Biao,
> > > > >
> > > > > Thanks for driving this FLIP. Supporting speculative execution of
> > > > > sinks is meaningful and important.
> > > > >
> > > > > I have a question about this: "Speculative execution of Committer will
> > > > > be disabled."
> > > > >
> > > > > I agree with your point, and I have seen similar requirements to
> > > > > disable speculative execution for specified operators.
> > > > >
> > > > > However, this requirement is not supported currently. I think there
> > > > > should be some place to describe how to support it.
> > > > >
> > > > > Best,
> > > > > Jing Zhang
> > > > >
> > > > > Lijie Wang <wangdachui9...@gmail.com> wrote on Tue, 27 Dec 2022 at 18:51:
> > > > >
> > > > > > Hi Biao,
> > > > > >
> > > > > > Thanks for driving this FLIP.
> > > > > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" so that
> > > > > > an OutputFormat can know which subtask attempt the JM marked as
> > > > > > finished and commit the right data.
> > > > > > I have a question about this: does SinkV2 need to do the same thing?
> > > > > > Should we/users do it in the committer? If yes, how does the committer
> > > > > > know which one is the right subtask attempt?
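On the OutputFormat side, a finalization step could use such a method roughly as in the minimal Python sketch below. A plain function stands in for getFinishedAttempt, and the "$root_dir/task-$taskNumber/attempt-$attemptNumber" layout discussed in this thread is assumed; the function name directories_to_commit and the dict of finished attempts are hypothetical.

```python
from pathlib import PurePosixPath
from typing import Callable, List


def directories_to_commit(
    root: str,
    parallelism: int,
    get_finished_attempt: Callable[[int], int],
) -> List[PurePosixPath]:
    """Pick, for every subtask, the temp dir of the attempt marked finished."""
    return [
        PurePosixPath(root) / f"task-{i}" / f"attempt-{get_finished_attempt(i)}"
        for i in range(parallelism)
    ]


# Hypothetical outcome: subtask 1 was finished by its speculative attempt 1.
finished_attempts = {0: 0, 1: 1, 2: 0}
dirs = directories_to_commit("/tmp/sink-output", 3, lambda i: finished_attempts[i])
```

Every other attempt directory under a subtask's `task-$taskNumber` directory can then be deleted instead of committed.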
> > > > > >
> > > > > > Best,
> > > > > > Lijie
> > > > > >
> > > > > > yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Tue, 27 Dec 2022 at 10:01:
> > > > > >
> > > > > > > Hi Biao,
> > > > > > > Thanks for driving this FLIP.
> > > > > > > After a quick look at this FLIP, I have a question about "expose the
> > > > > > > attempt number which can be used to isolate data produced by
> > > > > > > speculative executions with the same subtask id".
> > > > > > > What does the sink need to do to isolate data produced by speculative
> > > > > > > executions? IIUC, if a task fails over, it also generates a new
> > > > > > > attempt. Does that make a difference in isolating the produced data?
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Yuxia
> > > > > > >
> > > > > > > ----- Original Message -----
> > > > > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > > > > To: "dev" <dev@flink.apache.org>
> > > > > > > Sent: Thursday, 22 December 2022, 8:16:21 PM
> > > > > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > I would like to start a discussion about making Sink support
> > > > > > > speculative execution for batch jobs. This proposal is a follow-up of
> > > > > > > "FLIP-168: Speculative Execution For Batch Job"[1]. Speculative
> > > > > > > execution is very valuable for batch jobs, and supporting speculative
> > > > > > > execution of Sink would make it more complete. Please find more
> > > > > > > details in the FLIP document [2].
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > > > > [2]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Biao /'bɪ.aʊ/
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
