Yes, that sounds reasonable to me. That said, supporting custom events might still be preferable if that does not complicate the design too much. It would be good to avoid having a tricky feature availability matrix when we add new features to the project.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:

> Hi Jiangjie,
>
> Yes, you are right that the goals of watermark alignment and speculative
> execution do not conflict. For the example you gave, we can make it
> work by only aligning watermarks for executions that are pipelined
> connected (i.e. in the same execution attempt level pipelined region).
> Even without considering speculative execution, this looks like a
> possible improvement to watermark alignment for streaming jobs that
> contain embarrassingly parallel job vertices, so that a slow task
> does not cause unconnected tasks to be throttled.
>
> At the moment, given that it is not needed yet and to avoid further
> complicating things, I think it's fine to not support watermark
> alignment in speculative execution cases.
>
> WDYT?
>
> Thanks,
> Zhu
>
> Becket Qin <becket....@gmail.com> wrote on Mon, Jul 4, 2022 at 16:15:
> >
> > Hi Zhu,
> >
> > I agree that if we are talking about a single execution region with
> > blocking shuffle, watermark alignment may not be that helpful as the
> > subtasks are running independently of each other.
> >
> > That said, I don't think watermark alignment and speculative execution
> > necessarily conflict with each other. The idea of watermark alignment is
> > to make sure the jobs run efficiently, regardless of whether or why the
> > job has performance issues. On the other hand, the purpose of speculative
> > execution is to find out whether the jobs have performance issues due to
> > slow tasks, and fix them.
> >
> > For example, a job has one task whose watermark is always lagging behind,
> > therefore it causes the other tasks to be throttled. The speculative
> > execution identified the slow task and decided to run it in another node,
> > thus unblocking the other subtasks.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > I had another thought and now I think watermark alignment actually
> > > conflicts conceptually with speculative execution.
> > > This is because the idea of watermark alignment is to limit the
> > > progress of all sources to be around the progress of the slowest
> > > source in the watermark group. However, speculative execution's goal
> > > is to solve the slow task problem and it never wants to limit the
> > > progress of tasks with the progress of the slow task.
> > > Therefore, I think it's fine to not support watermark alignment.
> > > Instead, it should throw an error if watermark alignment is enabled
> > > in the case that speculative execution is enabled.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Zhu Zhu <reed...@gmail.com> wrote on Mon, Jul 4, 2022 at 14:34:
> > > >
> > > > Thanks for updating the FLIP!
> > > >
> > > > I agree that at the moment users do not need watermark alignment (in
> > > > which case ReportedWatermarkEvent would happen) in batch cases.
> > > > However, I think the concept of watermark alignment does not conflict
> > > > with speculative execution. It can work with speculative execution
> > > > with a little extra effort, by sending the WatermarkAlignmentEvent to
> > > > all the current executions of each subtask.
> > > > Therefore, I prefer to support watermark alignment in case it will be
> > > > needed by batch jobs in the future.
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:09:
> > > >
> > > > > Hi all,
> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei, and
> > > > > Zhu Zhu, I've updated FLIP-245[1] to include:
> > > > > 1. Complete the fault-tolerant processing flow.
> > > > > 2. Support for SourceEvent because it's useful for some user-defined
> > > > > sources which have a custom event protocol between reader and
> > > > > enumerator.
> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent
> > > > > messages.
> > > > >
> > > > > Please review FLIP-245[1] again. Looking forward to your feedback.
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > >
> > > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:
> > > > >
> > > > > > Hi Guowei,
> > > > > > Thanks a lot for your feedback.
> > > > > > Your advice is really helpful. I've updated FLIP-245[1] to include
> > > > > > these parts.
> > > > > > > First of all, please complete the fault-tolerant processing flow
> > > > > > > in the FLIP.
> > > > > >
> > > > > > After an execution is created and a source operator becomes ready
> > > > > > to receive events, subtaskReady is called, and
> > > > > > SpeculativeSourceCoordinator would store the mapping of
> > > > > > SubtaskGateway to execution attempt in
> > > > > > SpeculativeSourceCoordinatorContext.
> > > > > > Then the source operator registers the reader to the coordinator,
> > > > > > and SpeculativeSourceCoordinator would store the mapping of source
> > > > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > > > If the execution goes through a failover, subtaskFailed is called,
> > > > > > and SpeculativeSourceCoordinator would clear information about this
> > > > > > execution, including source readers and SubtaskGateway.
> > > > > > If all the current executions of the execution vertex have failed,
> > > > > > subtaskReset would be called, and SpeculativeSourceCoordinator
> > > > > > would clear all information about these executions and add the
> > > > > > splits back to the split enumerator of the source.
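
The fault-tolerant flow described above can be sketched in Java. This is only an illustration under stated assumptions, not the FLIP-245 implementation: the class `SpeculativeBookkeeping`, its method signatures, and the use of plain integers and strings for subtasks, attempts, and splits are hypothetical stand-ins for what SpeculativeSourceCoordinatorContext would hold. It also assumes that splits are tracked per subtask and returned to the enumerator only on subtaskReset, which the thread implies but does not spell out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the per-attempt bookkeeping; not a Flink class. */
final class SpeculativeBookkeeping {
    // subtask index -> attempt numbers whose gateway is ready
    private final Map<Integer, Set<Integer>> gateways = new HashMap<>();
    // subtask index -> attempt numbers that registered a source reader
    private final Map<Integer, Set<Integer>> readers = new HashMap<>();
    // subtask index -> splits handed to that subtask (assumed per subtask)
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits still held by the (simulated) split enumerator
    private final Deque<String> unassignedSplits = new ArrayDeque<>();

    SpeculativeBookkeeping(List<String> splits) {
        unassignedSplits.addAll(splits);
    }

    /** subtaskReady: remember the gateway of this execution attempt. */
    void subtaskReady(int subtask, int attempt) {
        gateways.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    /** Reader registration: remember which attempt owns the reader. */
    void registerReader(int subtask, int attempt) {
        Set<Integer> ready = gateways.getOrDefault(subtask, Collections.emptySet());
        if (!ready.contains(attempt)) {
            throw new IllegalStateException("attempt is not ready: " + attempt);
        }
        readers.computeIfAbsent(subtask, k -> new HashSet<>()).add(attempt);
    }

    /** Hands the next split to a subtask; returns null when none are left. */
    String assignNextSplit(int subtask) {
        String split = unassignedSplits.poll();
        if (split != null) {
            assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    /** subtaskFailed: drop only the failed attempt's gateway and reader. */
    void subtaskFailed(int subtask, int attempt) {
        gateways.getOrDefault(subtask, new HashSet<>()).remove(attempt);
        readers.getOrDefault(subtask, new HashSet<>()).remove(attempt);
    }

    /** subtaskReset: clear the subtask and give its splits back. */
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> returned = assignedSplits.remove(subtask);
        if (returned != null) {
            unassignedSplits.addAll(returned);
        }
    }

    int liveReaders(int subtask) {
        return readers.getOrDefault(subtask, Collections.emptySet()).size();
    }

    int unassignedCount() {
        return unassignedSplits.size();
    }
}
```

In this sketch, a failure of one attempt leaves the sibling attempt's reader registered and the subtask's splits assigned; only subtaskReset, called once every current attempt has failed, returns the splits to the pool.
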
> > > > > >
> > > > > > > Secondly the FLIP only says that user-defined events are not
> > > > > > > supported, but it does not explain how to deal with the existing
> > > > > > > ReportedWatermarkEvent/ReaderRegistrationEvent.
> > > > > >
> > > > > > For ReaderRegistrationEvent:
> > > > > > When the source operator registers the reader to the coordinator,
> > > > > > SpeculativeSourceCoordinator would also store the mapping of source
> > > > > > reader to execution attempt in SpeculativeSourceCoordinatorContext.
> > > > > > Like SourceCoordinator, it also needs to call
> > > > > > SplitEnumerator#addReader to add a new source reader.
> > > > > > Besides, in order to distinguish source readers of different
> > > > > > executions, 'ReaderInfo' needs an 'attemptId' field.
> > > > > >
> > > > > > For ReportedWatermarkEvent:
> > > > > > ReportedWatermarkEvent was introduced in 1.15 and is used to
> > > > > > support watermark alignment in streaming mode.
> > > > > > Speculative execution is only enabled in batch mode. Therefore,
> > > > > > SpeculativeSourceCoordinator would throw an exception if it
> > > > > > receives a ReportedWatermarkEvent.
> > > > > >
> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin,
> > > > > > I've added support for SourceEvent because it's useful for some
> > > > > > user-defined sources which have a custom event protocol between
> > > > > > reader and enumerator.
> > > > > >
> > > > > > Best,
> > > > > > Jing Zhang
> > > > > >
> > > > > > [1]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> > > > > >
> > > > > > Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
> > > > > >
> > > > > >> Hi, Jing
> > > > > >>
> > > > > >> Thanks a lot for writing this FLIP, which is very useful to Batch
> > > > > >> users. Currently I have only two small questions:
> > > > > >>
> > > > > >> 1. First of all, please complete the fault-tolerant processing flow
> > > > > >> in the FLIP. (Maybe you've already considered it, but it's better
> > > > > >> to explicitly give the specific solution in the FLIP.)
> > > > > >> For example, how to handle the Source `Reader` in case of error. As
> > > > > >> far as I know, once the reader is unavailable, it will result in
> > > > > >> the inability to allocate a new split, which may be unacceptable in
> > > > > >> the case of speculative execution.
> > > > > >>
> > > > > >> 2. Secondly the FLIP only says that user-defined events are not
> > > > > >> supported, but it does not explain how to deal with the existing
> > > > > >> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the
> > > > > >> case of speculative execution, there may be two "same" tasks being
> > > > > >> executed at the same time. If these events are repeated, whether
> > > > > >> they really have no effect on the execution of the job still needs
> > > > > >> a clear evaluation.
> > > > > >>
> > > > > >> Best,
> > > > > >> Guowei
> > > > > >>
> > > > > >>
> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi all,
> > > > > >> > One major problem of Flink batch jobs is slow tasks running on
> > > > > >> > hot/bad nodes, resulting in very long execution time.
> > > > > >> >
> > > > > >> > In order to solve this problem, FLIP-168: Speculative Execution
> > > > > >> > for Batch Job[1] was introduced and approved recently.
> > > > > >> >
> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of
> > > > > >> > sources as one of the follow-ups to FLIP-168. You can find more
> > > > > >> > details in FLIP-245[2].
> > > > > >> > Looking forward to your feedback.
> > > > > >> >
> > > > > >> > Best,
> > > > > >> > Jing Zhang
> > > > > >> >
> > > > > >> > [1]
> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> > > > > >> >
> > > > > >> > [2]
> > > > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job