Hi Biao,

Thanks for explaining how SinkV2 knows the right subtask
attempt. I have no more questions; +1 for the proposal.

Best,
Lijie

Biao Liu <mmyy1...@gmail.com> wrote on Wed, Dec 28, 2022 at 17:22:

> Thanks for all your feedback!
>
> To @Yuxia,
>
> > What is the sink expected to do to isolate data produced by speculative
> > executions? IIUC, if the task fails over, it also generates a new attempt.
> > Does that make a difference in isolating the produced data?
>
>
> Yes, the speculative execution scenario differs from the task failover
> scenario here. The attempt number matters much more for speculative
> execution than for failover, because in the failover scenario only one
> instance of a subtask can be running at any given time.
>
> Let's take FileSystemOutputFormat as an example. For the failover scenario,
> the temporary directory to store produced data can be something like
> "$root_dir/task-$taskNumber/". During initialization, the subtask deletes
> and re-creates the temporary directory.
>
> However, this does not work in the speculative execution scenario, because
> several attempts of the same subtask might be running at the same time.
> These attempts might delete, re-create, and write to the same temporary
> directory concurrently. The temporary directory should instead be something
> like "$root_dir/task-$taskNumber/attempt-$attemptNumber". That's why it's
> necessary to expose the attempt number to the Sink implementation, so that
> it can isolate the data.
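To make the difference between the two directory layouts concrete, here is a minimal sketch, written in Python only for brevity and not using any Flink API. The helpers `temp_dir`, `init_attempt`, `write_part`, and `run_two_attempts` are hypothetical. The sketch simulates two overlapping attempts of subtask 3, each doing the "delete and re-create" initialization described above, and reports which files the first attempt can still see afterwards.

```python
import os
import shutil
import tempfile


def temp_dir(root_dir, task_number, attempt_number=None):
    """Hypothetical helper: per-subtask dir, optionally scoped per attempt."""
    path = os.path.join(root_dir, f"task-{task_number}")
    if attempt_number is not None:
        path = os.path.join(path, f"attempt-{attempt_number}")
    return path


def init_attempt(path):
    # Initialization phase as described: delete, then re-create, the temp dir.
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path)


def write_part(path, name, data):
    with open(os.path.join(path, name), "w") as f:
        f.write(data)


def run_two_attempts(root_dir, per_attempt):
    """Simulate attempts 0 and 1 of subtask 3 overlapping in time.

    Attempt 0 initializes and writes a file, then attempt 1 initializes
    (delete + re-create) and writes its own file. Returns the files that
    attempt 0 can still see in its directory.
    """
    dir0 = temp_dir(root_dir, 3, 0 if per_attempt else None)
    dir1 = temp_dir(root_dir, 3, 1 if per_attempt else None)
    init_attempt(dir0)
    write_part(dir0, "part-a0", "from attempt 0")
    init_attempt(dir1)  # with the shared layout this wipes attempt 0's output
    write_part(dir1, "part-a1", "from attempt 1")
    return sorted(os.listdir(dir0))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        print(run_two_attempts(os.path.join(root, "shared"), per_attempt=False))
        print(run_two_attempts(os.path.join(root, "isolated"), per_attempt=True))
```

With the shared "task-$taskNumber" layout, attempt 1's initialization deletes attempt 0's file, so attempt 0 sees only `['part-a1']`. With the per-attempt layout, attempt 0 still sees `['part-a0']`.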
>
>
> To @Lijie,
>
> > I have a question about this: does SinkV2 need to do the same thing?
>
>
> Actually, yes.
>
> > Should we/users do it in the committer? If yes, how does the committer know
> > which one is the right subtask attempt?
>
>
> Yes, we/users should do it in the committer.
>
> In the current design, the Committer of Sink V2 should get the "which one
> is the right subtask attempt" information from the "committable data"
> produced by the SinkWriter. Taking FileSink as an example, the
> "committable data" sent to the Committer contains the full paths of the
> files produced by the SinkWriter. Users could also pass the attempt number
> through the "committable data" from the SinkWriter to the Committer.
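As a rough illustration of "the committable tells the committer which attempt's data to commit", here is a minimal Python sketch. It is not the Sink V2 API: `FileCommittable`, `SketchWriter`, and `commit` are hypothetical names, and an in-memory dict stands in for the file system. It assumes, as described above, that the committables the committer receives come from the attempt whose output should be committed; each committable carries the attempt number and the full temporary paths.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class FileCommittable:
    """Hypothetical committable: which attempt wrote which temporary files."""
    subtask_index: int
    attempt_number: int
    pending_paths: List[str]


class SketchWriter:
    """Hypothetical stand-in for a SinkWriter (not the Flink API)."""

    def __init__(self, root_dir: str, subtask_index: int, attempt_number: int):
        self.subtask_index = subtask_index
        self.attempt_number = attempt_number
        # Per-attempt temporary directory, as in the FileSystemOutputFormat case.
        self.temp_dir = f"{root_dir}/task-{subtask_index}/attempt-{attempt_number}"
        self.paths: List[str] = []

    def write(self, fs: Dict[str, str], part: str, data: str) -> None:
        path = f"{self.temp_dir}/{part}"
        fs[path] = data
        self.paths.append(path)

    def prepare_commit(self) -> FileCommittable:
        # The committable carries the full paths plus the attempt number.
        return FileCommittable(self.subtask_index, self.attempt_number, list(self.paths))


def commit(fs: Dict[str, str], final_dir: str,
           committables: List[FileCommittable]) -> List[str]:
    """Hypothetical committer: publish only the files listed in committables."""
    published = []
    for c in committables:
        for path in c.pending_paths:
            name = path.rsplit("/", 1)[-1]
            target = f"{final_dir}/task-{c.subtask_index}-{name}"
            fs[target] = fs.pop(path)  # "move" the file into its final location
            published.append(target)
    return published


if __name__ == "__main__":
    fs: Dict[str, str] = {}
    slow = SketchWriter("/tmp/out", 0, 0)
    fast = SketchWriter("/tmp/out", 0, 1)
    slow.write(fs, "part-0", "a")
    fast.write(fs, "part-0", "a")
    # Assume only the finished attempt's committable reaches the committer.
    print(commit(fs, "/data", [fast.prepare_commit()]))  # ['/data/task-0-part-0']
```

The committer never has to ask which attempt won. The paths in the committable already point into the right attempt's directory. Files from the other attempt stay behind as leaked data, which is the cleanup question covered in the FLIP's "Rejected Alternatives" section.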
>
> In the "Rejected Alternatives -> Introduce a way to clean leaked data of
> Sink V2" section of the FLIP document, we discussed some of the reasons
> why we didn't provide an API like the one for OutputFormat.
>
> To @Jing Zhang,
>
> > I have a question about this: Speculative execution of Committer will be
> > disabled.
> >
> > I agree with your point, and I have seen similar requirements to disable
> > speculative execution for specified operators.
> >
> > However, the requirement is not supported currently. I think there should
> > be some place to describe how to support it.
>
>
> In this FLIP design, speculative execution of the Sink V2 Committer will
> be disabled by Flink. This is not optional; users cannot change it.
> And as you said, "disable speculative execution for specified operators" is
> not supported in this FLIP, because it is a bit out of the scope of "Sink
> Supports Speculative Execution For Batch Job". I think it's better to start
> another FLIP to discuss it, perhaps titled "Fine-grained control of enabling
> speculative execution for operators". There we can discuss how to enable or
> disable speculative execution for specified operators, including the
> Committer and pre/post-committer of Sink V2.
>
> What do you think?
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:
>
> > Hi Biao,
> >
> > Thanks for driving this FLIP. Supporting speculative execution of sinks
> > is important.
> >
> > I have a question about this: Speculative execution of Committer will be
> > disabled.
> >
> > I agree with your point, and I have seen similar requirements to disable
> > speculative execution for specified operators.
> >
> > However, the requirement is not supported currently. I think there should
> > be some place to describe how to support it.
> >
> > Best,
> > Jing Zhang
> >
> > Lijie Wang <wangdachui9...@gmail.com> wrote on Tue, Dec 27, 2022 at 18:51:
> >
> > > Hi Biao,
> > >
> > > Thanks for driving this FLIP.
> > > This FLIP introduces "int getFinishedAttempt(int subtaskIndex)" for
> > > OutputFormat, so that it knows which subtask attempt is the one marked as
> > > finished by the JM and can commit the right data.
> > > I have a question about this: does SinkV2 need to do the same thing?
> > > Should we/users do it in the committer? If yes, how does the committer
> > > know which one is the right subtask attempt?
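For the OutputFormat side, here is a minimal Python sketch of how a finalization step could use a "finished attempt per subtask" lookup. It assumes the step runs once after all subtasks finish and that temporary output is keyed by (subtask index, attempt number). `finalize_global` and the `get_finished_attempt` callable are hypothetical stand-ins for the FLIP's `int getFinishedAttempt(int subtaskIndex)`, not the actual Flink signatures.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical model: temporary output files keyed by (subtask_index, attempt_number).
TempOutput = Dict[Tuple[int, int], List[str]]


def finalize_global(
    temp_output: TempOutput,
    parallelism: int,
    get_finished_attempt: Callable[[int], int],
) -> Tuple[List[str], List[Tuple[int, int]]]:
    """Keep only the finished attempt's files for each subtask.

    get_finished_attempt stands in for getFinishedAttempt(subtaskIndex).
    Returns (files to commit, (subtask, attempt) keys to discard).
    """
    committed: List[str] = []
    discarded: List[Tuple[int, int]] = []
    for subtask in range(parallelism):
        finished = get_finished_attempt(subtask)
        for (idx, attempt), files in sorted(temp_output.items()):
            if idx != subtask:
                continue
            if attempt == finished:
                committed.extend(files)  # output of the attempt the JM marked finished
            else:
                discarded.append((idx, attempt))  # leftovers of other attempts
    return committed, discarded


if __name__ == "__main__":
    temp = {
        (0, 0): ["task-0/attempt-0/part-0"],
        (0, 1): ["task-0/attempt-1/part-0"],
        (1, 0): ["task-1/attempt-0/part-0"],
    }
    finished = {0: 1, 1: 0}  # subtask 0 finished via its speculative attempt 1
    print(finalize_global(temp, 2, finished.__getitem__))
```

For subtask 0 this commits `task-0/attempt-1/part-0` and discards attempt 0. Subtask 1 had only one attempt, so its output is committed as-is.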
> > >
> > > Best,
> > > Lijie
> > >
> > > yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Tue, Dec 27, 2022 at 10:01:
> > >
> > > > Hi Biao,
> > > > Thanks for driving this FLIP.
> > > > After a quick look at this FLIP, I have a question about "expose the
> > > > attempt number which can be used to isolate data produced by
> > > > speculative executions with the same subtask id".
> > > > What is the sink expected to do to isolate data produced by speculative
> > > > executions? IIUC, if the task fails over, it also generates a new
> > > > attempt. Does that make a difference in isolating the produced data?
> > > >
> > > > Best regards,
> > > > Yuxia
> > > >
> > > > ----- Original Message -----
> > > > From: "Biao Liu" <mmyy1...@gmail.com>
> > > > To: "dev" <dev@flink.apache.org>
> > > > Sent: Thursday, December 22, 2022 8:16:21 PM
> > > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > > >
> > > > Hi everyone,
> > > >
> > > > I would like to start a discussion about making Sink support
> > > > speculative execution for batch jobs. This proposal is a follow-up of
> > > > "FLIP-168: Speculative Execution For Batch Job"[1]. Speculative
> > > > execution is very valuable for batch jobs, and support for it will be
> > > > more complete once Sink supports speculative execution too. Please find
> > > > more details in the FLIP document [2].
> > > >
> > > > Looking forward to your feedback.
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > > Thanks,
> > > > Biao /'bɪ.aʊ/
> > > >
> > >
> >
>
