Hi ZhuZhu, Jiangjie,

Thanks a lot for your feedback.

I agree that it's better to support most existing events.
I have updated the FLIP to cover how to deal with the
RequestSplitEvent/SourceEventWrapper/ReaderRegistrationEvent.

The ReportedWatermarkEvent is only used in watermark alignment.
Watermark alignment is a new feature, and still evolving.
Moreover, most users will not use this feature in batch cases.
So I agree not to support it in speculative execution.
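
To make the intended handling concrete, here is a minimal sketch of how a
coordinator could accept the supported events per execution attempt and
reject ReportedWatermarkEvent. It is only an illustration under assumptions:
the event classes below are hypothetical stand-ins that merely share names
with the events discussed in this thread, and SpeculativeEventRouter is a
made-up name, not the SpeculativeSourceCoordinator from the FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the event types named in this thread.
// They only share names with Flink's classes; they are not the real ones.
interface OperatorEvent {}
final class RequestSplitEvent implements OperatorEvent {}
final class SourceEventWrapper implements OperatorEvent {}
final class ReaderRegistrationEvent implements OperatorEvent {}
final class ReportedWatermarkEvent implements OperatorEvent {}

// Hypothetical router, not the FLIP's SpeculativeSourceCoordinator.
final class SpeculativeEventRouter {
    // Records "<EventName>@<subtask>.<attempt>" for every accepted event.
    private final List<String> handled = new ArrayList<>();

    void handleEvent(int subtask, int attemptNumber, OperatorEvent event) {
        if (event instanceof ReportedWatermarkEvent) {
            // Watermark alignment is not supported together with speculative execution.
            throw new UnsupportedOperationException(
                    "ReportedWatermarkEvent is not supported in speculative execution");
        }
        if (event instanceof RequestSplitEvent
                || event instanceof SourceEventWrapper
                || event instanceof ReaderRegistrationEvent) {
            // A real coordinator would forward these to the split enumerator,
            // tagged with the attempt that sent them.
            handled.add(event.getClass().getSimpleName() + "@" + subtask + "." + attemptNumber);
            return;
        }
        throw new IllegalArgumentException("Unrecognized event: " + event.getClass().getName());
    }

    List<String> handled() {
        return handled;
    }
}
```

With this sketch, events from attempt 0 and attempt 1 of the same subtask are
recorded separately, while any ReportedWatermarkEvent fails fast instead of
being silently ignored.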

Best,
Jing Zhang

Becket Qin <becket....@gmail.com> wrote on Tue, Jul 5, 2022 at 08:38:

> Yes, that sounds reasonable to me. That said, supporting custom events
> might still be preferable if that does not complicate the design too much.
> It would be good to avoid having a tricky feature availability matrix when
> we add new features to the project.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Hi Jiangjie,
>>
>> Yes, you are right that the goals of watermark alignment and speculative
>> execution do not conflict. For the example you gave, we can make it
>> work by only aligning watermarks for executions that are connected by
>> pipelined edges (i.e. in the same execution-attempt-level pipelined region).
>> Even without considering speculative execution, this looks like a
>> possible improvement to watermark alignment for streaming jobs that
>> contain embarrassingly parallel job vertices, so that a slow task
>> does not cause unconnected tasks to be throttled.
>>
>> At the moment, given that it is not needed yet and to avoid further
>> complicating things, I think it's fine to not support watermark
>> alignment in speculative execution cases.
>>
>> WDYT?
>>
>> Thanks,
>> Zhu
>>
>> Becket Qin <becket....@gmail.com> wrote on Mon, Jul 4, 2022 at 16:15:
>> >
>> > Hi Zhu,
>> >
>> > I agree that if we are talking about a single execution region with
>> > blocking shuffle, watermark alignment may not be that helpful as the
>> > subtasks are running independently of each other.
>> >
>> > That said, I don't think watermark alignment and speculative execution
>> > necessarily conflict with each other. The idea of watermark alignment is
>> > to make sure the jobs run efficiently, regardless of whether or why the
>> > job has performance issues. On the other hand, the purpose of speculative
>> > execution is to find out whether the jobs have performance issues due to
>> > slow tasks, and fix them.
>> >
>> > For example, suppose a job has one task whose watermark is always lagging
>> > behind, which causes the other tasks to be throttled. Speculative
>> > execution identifies the slow task and decides to run it on another
>> > node, thus unblocking the other subtasks.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
>> >
>> > > I had another thought, and now I think watermark alignment actually
>> > > conflicts conceptually with speculative execution.
>> > > This is because the idea of watermark alignment is to limit the
>> > > progress of all sources to be around the progress of the slowest source
>> > > in the watermark group. However, speculative execution's goal is to solve
>> > > the slow task problem, and it never wants to limit the progress of tasks
>> > > to the progress of the slow task.
>> > > Therefore, I think it's fine to not support watermark alignment.
>> > > Instead, an error should be thrown if watermark alignment is enabled
>> > > while speculative execution is enabled.
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Zhu Zhu <reed...@gmail.com> wrote on Mon, Jul 4, 2022 at 14:34:
>> > > >
>> > > > Thanks for updating the FLIP!
>> > > >
>> > > > I agree that at the moment users do not need watermark alignment (in
>> > > > which case ReportedWatermarkEvent would happen) in batch cases.
>> > > > However, I think the concept of watermark alignment does not
>> > > > conflict with speculative execution. It can work with speculative
>> > > > execution with a little extra effort, by sending the
>> > > > WatermarkAlignmentEvent to all the current executions of each subtask.
>> > > > Therefore, I prefer to support watermark alignment in case it will
>> > > > be needed by batch jobs in the future.
>> > > >
>> > > > Thanks,
>> > > > Zhu
>> > > >
>> > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:09:
>> > > > >
>> > > > > Hi all,
>> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei, and
>> > > > > Zhu Zhu, I've updated FLIP-245[1] to include:
>> > > > > 1. The complete fault-tolerant processing flow.
>> > > > > 2. Support for SourceEvent, because it's useful for some user-defined
>> > > > > sources which have a custom event protocol between reader and
>> > > > > enumerator.
>> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent
>> > > > > messages.
>> > > > >
>> > > > > Please review FLIP-245[1] again. Looking forward to your feedback.
>> > > > >
>> > > > > [1]
>> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> > > > >
>> > > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:
>> > > > >
>> > > > > > Hi Guowei,
>> > > > > > Thanks a lot for your feedback.
>> > > > > > Your advice is really helpful. I've updated FLIP-245[1] to include
>> > > > > > these parts.
>> > > > > > > First of all, please complete the fault-tolerant processing flow
>> > > > > > > in the FLIP.
>> > > > > >
>> > > > > > After an execution is created and a source operator becomes ready
>> > > > > > to receive events, subtaskReady is called, and
>> > > > > > SpeculativeSourceCoordinator stores the mapping of SubtaskGateway to
>> > > > > > execution attempt in SpeculativeSourceCoordinatorContext.
>> > > > > > Then the source operator registers the reader with the coordinator,
>> > > > > > and SpeculativeSourceCoordinator stores the mapping of source reader
>> > > > > > to execution attempt in SpeculativeSourceCoordinatorContext.
>> > > > > > If the execution goes through a failover, subtaskFailed is called,
>> > > > > > and SpeculativeSourceCoordinator clears the information about this
>> > > > > > execution, including its source readers and SubtaskGateway.
>> > > > > > If all the current executions of the execution vertex have failed,
>> > > > > > subtaskReset is called, and SpeculativeSourceCoordinator clears all
>> > > > > > information about these executions and adds their splits back to the
>> > > > > > split enumerator of the source.
>> > > > > >
>> > > > > > > Secondly the FLIP only says that user-defined events are not
>> > > > > > > supported, but it does not explain how to deal with the existing
>> > > > > > > ReportedWatermarkEvent/ReaderRegistrationEvent.
>> > > > > >
>> > > > > > For ReaderRegistrationEvent:
>> > > > > > When the source operator registers the reader with the coordinator,
>> > > > > > SpeculativeSourceCoordinator also stores the mapping of source
>> > > > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
>> > > > > > Like SourceCoordinator, it also needs to call
>> > > > > > SplitEnumerator#addReader to add a new source reader.
>> > > > > > Besides, in order to distinguish the source readers of different
>> > > > > > executions, 'ReaderInfo' needs an additional 'attemptId' field.
>> > > > > >
>> > > > > > For ReportedWatermarkEvent:
>> > > > > > ReportedWatermarkEvent was introduced in 1.15 and is used to support
>> > > > > > watermark alignment in streaming mode.
>> > > > > > Speculative execution is only enabled in batch mode. Therefore,
>> > > > > > SpeculativeSourceCoordinator would throw an exception if it receives
>> > > > > > a ReportedWatermarkEvent.
>> > > > > >
>> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin,
>> > > > > > I've added support for SourceEvent, because it's useful for some
>> > > > > > user-defined sources which have a custom event protocol between
>> > > > > > reader and enumerator.
>> > > > > >
>> > > > > > Best,
>> > > > > > Jing Zhang
>> > > > > >
>> > > > > > [1]
>> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> > > > > >
>> > > > > > Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
>> > > > > >
>> > > > > >> Hi, Jing
>> > > > > >>
>> > > > > >> Thanks a lot for writing this FLIP, which is very useful to batch
>> > > > > >> users. Currently I have only two small questions:
>> > > > > >>
>> > > > > >> 1. First of all, please complete the fault-tolerant processing flow
>> > > > > >> in the FLIP. (Maybe you've already considered it, but it's better to
>> > > > > >> explicitly give the specific solution in the FLIP.)
>> > > > > >> For example, how should a source `Reader` be handled in case of an
>> > > > > >> error? As far as I know, once the reader is unavailable, it will
>> > > > > >> result in the inability to allocate a new split, which may be
>> > > > > >> unacceptable in the case of speculative execution.
>> > > > > >>
>> > > > > >> 2. Secondly, the FLIP only says that user-defined events are not
>> > > > > >> supported, but it does not explain how to deal with the existing
>> > > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the
>> > > > > >> case of speculative execution, there may be two "same" tasks being
>> > > > > >> executed at the same time. Whether these events, if repeated, really
>> > > > > >> have no effect on the execution of the job still needs a clear
>> > > > > >> evaluation.
>> > > > > >>
>> > > > > >> Best,
>> > > > > >> Guowei
>> > > > > >>
>> > > > > >>
>> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>> > > > > >>
>> > > > > >> > Hi all,
>> > > > > >> > One major problem of Flink batch jobs is slow tasks running on
>> > > > > >> > hot/bad nodes, resulting in very long execution times.
>> > > > > >> >
>> > > > > >> > In order to solve this problem, FLIP-168: Speculative Execution for
>> > > > > >> > Batch Job[1] was introduced and approved recently.
>> > > > > >> >
>> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of
>> > > > > >> > sources as one of the follow-ups of FLIP-168. You can find more
>> > > > > >> > details in FLIP-245[2].
>> > > > > >> > Looking forward to your feedback.
>> > > > > >> >
>> > > > > >> > Best,
>> > > > > >> > Jing Zhang
>> > > > > >> >
>> > > > > >> > [1]
>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>> > > > > >> >
>> > > > > >> > [2]
>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> > > > > >> >
>> > > > > >>
>> > > > > >
>> > >
>>
>
