Hi Zhu,

I agree that if we are talking about a single execution region with
blocking shuffle, watermark alignment may not be that helpful as the
subtasks are running independently of each other.

That said, I don't think watermark alignment and speculative execution
necessarily conflict with each other. The idea of watermark alignment is to
make sure the jobs run efficiently, regardless of whether or why the job
has performance issues. On the other hand, the purpose of speculative
execution is to find out whether the jobs have performance issues due to
slow tasks, and fix them.

For example, suppose a job has one task whose watermark always lags behind,
causing the other tasks to be throttled. Speculative execution identifies
the slow task and runs it on another node, thus unblocking the other
subtasks.
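
To make the example concrete, here is a minimal sketch of the throttling
rule. It assumes a simplified model in which a subtask is paused once its
watermark exceeds the minimum watermark of the group by more than a maximum
allowed drift. The class and method names are hypothetical and are not part
of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkAlignmentSketch {

    /**
     * Returns the indices of the subtasks that would be throttled, i.e. those
     * whose watermark is more than maxAllowedDrift ahead of the slowest one.
     * Simplified model for illustration only.
     */
    public static List<Integer> throttledSubtasks(long[] watermarks, long maxAllowedDrift) {
        long min = Long.MAX_VALUE;
        for (long w : watermarks) {
            min = Math.min(min, w);
        }
        List<Integer> throttled = new ArrayList<>();
        for (int i = 0; i < watermarks.length; i++) {
            if (watermarks[i] > min + maxAllowedDrift) {
                throttled.add(i);
            }
        }
        return throttled;
    }

    public static void main(String[] args) {
        long maxAllowedDrift = 2_000L;

        // Subtask 0 runs on a slow node and lags far behind the others.
        long[] watermarks = {1_000L, 9_000L, 9_500L};
        System.out.println(throttledSubtasks(watermarks, maxAllowedDrift));

        // A speculative attempt of subtask 0 on a healthy node catches up,
        // so the other subtasks are no longer held back.
        watermarks[0] = 8_800L;
        System.out.println(throttledSubtasks(watermarks, maxAllowedDrift));
    }
}
```

In the first call subtasks 1 and 2 are throttled because of the lagging
subtask 0. Once the faster attempt takes over, nothing is throttled.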

Thanks,

Jiangjie (Becket) Qin



On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:

> I had another thought, and now I think watermark alignment actually
> conflicts conceptually with speculative execution.
> This is because the idea of watermark alignment is to limit the progress
> of all sources to be around the progress of the slowest source in the
> watermark group. However, speculative execution's goal is to solve the
> slow task problem, and it never wants to limit the progress of tasks to
> the progress of the slow task.
> Therefore, I think it's fine to not support watermark alignment. Instead,
> an error should be thrown if watermark alignment is enabled while
> speculative execution is enabled.
>
> Thanks,
> Zhu
>
> On Mon, Jul 4, 2022 at 2:34 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > Thanks for updating the FLIP!
> >
> > I agree that at the moment users do not need watermark alignment (in
> > which case ReportedWatermarkEvent would occur) in batch cases.
> > However, I think the concept of watermark alignment does not conflict
> > with speculative execution. It can work with speculative execution with
> > a little extra effort, by sending the WatermarkAlignmentEvent to all
> > the current executions of each subtask.
> > Therefore, I prefer to support watermark alignment in case it is
> > needed by batch jobs in the future.
> >
> > Thanks,
> > Zhu
> >
> > On Fri, Jul 1, 2022 at 6:09 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > >
> > > Hi all,
> > > After an offline discussion with Jiangjie (Becket) Qin, Guowei, and
> > > Zhu Zhu, I've updated FLIP-245[1] to include:
> > > 1. The complete fault-tolerant processing flow.
> > > 2. Support for SourceEvent, because it's useful for some user-defined
> > > sources which have a custom event protocol between reader and
> > > enumerator.
> > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent
> > > messages.
> > >
> > > Please review FLIP-245[1] again. Looking forward to your feedback.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > >
> > > On Fri, Jul 1, 2022 at 6:02 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > >
> > > > Hi Guowei,
> > > > Thanks a lot for your feedback.
> > > > Your advice is really helpful. I've updated FLIP-245[1] to include
> > > > these parts.
> > > > > First of all, please complete the fault-tolerant processing flow
> > > > > in the FLIP.
> > > >
> > > > After an execution is created and a source operator becomes ready to
> > > > receive events, subtaskReady is called, and
> > > > SpeculativeSourceCoordinator would store the mapping of SubtaskGateway
> > > > to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > Then the source operator registers the reader with the coordinator,
> > > > and SpeculativeSourceCoordinator would store the mapping of source
> > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > If the execution goes through a failover, subtaskFailed is called, and
> > > > SpeculativeSourceCoordinator would clear the information about this
> > > > execution, including source readers and SubtaskGateway.
> > > > If all the current executions of the execution vertex have failed,
> > > > subtaskReset would be called, and SpeculativeSourceCoordinator would
> > > > clear all information about these executions and add the splits back
> > > > to the split enumerator of the source.
> > > >
> > > > > Secondly the FLIP only says that user-defined events are not
> > > > > supported, but it does not explain how to deal with the existing
> > > > > ReportedWatermarkEvent/ReaderRegistrationEvent.
> > > >
> > > > For ReaderRegistrationEvent:
> > > > When the source operator registers the reader with the coordinator,
> > > > SpeculativeSourceCoordinator would also store the mapping of source
> > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > Like SourceCoordinator, it also needs to call
> > > > SplitEnumerator#addReader to add a new source reader.
> > > > Besides, in order to distinguish source readers of different
> > > > executions, 'ReaderInfo' needs an additional 'attemptId' field.
> > > >
> > > > For ReportedWatermarkEvent:
> > > > ReportedWatermarkEvent was introduced in 1.15 and is used to support
> > > > watermark alignment in streaming mode.
> > > > Speculative execution is only enabled in batch mode. Therefore,
> > > > SpeculativeSourceCoordinator would throw an exception if it receives
> > > > a ReportedWatermarkEvent.
> > > >
> > > > Besides, after an offline discussion with Jiangjie (Becket) Qin, I've
> > > > added support for SourceEvent because it's useful for some
> > > > user-defined sources which have a custom event protocol between
> > > > reader and enumerator.
> > > >
> > > > Best,
> > > > Jing Zhang
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >
> > > > On Wed, Jun 29, 2022 at 6:06 PM Guowei Ma <guowei....@gmail.com> wrote:
> > > >
> > > >> Hi, Jing
> > > >>
> > > > >> Thanks a lot for writing this FLIP, which is very useful to batch
> > > > >> users. Currently I have only two small questions:
> > > >>
> > > > >> 1. First of all, please complete the fault-tolerant processing flow
> > > > >> in the FLIP. (Maybe you've already considered it, but it's better to
> > > > >> explicitly give the specific solution in the FLIP.)
> > > > >> For example, how is a source `Reader` handled in case of an error?
> > > > >> As far as I know, once the reader is unavailable, no new split can
> > > > >> be allocated, which may be unacceptable in the case of speculative
> > > > >> execution.
> > > >>
> > > > >> 2. Secondly, the FLIP only says that user-defined events are not
> > > > >> supported, but it does not explain how to deal with the existing
> > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the
> > > > >> case of speculative execution, there may be two "same" tasks being
> > > > >> executed at the same time. If these events are duplicated, whether
> > > > >> they really have no effect on the execution of the job still needs
> > > > >> a clear evaluation.
> > > >>
> > > >> Best,
> > > >> Guowei
> > > >>
> > > >>
> > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > > >>
> > > >> > Hi all,
> > > > >> > One major problem of Flink batch jobs is slow tasks running on
> > > > >> > hot/bad nodes, resulting in very long execution times.
> > > > >> >
> > > > >> > To solve this problem, FLIP-168: Speculative Execution for Batch
> > > > >> > Job[1] was introduced and approved recently.
> > > > >> >
> > > > >> > Here, Zhu Zhu and I propose to support speculative execution of
> > > > >> > sources as one of the follow-ups of FLIP-168. You can find more
> > > > >> > details in FLIP-245[2].
> > > > >> > Looking forward to your feedback.
> > > >> >
> > > >> > Best,
> > > >> > Jing Zhang
> > > >> >
> > > >> > [1]
> > > >> >
> > > >> >
> > > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> > > >> >
> > > >> > [2]
> > > >> >
> > > >> >
> > > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > >> >
> > > >>
> > > >
>
