Thanks for all your feedback!

To @Yuxia,

> What the sink expect to do to isolate data produced by speculative
> executions?  IIUC, if the taks failover, it also generate a new attempt.
> Does it make difference in isolating data produced?


Yes there is something different from the task failover scenario. The
attempt number is more necessary for speculative execution than failover.
Because there can be only one subtask instance running at the same time in
the failover scenario.

Let's take FileSystemOutputFormat as an example. For the failover scenario,
the temporary directory to store produced data can be something like
"$root_dir/task-$taskNumber/". At the initialization phase, subtask deletes
and re-creates the temporary directory.

However in the speculative execution scenario, it does not work because
there might be several subtasks running at the same time. These subtasks
might delete, re-create and write the same temporary directory at the
same time. The correct temporary directory should be like
"$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to
expose the attempt number to the Sink implementation to do the data
isolation.


To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?


Actually, yes.

Should we/users do it in the committer? If yes, how does the commiter know
> which one is the right subtask attempt?


Yes, we/users should do it in the committer.

In the current design, the Committer of Sink V2 should get the "which one
is the right subtask attempt" information from the "committable data''
produced by SinkWriter. Let's take the FileSink as example, the
"committable data" sent to the Committer contains the full path of the
files produced by SinkWriter. Users could also pass the attempt number
through "committable data" from SinkWriter to Committer.

In the "Rejected Alternatives -> Introduce a way to clean leaked data of
Sink V2" section of the FLIP document, we discussed some of the reasons
that we didn't provide the API like OutputFormat.

To @Jing Zhang

I have a question about this: Speculative execution of Committer will be
> disabled.

I agree with your point and I saw the similar requirements to disable
speculative
> execution for specified operators.

However the requirement is not supported currently. I think there
should be some
> place to describe how to support it.


In this FLIP design, the speculative execution of Committer of Sink V2 will
be disabled by Flink. It's not an optional operation. Users can not change
it.
And as you said, "disable speculative execution for specified operators" is
not supported in the FLIP. Because it's a bit out of scope: "Sink Supports
Speculative Execution For Batch Job". I think it's better to start another
FLIP to discuss it. "Fine-grained control of enabling speculative execution
for operators" can be the title of that FLIP. And we can discuss there how
to enable or disable speculative execution for specified operators
including Committer and pre/post-committer of Sink V2.

What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Wed, 28 Dec 2022 at 11:30, Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Biao,
>
> Thanks for driving this FLIP. It's meaningful to support speculative
> execution
> of sinks is important.
>
> I have a question about this: Speculative execution of Committer will be
> disabled.
>
> I agree with your point and I saw the similar requirements to disable
> speculative execution for specified operators.
>
> However the requirement is not supported currently. I think there should be
> some place to describe how to support it.
>
> Best,
> Jing Zhang
>
> Lijie Wang <wangdachui9...@gmail.com> 于2022年12月27日周二 18:51写道:
>
> > Hi Biao,
> >
> > Thanks for driving this FLIP.
> > In this FLIP, it introduces "int getFinishedAttempt(int subtaskIndex)"
> for
> > OutputFormat to know which subtask attempt is the one marked as finished
> by
> > JM and commit the right data.
> > I have a question about this: does SinkV2 need to do the same thing?
> Should
> > we/users do it in the committer? If yes, how does the commiter know which
> > one is the right subtask attempt?
> >
> > Best,
> > Lijie
> >
> > yuxia <luoyu...@alumni.sjtu.edu.cn> 于2022年12月27日周二 10:01写道:
> >
> > > HI, Biao.
> > > Thanks for driving this FLIP.
> > > After quick look of this FLIP, I have a question about "expose the
> > attempt
> > > number which can be used to isolate data produced by speculative
> > executions
> > > with the same subtask id".
> > > What the sink expect to do to isolate data produced by speculative
> > > executions?  IIUC, if the taks failover, it also generate a new
> attempt.
> > > Does it make difference in isolating data produced?
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- 原始邮件 -----
> > > 发件人: "Biao Liu" <mmyy1...@gmail.com>
> > > 收件人: "dev" <dev@flink.apache.org>
> > > 发送时间: 星期四, 2022年 12 月 22日 下午 8:16:21
> > > 主题: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch
> Job
> > >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about making Sink support
> speculative
> > > execution for batch jobs. This proposal is a follow up of "FLIP-168:
> > > Speculative Execution For Batch Job"[1]. Speculative execution is very
> > > meaningful for batch jobs. And it would be more complete after
> supporting
> > > speculative execution of Sink. Please find more details in the FLIP
> > > document
> > > [2].
> > >
> > > Looking forward to your feedback.
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-281+Sink+Supports+Speculative+Execution+For+Batch+Job
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> >
>

Reply via email to