Hi everyone,

Thanks a lot for all the feedback!
I will open a vote for it since there are no further concerns.

Best,
Jing Zhang

On Tue, Jul 5, 2022 at 11:31 AM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi ZhuZhu, Jiangjie,
>
> Thanks a lot for your feedback.
>
> I agree that it's better to support most existing events.
> I have updated the FLIP to cover how to deal with the
> RequestSplitEvent/SourceEventWrapper/ReaderRegistrationEvent.
>
> The ReportedWatermarkEvent is only used in watermark alignment.
> Watermark alignment is a new feature, and still evolving.
> Moreover, most users will not use this feature in batch cases.
> So I agree not to support it in speculative execution.
>
> Best,
> Jing Zhang
>
> On Tue, Jul 5, 2022 at 8:38 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Yes, that sounds reasonable to me. That said, supporting custom events
>> might still be preferable if that does not complicate the design too much.
>> It would be good to avoid having a tricky feature availability matrix when
>> we add new features to the project.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:
>>
>>> Hi Jiangjie,
>>>
>>> Yes, you are right that the goals of watermark alignment and speculative
>>> execution do not conflict. For the example you gave, we can make it
>>> work by only aligning watermarks for executions that are
>>> pipelined-connected (i.e. in the same execution-attempt-level pipelined
>>> region). Even without considering speculative execution, it looks like a
>>> possible improvement to watermark alignment for streaming jobs that
>>> contain embarrassingly parallel job vertices, so that a slow task
>>> does not cause unconnected tasks to be throttled.
>>>
>>> At the moment, given that it is not needed yet and to avoid further
>>> complicating things, I think it's fine to not support watermark
>>> alignment in speculative execution cases.
>>>
>>> WDYT?
>>>
>>> Thanks,
>>> Zhu
>>>
>>> On Mon, Jul 4, 2022 at 4:15 PM Becket Qin <becket....@gmail.com> wrote:
>>> >
>>> > Hi Zhu,
>>> >
>>> > I agree that if we are talking about a single execution region with
>>> > blocking shuffle, watermark alignment may not be that helpful as the
>>> > subtasks are running independently of each other.
>>> >
>>> > That said, I don't think watermark alignment and speculative execution
>>> > necessarily conflict with each other. The idea of watermark alignment
>>> > is to make sure the jobs run efficiently, regardless of whether or why
>>> > the job has performance issues. On the other hand, the purpose of
>>> > speculative execution is to find out whether the jobs have performance
>>> > issues due to slow tasks, and fix them.
>>> >
>>> > For example, suppose a job has one task whose watermark is always
>>> > lagging behind, which causes the other tasks to be throttled.
>>> > Speculative execution identifies the slow task and decides to run it
>>> > on another node, thus unblocking the other subtasks.
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> >
>>> >
>>> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> >
>>> > > I had another thought, and now I think watermark alignment actually
>>> > > conflicts conceptually with speculative execution.
>>> > > This is because the idea of watermark alignment is to limit the
>>> > > progress of all sources to be around the progress of the slowest
>>> > > source in the watermark group. However, speculative execution's goal
>>> > > is to solve the slow task problem, and it never wants to limit the
>>> > > progress of tasks to the progress of the slow task.
>>> > > Therefore, I think it's fine to not support watermark alignment.
>>> > > Instead, it should throw an error if watermark alignment is enabled
>>> > > while speculative execution is enabled.
>>> > >
>>> > > Thanks,
>>> > > Zhu
>>> > >
>>> > > On Mon, Jul 4, 2022 at 2:34 PM Zhu Zhu <reed...@gmail.com> wrote:
>>> > > >
>>> > > > Thanks for updating the FLIP!
>>> > > >
>>> > > > I agree that at the moment users do not need watermark alignment
>>> > > > (the case in which a ReportedWatermarkEvent would be sent) in batch
>>> > > > cases. However, I think the concept of watermark alignment does not
>>> > > > conflict with speculative execution. It can work with speculative
>>> > > > execution with a little extra effort, by sending the
>>> > > > WatermarkAlignmentEvent to all the current executions of each
>>> > > > subtask.
>>> > > > Therefore, I prefer to support watermark alignment in case it will
>>> > > > be needed by batch jobs in the future.
>>> > > >
>>> > > > Thanks,
>>> > > > Zhu
>>> > > >
>>> > > > On Fri, Jul 1, 2022 at 6:09 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > >
>>> > > > > Hi all,
>>> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei,
>>> > > > > and Zhuzhu, I've updated FLIP-245[1] to include:
>>> > > > > 1. The complete fault-tolerant processing flow.
>>> > > > > 2. Support for SourceEvent, because it's useful for some
>>> > > > > user-defined sources which have a custom event protocol between
>>> > > > > reader and enumerator.
>>> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent
>>> > > > > messages.
>>> > > > >
>>> > > > > Please review FLIP-245[1] again. Looking forward to your feedback.
>>> > > > >
>>> > > > > [1]
>>> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>>> > > > >
>>> > > > > On Fri, Jul 1, 2022 at 6:02 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > >
>>> > > > > > Hi Guowei,
>>> > > > > > Thanks a lot for your feedback.
>>> > > > > > Your advice is really helpful. I've updated FLIP-245[1] to
>>> > > > > > include these parts.
>>> > > > > > > First of all, please complete the fault-tolerant processing
>>> > > > > > > flow in the FLIP.
>>> > > > > >
>>> > > > > > After an execution is created and a source operator becomes
>>> > > > > > ready to receive events, subtaskReady is called, and
>>> > > > > > SpeculativeSourceCoordinator would store the mapping of
>>> > > > > > SubtaskGateway to execution attempt in
>>> > > > > > SpeculativeSourceCoordinatorContext.
>>> > > > > > Then the source operator registers the reader with the
>>> > > > > > coordinator, and SpeculativeSourceCoordinator would store the
>>> > > > > > mapping of source reader to execution attempt in
>>> > > > > > SpeculativeSourceCoordinatorContext.
>>> > > > > > If the execution goes through a failover, subtaskFailed is
>>> > > > > > called, and SpeculativeSourceCoordinator would clear the
>>> > > > > > information about this execution, including source readers and
>>> > > > > > SubtaskGateway.
>>> > > > > > If all the current executions of the execution vertex have
>>> > > > > > failed, subtaskReset would be called, and
>>> > > > > > SpeculativeSourceCoordinator would clear all information about
>>> > > > > > these executions and add the splits back to the split
>>> > > > > > enumerator of the source.
>>> > > > > >
>>> > > > > > > Secondly, the FLIP only says that user-defined events are not
>>> > > > > > > supported, but it does not explain how to deal with the
>>> > > > > > > existing ReportedWatermarkEvent/ReaderRegistrationEvent.
>>> > > > > >
>>> > > > > > For ReaderRegistrationEvent:
>>> > > > > > When the source operator registers the reader with the
>>> > > > > > coordinator, SpeculativeSourceCoordinator would also store the
>>> > > > > > mapping of source reader to execution attempt in
>>> > > > > > SpeculativeSourceCoordinatorContext. Like SourceCoordinator, it
>>> > > > > > also needs to call SplitEnumerator#addReader to add a new
>>> > > > > > source reader.
>>> > > > > > Besides, in order to distinguish the source readers of
>>> > > > > > different executions, 'ReaderInfo' needs a new 'attemptId' field.
>>> > > > > >
>>> > > > > > For ReportedWatermarkEvent:
>>> > > > > > ReportedWatermarkEvent was introduced in 1.15 and is used to
>>> > > > > > support watermark alignment in streaming mode.
>>> > > > > > Speculative execution is only enabled in batch mode. Therefore,
>>> > > > > > SpeculativeSourceCoordinator would throw an exception if it
>>> > > > > > receives a ReportedWatermarkEvent.
>>> > > > > >
>>> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin,
>>> > > > > > I've added support for SourceEvent because it's useful for some
>>> > > > > > user-defined sources which have a custom event protocol between
>>> > > > > > reader and enumerator.
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Jing Zhang
>>> > > > > >
>>> > > > > > [1]
>>> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>>> > > > > >
>>> > > > > > On Wed, Jun 29, 2022 at 6:06 PM Guowei Ma <guowei....@gmail.com> wrote:
>>> > > > > >
>>> > > > > >> Hi, Jing
>>> > > > > >>
>>> > > > > >> Thanks a lot for writing this FLIP, which is very useful to
>>> > > > > >> batch users.
>>> > > > > >> Currently I have only two small questions:
>>> > > > > >>
>>> > > > > >> 1. First of all, please complete the fault-tolerant processing
>>> > > > > >> flow in the FLIP. (Maybe you've already considered it, but it's
>>> > > > > >> better to explicitly give the specific solution in the FLIP.)
>>> > > > > >> For example, how to handle the Source `Reader` in case of error.
>>> > > > > >> As far as I know, once the reader is unavailable, it will result
>>> > > > > >> in the inability to allocate a new split, which may be
>>> > > > > >> unacceptable in the case of speculative execution.
>>> > > > > >>
>>> > > > > >> 2. Secondly, the FLIP only says that user-defined events are not
>>> > > > > >> supported, but it does not explain how to deal with the existing
>>> > > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the
>>> > > > > >> case of speculative execution, there may be two "same" tasks
>>> > > > > >> being executed at the same time. If these events are repeated,
>>> > > > > >> whether they really have no effect on the execution of the job
>>> > > > > >> still needs a clear evaluation.
>>> > > > > >>
>>> > > > > >> Best,
>>> > > > > >> Guowei
>>> > > > > >>
>>> > > > > >>
>>> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>> > > > > >>
>>> > > > > >> > Hi all,
>>> > > > > >> > One major problem of Flink batch jobs is slow tasks running on
>>> > > > > >> > hot/bad nodes, resulting in very long execution time.
>>> > > > > >> >
>>> > > > > >> > In order to solve this problem, FLIP-168: Speculative Execution
>>> > > > > >> > for Batch Job[1] was introduced and approved recently.
>>> > > > > >> >
>>> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of
>>> > > > > >> > sources as one of the follow-ups to FLIP-168. You can find more
>>> > > > > >> > details in FLIP-245[2].
>>> > > > > >> > Looking forward to your feedback.
>>> > > > > >> >
>>> > > > > >> > Best,
>>> > > > > >> > Jing Zhang
>>> > > > > >> >
>>> > > > > >> > [1]
>>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>>> > > > > >> >
>>> > > > > >> > [2]
>>> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>>> > > > > >> >
>>> > > > > >>
>>> > > > > >
>>> > >
>>>
>>
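
The fault-tolerant processing flow described in the quoted Jul 1 message (subtaskReady, reader registration, subtaskFailed, subtaskReset) can be sketched roughly as follows. This is only an illustration of the per-attempt bookkeeping, not the FLIP's implementation: the class AttemptBookkeepingSketch, its method signatures, and the use of plain strings for gateways, readers, and splits are all assumptions made for this example and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical bookkeeping sketch. It is NOT Flink's
 * SpeculativeSourceCoordinatorContext; every name and signature is assumed.
 */
class AttemptBookkeepingSketch {
    // subtask index -> (attempt number -> gateway label)
    private final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask index -> (attempt number -> reader location)
    private final Map<Integer, Map<Integer, String>> readers = new HashMap<>();
    // subtask index -> splits handed out to that subtask's attempts
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits given back to the (simulated) split enumerator
    private final List<String> returnedSplits = new ArrayList<>();

    /** An attempt became ready to receive events: remember its gateway. */
    void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    /** The attempt's reader registered: remember reader -> attempt. */
    void registerReader(int subtask, int attempt, String location) {
        readers.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, location);
    }

    /** Record that a split was handed to the given subtask. */
    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    /** One attempt failed: forget only that attempt's gateway and reader. */
    void subtaskFailed(int subtask, int attempt) {
        Map<Integer, String> g = gateways.get(subtask);
        if (g != null) {
            g.remove(attempt);
        }
        Map<Integer, String> r = readers.get(subtask);
        if (r != null) {
            r.remove(attempt);
        }
    }

    /** All attempts failed: clear everything and give the splits back. */
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            returnedSplits.addAll(splits);
        }
    }

    /** Number of attempts of the subtask that currently have a reader. */
    int registeredReaderCount(int subtask) {
        Map<Integer, String> r = readers.get(subtask);
        return r == null ? 0 : r.size();
    }

    /** Copy of the splits that were returned to the enumerator. */
    List<String> returnedSplits() {
        return new ArrayList<>(returnedSplits);
    }
}
```

In this sketch a failed attempt removes only its own entries, so a sibling speculative attempt of the same subtask keeps its reader; splits go back to the enumerator only on subtaskReset.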

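The event handling the thread converges on (support RequestSplitEvent, SourceEventWrapper, and ReaderRegistrationEvent; reject ReportedWatermarkEvent under speculative execution; tag each reader with its attempt) could look something like the sketch below. The event names come from the thread, but modelling them as an enum, the ReaderInfoSketch class, and the handle method are assumptions for illustration only and do not reflect Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical dispatcher sketch; not Flink's SpeculativeSourceCoordinator. */
class SpeculativeEventHandlingSketch {
    /** Event kinds named in the thread, modelled as an enum (an assumption). */
    enum EventKind { REQUEST_SPLIT, SOURCE_EVENT_WRAPPER, READER_REGISTRATION, REPORTED_WATERMARK }

    /** Hypothetical reader descriptor carrying the proposed attempt id. */
    static final class ReaderInfoSketch {
        final int subtaskId;
        final int attemptId;
        final String location;

        ReaderInfoSketch(int subtaskId, int attemptId, String location) {
            this.subtaskId = subtaskId;
            this.attemptId = attemptId;
            this.location = location;
        }
    }

    private final List<ReaderInfoSketch> registeredReaders = new ArrayList<>();
    private final List<String> forwarded = new ArrayList<>();

    /** Dispatch one event that arrived from a specific execution attempt. */
    void handle(EventKind kind, int subtaskId, int attemptId, String location) {
        switch (kind) {
            case READER_REGISTRATION:
                // Readers of different attempts of one subtask stay distinguishable.
                registeredReaders.add(new ReaderInfoSketch(subtaskId, attemptId, location));
                break;
            case REQUEST_SPLIT:
            case SOURCE_EVENT_WRAPPER:
                // Forwarded together with the attempt that sent the event.
                forwarded.add(kind + "@" + subtaskId + "." + attemptId);
                break;
            case REPORTED_WATERMARK:
                // Watermark alignment is not supported with speculative execution.
                throw new UnsupportedOperationException(
                        "Watermark alignment is not supported with speculative execution");
            default:
                throw new IllegalArgumentException("Unknown event kind: " + kind);
        }
    }

    /** Number of readers registered so far, across all attempts. */
    int readerCount() {
        return registeredReaders.size();
    }

    /** Copy of the log of forwarded events. */
    List<String> forwarded() {
        return new ArrayList<>(forwarded);
    }
}
```

Failing fast on ReportedWatermarkEvent mirrors the conclusion of the thread that watermark alignment is left unsupported for now rather than silently ignored.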