Yes, that sounds reasonable to me. That said, supporting custom events
might still be preferable if that does not complicate the design too much.
It would be good to avoid having a tricky feature availability matrix when
we add new features to the project.

Thanks,

Jiangjie (Becket) Qin



On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:

> Hi Jiangjie,
>
> Yes, you are right that the goals of watermark alignment and speculative
> execution do not conflict. For the example you gave, we can make it
> work by only aligning watermarks among executions that are connected
> through pipelined edges (i.e. in the same execution-attempt-level
> pipelined region). Even without speculative execution, this looks like a
> possible improvement to watermark alignment for streaming jobs that
> contain embarrassingly parallel job vertices, so that a slow task
> does not cause unconnected tasks to be throttled.
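Here is a minimal Python sketch of the region-scoped variant. It is not Flink code, and the function and its parameters are hypothetical. It assumes the same min-plus-drift bound that the existing watermark alignment uses, only computed per pipelined region instead of per alignment group.

```python
def region_max_allowed_watermarks(reported, region_of, max_drift):
    """Per-execution watermark bound, aligned only within a pipelined region.

    reported:  execution id -> last reported watermark
    region_of: execution id -> id of the pipelined region it belongs to
    max_drift: how far an execution may run ahead of the slowest one in its region
    """
    # Slowest (minimum) watermark per pipelined region.
    region_min = {}
    for execution, watermark in reported.items():
        region = region_of[execution]
        region_min[region] = min(watermark, region_min.get(region, watermark))
    # An execution is bounded only by the slowest execution of its own region.
    return {e: region_min[region_of[e]] + max_drift for e in reported}


# "a" is slow; "b" shares its region, "c" is in an unconnected region.
bounds = region_max_allowed_watermarks(
    reported={"a": 100, "b": 500, "c": 900},
    region_of={"a": 1, "b": 1, "c": 2},
    max_drift=50,
)
print(bounds)  # {'a': 150, 'b': 150, 'c': 950}
```

With this scoping, the slow execution "a" only holds back "b", which shares its pipelined region. "c" keeps its own bound.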
>
> At the moment, given that it is not needed yet and to avoid further
> complicating things, I think it's fine not to support watermark
> alignment together with speculative execution.
>
> WDYT?
>
> Thanks,
> Zhu
>
> On Mon, Jul 4, 2022 at 4:15 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Zhu,
> >
> > I agree that if we are talking about a single execution region with
> > blocking shuffle, watermark alignment may not be that helpful as the
> > subtasks are running independently of each other.
> >
> > That said, I don't think watermark alignment and speculative execution
> > necessarily conflict with each other. The idea of watermark alignment is
> > to make sure jobs run efficiently, regardless of whether or why a job
> > has performance issues. The purpose of speculative execution, on the
> > other hand, is to detect whether a job has performance issues caused by
> > slow tasks, and to fix them.
> >
> > For example, suppose a job has one task whose watermark always lags
> > behind, causing the other tasks to be throttled. Speculative execution
> > identifies the slow task and runs another attempt of it on a different
> > node, thus unblocking the other subtasks.
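Becket's example can be illustrated with a small Python sketch that uses hypothetical names, not Flink's API. It applies the min-plus-drift bound to a single alignment group. For illustration only, it assumes that once a faster speculative attempt exists, the subtask's watermark is effectively the one reported by that attempt.

```python
def max_allowed_watermark(watermarks, max_drift):
    # Min-plus-drift rule: no source may run more than max_drift ahead of the slowest one.
    return min(watermarks.values()) + max_drift


def throttled_subtasks(watermarks, max_drift):
    # Subtasks already past the bound must pause until the slowest one catches up.
    bound = max_allowed_watermark(watermarks, max_drift)
    return sorted(s for s, wm in watermarks.items() if wm > bound)


# Subtask 2 lags behind (e.g. on a hot node), so subtasks 0 and 1 are throttled.
before = {0: 1000, 1: 1000, 2: 100}
print(throttled_subtasks(before, max_drift=100))  # [0, 1]

# Hypothetically, a speculative attempt of subtask 2 on a healthy node reaches 950.
after = {0: 1000, 1: 1000, 2: 950}
print(throttled_subtasks(after, max_drift=100))  # []
```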
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > I had another thought, and now I think watermark alignment actually
> > > conflicts conceptually with speculative execution.
> > > This is because the idea of watermark alignment is to limit the
> > > progress of all sources to roughly the progress of the slowest source
> > > in the watermark group. However, the goal of speculative execution is
> > > to solve the slow task problem, and it never wants to tie the progress
> > > of other tasks to the progress of the slow task.
> > > Therefore, I think it's fine not to support watermark alignment.
> > > Instead, an error should be thrown if watermark alignment is enabled
> > > while speculative execution is enabled.
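If the error-throwing approach were taken, the check could look roughly like the following Python sketch. The settings class, its fields, and the exception type are hypothetical and do not correspond to real Flink configuration options.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobSettings:
    # Hypothetical settings object; these are not real Flink configuration keys.
    speculative_execution_enabled: bool
    watermark_alignment_group: Optional[str] = None  # None means alignment is off


class UnsupportedFeatureCombination(Exception):
    """Raised when two features that must not be combined are both enabled."""


def validate(settings: JobSettings) -> None:
    # Fail fast with a clear error instead of silently ignoring alignment.
    if settings.speculative_execution_enabled and settings.watermark_alignment_group is not None:
        raise UnsupportedFeatureCombination(
            "watermark alignment cannot be used together with speculative execution"
        )
```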
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Mon, Jul 4, 2022 at 2:34 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > Thanks for updating the FLIP!
> > > >
> > > > I agree that at the moment users do not need watermark alignment (in
> > > > which case ReportedWatermarkEvent would be sent) in batch cases.
> > > > However, I don't think the concept of watermark alignment conflicts
> > > > with speculative execution. It can work with speculative execution
> > > > with a little extra effort, by sending the WatermarkAlignmentEvent to
> > > > all the current executions of each subtask.
> > > > Therefore, I prefer to support watermark alignment in case it is
> > > > needed by batch jobs in the future.
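Here is a minimal Python sketch of the "send to all current executions" idea. WatermarkAlignmentEvent mirrors the Flink event of that name. The gateway class and the nested subtask-to-attempt mapping are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class WatermarkAlignmentEvent:
    # Python stand-in for Flink's WatermarkAlignmentEvent (max allowed watermark).
    max_watermark: int


@dataclass
class RecordingGateway:
    # Hypothetical per-attempt gateway that just records what it was sent.
    sent: List[WatermarkAlignmentEvent] = field(default_factory=list)

    def send_event(self, event: WatermarkAlignmentEvent) -> None:
        self.sent.append(event)


def broadcast_alignment(gateways: Dict[int, Dict[int, RecordingGateway]], max_watermark: int) -> None:
    """Send the same alignment event to every current attempt of every subtask."""
    event = WatermarkAlignmentEvent(max_watermark)
    for attempts in gateways.values():  # subtask index -> {attempt number -> gateway}
        for gateway in attempts.values():
            gateway.send_event(event)


# Subtask 1 has an original attempt (0) and a speculative attempt (1).
gateways = {0: {0: RecordingGateway()}, 1: {0: RecordingGateway(), 1: RecordingGateway()}}
broadcast_alignment(gateways, max_watermark=1050)
```

Because every attempt receives the same bound, a speculative attempt is throttled exactly like the original attempt of the same subtask.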
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Fri, Jul 1, 2022 at 6:09 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >
> > > > > Hi all,
> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei and
> > > > > Zhuzhu, I've updated FLIP-245[1] to include:
> > > > > 1. The complete fault-tolerant processing flow.
> > > > > 2. Support for SourceEvent, because it's useful for some user-defined
> > > > > sources which have a custom event protocol between reader and
> > > > > enumerator.
> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent
> > > > > messages.
> > > > >
> > > > > Please review FLIP-245[1] again. Looking forward to your feedback.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > >
> > > > > On Fri, Jul 1, 2022 at 6:02 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > >
> > > > > > Hi Guowei,
> > > > > > Thanks a lot for your feedback.
> > > > > > Your advice is really helpful. I've updated FLIP-245[1] to include
> > > > > > these parts.
> > > > > > > First of all, please complete the fault-tolerant processing flow
> > > > > > > in the FLIP.
> > > > > >
> > > > > > After an execution is created and its source operator becomes
> > > > > > ready to receive events, subtaskReady is called, and
> > > > > > SpeculativeSourceCoordinator stores the mapping of SubtaskGateway to
> > > > > > execution attempt in SpeculativeSourceCoordinatorContext.
> > > > > > Then the source operator registers its reader with the coordinator,
> > > > > > and SpeculativeSourceCoordinator stores the mapping of source reader
> > > > > > to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > > > If the execution fails, subtaskFailed is called, and
> > > > > > SpeculativeSourceCoordinator clears the information about this
> > > > > > execution, including its source reader and SubtaskGateway.
> > > > > > If all the current executions of the execution vertex have failed,
> > > > > > subtaskReset is called, and SpeculativeSourceCoordinator clears all
> > > > > > information about these executions and adds their splits back to
> > > > > > the split enumerator of the source.
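The bookkeeping described above can be modeled roughly as follows. This is a toy Python sketch, not the FLIP's Java implementation. Passing an attempt number to the callbacks, the class names, and `assign_split` are assumptions. Only the callbacks' roles and the idea of adding splits back to the enumerator come from the text.

```python
class RecordingEnumerator:
    """Hypothetical split enumerator that only records splits handed back to it."""

    def __init__(self):
        self.returned = []

    def add_splits_back(self, splits, subtask):
        self.returned.extend(splits)


class SpeculativeCoordinatorSketch:
    """Toy model of the coordinator context, keyed by (subtask, attempt)."""

    def __init__(self, enumerator):
        self.enumerator = enumerator
        self.gateways = {}  # (subtask, attempt) -> gateway
        self.readers = {}   # (subtask, attempt) -> reader location
        self.assigned = {}  # subtask -> splits assigned to that subtask

    def subtask_ready(self, subtask, attempt, gateway):
        # The attempt's source operator can now receive events.
        self.gateways[(subtask, attempt)] = gateway

    def register_reader(self, subtask, attempt, location):
        self.readers[(subtask, attempt)] = location

    def assign_split(self, subtask, split):
        self.assigned.setdefault(subtask, []).append(split)

    def subtask_failed(self, subtask, attempt):
        # Forget only the failed attempt; other attempts of the subtask keep running.
        self.gateways.pop((subtask, attempt), None)
        self.readers.pop((subtask, attempt), None)

    def subtask_reset(self, subtask):
        # Every current attempt has failed: drop all state and return the splits.
        for table in (self.gateways, self.readers):
            for key in [k for k in table if k[0] == subtask]:
                del table[key]
        splits = self.assigned.pop(subtask, [])
        if splits:
            self.enumerator.add_splits_back(splits, subtask)
```

In this model, `subtask_failed` removes a single attempt and never touches splits, because another attempt of the same subtask may still be reading them. Splits return to the enumerator only in `subtask_reset`.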
> > > > > >
> > > > > > > Secondly, the FLIP only says that user-defined events are not
> > > > > > > supported, but it does not explain how to deal with the existing
> > > > > > > ReportedWatermarkEvent/ReaderRegistrationEvent.
> > > > > >
> > > > > > For ReaderRegistrationEvent:
> > > > > > When the source operator registers its reader with the coordinator,
> > > > > > SpeculativeSourceCoordinator also stores the mapping of source
> > > > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > > > Like SourceCoordinator, it also needs to call
> > > > > > SplitEnumerator#addReader to add the new source reader.
> > > > > > In addition, to distinguish the source readers of different
> > > > > > executions, an 'attemptId' field needs to be added to 'ReaderInfo'.
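The effect of the extra attemptId field can be sketched in Python as follows. ReaderInfo here mirrors Flink's class of that name plus the proposed field. The registry dict and RecordingEnumerator are hypothetical helpers. Calling add_reader once per registration simply follows the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReaderInfo:
    # Subtask id and location as in Flink's ReaderInfo, plus the proposed attempt id.
    subtask_id: int
    attempt_id: int
    location: str


class RecordingEnumerator:
    """Hypothetical enumerator that records add_reader calls."""

    def __init__(self):
        self.added = []

    def add_reader(self, subtask_id):
        self.added.append(subtask_id)


def on_reader_registration(readers, enumerator, subtask_id, attempt_id, location):
    # Key by (subtask, attempt) so two attempts of one subtask do not overwrite each other.
    info = ReaderInfo(subtask_id, attempt_id, location)
    readers[(subtask_id, attempt_id)] = info
    # As with the regular SourceCoordinator, tell the enumerator about the reader.
    enumerator.add_reader(subtask_id)
    return info
```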
> > > > > >
> > > > > > For ReportedWatermarkEvent:
> > > > > > ReportedWatermarkEvent was introduced in 1.15 to support watermark
> > > > > > alignment in streaming mode.
> > > > > > Speculative execution is only enabled in batch mode. Therefore,
> > > > > > SpeculativeSourceCoordinator throws an exception if it receives a
> > > > > > ReportedWatermarkEvent.
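Here is a rough Python sketch of how the coordinator might route events coming from a specific execution attempt. The event classes are simplified stand-ins named after Flink's ReaderRegistrationEvent, ReportedWatermarkEvent and SourceEventWrapper. The returned action tags and UnsupportedEventError are hypothetical.

```python
from dataclasses import dataclass


# Python stand-ins for the operator events named in the text (fields simplified).
@dataclass(frozen=True)
class ReaderRegistrationEvent:
    subtask_id: int
    location: str


@dataclass(frozen=True)
class ReportedWatermarkEvent:
    watermark: int


@dataclass(frozen=True)
class SourceEventWrapper:
    source_event: object


class UnsupportedEventError(Exception):
    pass


def handle_event_from_attempt(subtask, attempt, event):
    """Route an operator event; returns a hypothetical action tag for illustration."""
    if isinstance(event, ReaderRegistrationEvent):
        return ("register-reader", subtask, attempt)
    if isinstance(event, SourceEventWrapper):
        return ("forward-to-enumerator", subtask, attempt)
    if isinstance(event, ReportedWatermarkEvent):
        # Speculative execution is only enabled in batch mode, where alignment is not used.
        raise UnsupportedEventError("ReportedWatermarkEvent is not supported with speculative execution")
    raise UnsupportedEventError(f"unrecognized operator event: {event!r}")
```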
> > > > > >
> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin,
> > > > > > I've added support for SourceEvent, because it's useful for some
> > > > > > user-defined sources which have a custom event protocol between
> > > > > > reader and enumerator.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > > >
> > > > > > On Wed, Jun 29, 2022 at 6:06 PM Guowei Ma <guowei....@gmail.com> wrote:
> > > > > >
> > > > > >> Hi, Jing
> > > > > >>
> > > > > >> Thanks a lot for writing this FLIP, which is very useful for batch
> > > > > >> users. Currently I have only two small questions:
> > > > > >>
> > > > > >> 1. First of all, please complete the fault-tolerant processing flow
> > > > > >> in the FLIP. (Maybe you've already considered it, but it's better
> > > > > >> to explicitly give the specific solution in the FLIP.)
> > > > > >> For example, how is the source `Reader` handled in case of an
> > > > > >> error? As far as I know, once the reader is unavailable, new splits
> > > > > >> can no longer be allocated, which may be unacceptable in the case
> > > > > >> of speculative execution.
> > > > > >>
> > > > > >> 2. Secondly, the FLIP only says that user-defined events are not
> > > > > >> supported, but it does not explain how to deal with the existing
> > > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the
> > > > > >> case of speculative execution, there may be two "same" tasks being
> > > > > >> executed at the same time. If these events are duplicated, there
> > > > > >> still needs to be a clear evaluation of whether they really have no
> > > > > >> effect on the execution of the job.
> > > > > >>
> > > > > >> Best,
> > > > > >> Guowei
> > > > > >>
> > > > > >>
> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Hi all,
> > > > > >> > One major problem of Flink batch jobs is slow tasks running on
> > > > > >> > hot/bad nodes, resulting in very long execution times.
> > > > > >> >
> > > > > >> > To solve this problem, FLIP-168: Speculative Execution for Batch
> > > > > >> > Job[1] was introduced and recently approved.
> > > > > >> >
> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of
> > > > > >> > sources as one of the follow-ups to FLIP-168. You can find more
> > > > > >> > details in FLIP-245[2].
> > > > > >> > Looking forward to your feedback.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > Jing Zhang
> > > > > >> >
> > > > > >> > [1]
> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> > > > > >> >
> > > > > >> > [2]
> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > > >> >
> > > > > >>
> > > > > >
> > >
>
