Hi ZhuZhu, Jiangjie,

Thanks a lot for your feedback.
I agree that it's better to support most existing events. I have updated the FLIP to cover how to deal with RequestSplitEvent, SourceEventWrapper and ReaderRegistrationEvent. ReportedWatermarkEvent is only used for watermark alignment. Watermark alignment is a new feature that is still evolving, and most users will not use it in batch jobs. So I agree not to support it in speculative execution.

Best,
Jing Zhang

Becket Qin <becket....@gmail.com> wrote on Tue, Jul 5, 2022 at 08:38:

> Yes, that sounds reasonable to me. That said, supporting custom events might still be preferable if that does not complicate the design too much. It would be good to avoid having a tricky feature availability matrix when we add new features to the project.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Jul 4, 2022 at 5:09 PM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Hi Jiangjie,
>>
>> Yes, you are right that the goals of watermark alignment and speculative execution do not conflict. For the example you gave, we can make it work by only aligning watermarks for executions that are pipelined-connected (i.e. in the same execution-attempt-level pipelined region). Even without considering speculative execution, this looks like a possible improvement of watermark alignment for streaming jobs that contain embarrassingly parallel job vertices, so that a slow task does not cause unconnected tasks to be throttled.
>>
>> At the moment, given that it is not needed yet and to avoid further complicating things, I think it's fine to not support watermark alignment in speculative execution cases.
>>
>> WDYT?
>>
>> Thanks,
>> Zhu
>>
>> Becket Qin <becket....@gmail.com> wrote on Mon, Jul 4, 2022 at 16:15:
>> >
>> > Hi Zhu,
>> >
>> > I agree that if we are talking about a single execution region with blocking shuffle, watermark alignment may not be that helpful, as the subtasks run independently of each other.
>> >
>> > That said, I don't think watermark alignment and speculative execution necessarily conflict with each other. The idea of watermark alignment is to make sure the jobs run efficiently, regardless of whether or why the job has performance issues. On the other hand, the purpose of speculative execution is to find out whether the jobs have performance issues due to slow tasks, and fix them.
>> >
>> > For example, a job has one task whose watermark is always lagging behind, which causes the other tasks to be throttled. Speculative execution identifies the slow task and decides to run it on another node, thus unblocking the other subtasks.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Mon, Jul 4, 2022 at 3:31 PM Zhu Zhu <reed...@gmail.com> wrote:
>> >
>> > > I had another thought, and now I think watermark alignment actually conflicts conceptually with speculative execution.
>> > > This is because the idea of watermark alignment is to limit the progress of all sources to be around the progress of the slowest source in the watermark group. However, speculative execution's goal is to solve the slow task problem, and it never wants to limit the progress of tasks to the progress of the slow task.
>> > > Therefore, I think it's fine to not support watermark alignment. Instead, it should throw an error if watermark alignment is enabled while speculative execution is enabled.
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Zhu Zhu <reed...@gmail.com> wrote on Mon, Jul 4, 2022 at 14:34:
>> > > >
>> > > > Thanks for updating the FLIP!
>> > > >
>> > > > I agree that at the moment users do not need watermark alignment (in which case ReportedWatermarkEvent would happen) in batch cases.
>> > > > However, I think the concept of watermark alignment does not conflict with speculative execution.
>> > > > It can work with speculative execution with a little extra effort, by sending the WatermarkAlignmentEvent to all the current executions of each subtask.
>> > > > Therefore, I prefer to support watermark alignment in case it is needed by batch jobs in the future.
>> > > >
>> > > > Thanks,
>> > > > Zhu
>> > > >
>> > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:09:
>> > > > >
>> > > > > Hi all,
>> > > > > After an offline discussion with Jiangjie (Becket) Qin, Guowei and Zhuzhu, I've updated FLIP-245[1] to include:
>> > > > > 1. The complete fault-tolerant processing flow.
>> > > > > 2. Support for SourceEvent, because it's useful for some user-defined sources which have a custom event protocol between reader and enumerator.
>> > > > > 3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent messages.
>> > > > >
>> > > > > Please review FLIP-245[1] again. Looking forward to your feedback.
>> > > > >
>> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> > > > >
>> > > > > Jing Zhang <beyond1...@gmail.com> wrote on Fri, Jul 1, 2022 at 18:02:
>> > > > >
>> > > > > > Hi Guowei,
>> > > > > > Thanks a lot for your feedback.
>> > > > > > Your advice is really helpful. I've updated FLIP-245[1] to include these parts.
>> > > > > > > First of all, please complete the fault-tolerant processing flow in the FLIP.
>> > > > > >
>> > > > > > After an execution is created and a source operator becomes ready to receive events, subtaskReady is called, and SpeculativeSourceCoordinator stores the mapping of SubtaskGateway to execution attempt in SpeculativeSourceCoordinatorContext.
>> > > > > > Then the source operator registers the reader with the coordinator, and SpeculativeSourceCoordinator stores the mapping of source reader to execution attempt in SpeculativeSourceCoordinatorContext.
>> > > > > > If the execution fails over, subtaskFailed is called, and SpeculativeSourceCoordinator clears the information about this execution, including its source reader and SubtaskGateway.
>> > > > > > If all the current executions of the execution vertex have failed, subtaskReset is called, and SpeculativeSourceCoordinator clears all information about these executions and adds their splits back to the split enumerator of the source.
>> > > > > >
>> > > > > > > Secondly the FLIP only says that user-defined events are not supported, but it does not explain how to deal with the existing ReportedWatermarkEvent/ReaderRegistrationEvent.
>> > > > > >
>> > > > > > For ReaderRegistrationEvent:
>> > > > > > When the source operator registers the reader with the coordinator, SpeculativeSourceCoordinator also stores the mapping of source reader to execution attempt in SpeculativeSourceCoordinatorContext. Like SourceCoordinator, it also needs to call SplitEnumerator#addReader to add the new source reader.
>> > > > > > Besides, in order to distinguish the source readers of different executions, 'ReaderInfo' needs an additional 'attemptId' field.
>> > > > > >
>> > > > > > For ReportedWatermarkEvent:
>> > > > > > ReportedWatermarkEvent was introduced in 1.15 to support watermark alignment in streaming mode.
>> > > > > > Speculative execution is only enabled in batch mode. Therefore, SpeculativeSourceCoordinator throws an exception if it receives a ReportedWatermarkEvent.
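The coordinator bookkeeping in the quoted flow above can be sketched in plain Java. The flow covers subtaskReady, reader registration, subtaskFailed, subtaskReset, and rejecting ReportedWatermarkEvent. This is a minimal sketch under stated assumptions, not the FLIP-245 implementation. The class `SpeculativeContextSketch`, its methods, the `EventKind` enum, and the `ReaderInfo` record are all hypothetical stand-ins. Strings also stand in for `SubtaskGateway` and for splits. Tracking assigned splits per subtask, so that they can be added back on reset, is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the per-attempt bookkeeping discussed for
 * SpeculativeSourceCoordinatorContext. Not Flink's API: every name is illustrative.
 */
final class SpeculativeContextSketch {

    /** Stand-in for ReaderInfo, extended with the attempt number the FLIP proposes. */
    record ReaderInfo(int subtaskIndex, int attemptNumber, String location) {}

    /** Simplified event kinds the coordinator may receive (hypothetical). */
    enum EventKind { READER_REGISTRATION, REQUEST_SPLIT, SOURCE_EVENT, REPORTED_WATERMARK }

    // subtask -> (attempt -> gateway); a String stands in for SubtaskGateway
    private final Map<Integer, Map<Integer, String>> gateways = new HashMap<>();
    // subtask -> (attempt -> registered reader)
    private final Map<Integer, Map<Integer, ReaderInfo>> readers = new HashMap<>();
    // subtask -> splits assigned to it (assumed shared by all of its attempts)
    private final Map<Integer, List<String>> assignedSplits = new HashMap<>();
    // splits handed back to the (stand-in) split enumerator
    private final List<String> addedBack = new ArrayList<>();

    /** subtaskReady: remember the gateway of this execution attempt. */
    void subtaskReady(int subtask, int attempt, String gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, gateway);
    }

    /** ReaderRegistrationEvent: map the reader to its execution attempt. */
    void registerReader(ReaderInfo info) {
        readers.computeIfAbsent(info.subtaskIndex(), k -> new HashMap<>())
                .put(info.attemptNumber(), info);
        // A real coordinator would also call SplitEnumerator#addReader here.
    }

    /** Records that a split was assigned to a subtask (hypothetical helper). */
    void assignSplit(int subtask, String split) {
        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    /** subtaskFailed: drop only the failed attempt; sibling attempts are untouched. */
    void subtaskFailed(int subtask, int attempt) {
        Map<Integer, String> g = gateways.get(subtask);
        if (g != null) {
            g.remove(attempt);
        }
        Map<Integer, ReaderInfo> r = readers.get(subtask);
        if (r != null) {
            r.remove(attempt);
        }
    }

    /** subtaskReset: every attempt failed, so clear all state and add the splits back. */
    void subtaskReset(int subtask) {
        gateways.remove(subtask);
        readers.remove(subtask);
        List<String> splits = assignedSplits.remove(subtask);
        if (splits != null) {
            addedBack.addAll(splits);
        }
    }

    /** Watermark alignment is not supported together with speculative execution. */
    void handleEvent(EventKind kind) {
        if (kind == EventKind.REPORTED_WATERMARK) {
            throw new UnsupportedOperationException(
                    "ReportedWatermarkEvent is not supported with speculative execution");
        }
        // Other event kinds would be routed per attempt (omitted in this sketch).
    }

    int currentAttempts(int subtask) {
        return gateways.getOrDefault(subtask, Map.of()).size();
    }

    int registeredReaders(int subtask) {
        return readers.getOrDefault(subtask, Map.of()).size();
    }

    List<String> splitsAddedBack() {
        return List.copyOf(addedBack);
    }
}
```

The sketch keys state by both subtask index and attempt number. This lets one speculative attempt fail without discarding its sibling's reader and gateway, which addresses Guowei's concern about a reader becoming unavailable.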
>> > > > > >
>> > > > > > Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added support for SourceEvent, because it's useful for some user-defined sources which have a custom event protocol between reader and enumerator.
>> > > > > >
>> > > > > > Best,
>> > > > > > Jing Zhang
>> > > > > >
>> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>> > > > > >
>> > > > > > Guowei Ma <guowei....@gmail.com> wrote on Wed, Jun 29, 2022 at 18:06:
>> > > > > >
>> > > > > >> Hi, Jing
>> > > > > >>
>> > > > > >> Thanks a lot for writing this FLIP, which is very useful to batch users.
>> > > > > >> Currently I have only two small questions:
>> > > > > >>
>> > > > > >> 1. First of all, please complete the fault-tolerant processing flow in the FLIP. (Maybe you've already considered it, but it's better to explicitly give the specific solution in the FLIP.)
>> > > > > >> For example, how to handle the source `Reader` in case of error. As far as I know, once the reader is unavailable, new splits can no longer be allocated, which may be unacceptable in the case of speculative execution.
>> > > > > >>
>> > > > > >> 2. Secondly, the FLIP only says that user-defined events are not supported, but it does not explain how to deal with the existing ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of speculative execution, there may be two "same" tasks being executed at the same time. If these events are repeated, whether they really have no effect on the execution of the job still needs a clear evaluation.
>> > > > > >>
>> > > > > >> Best,
>> > > > > >> Guowei
>> > > > > >>
>> > > > > >> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang <beyond1...@gmail.com> wrote:
>> > > > > >>
>> > > > > >> > Hi all,
>> > > > > >> > One major problem of Flink batch jobs is slow tasks running on hot/bad nodes, resulting in very long execution times.
>> > > > > >> >
>> > > > > >> > In order to solve this problem, FLIP-168: Speculative Execution for Batch Job[1] was introduced and approved recently.
>> > > > > >> >
>> > > > > >> > Here, Zhu Zhu and I propose to support speculative execution of sources as one of the follow-ups of FLIP-168. You can find more details in FLIP-245[2].
>> > > > > >> > Looking forward to your feedback.
>> > > > > >> >
>> > > > > >> > Best,
>> > > > > >> > Jing Zhang
>> > > > > >> >
>> > > > > >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>> > > > > >> >
>> > > > > >> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
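Zhu Zhu suggested in the thread that watermark alignment could work with speculative execution by sending the WatermarkAlignmentEvent to all the current executions of each subtask. The sketch below illustrates that idea, although the thread concluded not to support watermark alignment with speculative execution for now. It makes two assumptions:

- The bound announced to sources is the minimum reported watermark plus a maximum allowed drift.
- Each subtask contributes a single reported watermark, no matter how many attempts it has.

The class `AlignmentBroadcastSketch` and its methods are hypothetical. A list per attempt stands in for a real subtask gateway.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: announce one watermark-alignment bound to every current
 * execution attempt of every source subtask. Names are illustrative, not Flink's API.
 */
final class AlignmentBroadcastSketch {

    private final long maxAllowedDrift;
    // subtask -> (attempt -> bounds delivered to that attempt); the list stands in for a gateway
    private final Map<Integer, Map<Integer, List<Long>>> attempts = new HashMap<>();
    // subtask -> last reported watermark (assumption: one value per subtask)
    private final Map<Integer, Long> reported = new HashMap<>();

    AlignmentBroadcastSketch(long maxAllowedDrift) {
        this.maxAllowedDrift = maxAllowedDrift;
    }

    void addAttempt(int subtask, int attempt) {
        attempts.computeIfAbsent(subtask, k -> new HashMap<>()).put(attempt, new ArrayList<>());
    }

    void reportWatermark(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Sends min(reported) + drift to all attempts; returns empty if nothing was reported yet. */
    OptionalLong announce() {
        OptionalLong min = reported.values().stream().mapToLong(Long::longValue).min();
        if (min.isEmpty()) {
            return OptionalLong.empty();
        }
        long maxAllowed = min.getAsLong() + maxAllowedDrift;
        for (Map<Integer, List<Long>> perAttempt : attempts.values()) {
            for (List<Long> inbox : perAttempt.values()) {
                inbox.add(maxAllowed); // every attempt, speculative or not, gets the same bound
            }
        }
        return OptionalLong.of(maxAllowed);
    }

    List<Long> received(int subtask, int attempt) {
        return List.copyOf(attempts.get(subtask).get(attempt));
    }
}
```

The bound is derived from the slowest reported watermark, so a lagging subtask still throttles every attempt of the others. That is the tension Zhu Zhu pointed out between watermark alignment and speculative execution.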