Hi all,

So far, I think we have reached an agreement on this FLIP. I have started the voting thread [1]. Please cast your vote there or ask additional questions here.
[1] https://lists.apache.org/thread/58yyxvnygw7oy5t556g8rm9y2xzb1l66

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> wrote on Fri, Nov 5, 2021 at 4:04 PM:

Hi all,

Thanks for all the comments on this FLIP.

Based on the discussion on the mailing list and the comments in the wiki, I have updated the FLIP doc. The main changes are:

1. Added the limitation that currently only ALL-EDGES-BLOCKING batch jobs are supported.
2. Adopted some suggestions from the wiki, including adjustments to the implementation plan.

Best,
Lijie

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Nov 3, 2021 at 4:58 PM:

I have to admit that I cannot think of a better name for the adaptive batch scheduler at the moment. Maybe it is good enough to call the two schedulers AdaptiveBatchScheduler and AdaptiveStreamingScheduler, to tell which scheduler is used for which execution mode. It is true, though, that the former is adaptive w.r.t. the workload and the latter w.r.t. the available resources.

Cheers,
Till

On Wed, Nov 3, 2021 at 9:08 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi David,

Thanks for your comments.

I personally think that "adaptive" means Flink automatically determines the appropriate scheduling and execution plan based on some information. That information can include both resource information and workload information, rather than being limited to a single dimension.

For the adaptive scheduler, the decision is currently resource-based, but in the future I think it can also take the workload into account (auto-scaling). For the adaptive batch scheduler, the decision is currently workload-based, but as Till and Zhu mentioned, our ultimate goal is to consider resources as well.

Short summary: "adaptive" can include multiple dimensions.
Currently, both the adaptive scheduler and the adaptive batch scheduler focus on only one dimension each, and we hope to combine more dimensions in order to produce a better scheduling and execution plan in the future.

For now, in order not to confuse users, the documentation should state which information each scheduler bases its decisions on.

Best,
Lijie

David Morávek <d...@apache.org> wrote on Tue, Nov 2, 2021 at 9:50 PM:

Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks pretty solid, and it will be a really great improvement to batch scheduling. I second Till's feedback, especially when it comes to consistent behavior between the different deployment types / schedulers.

What I'm a bit unsure about is the naming. The word *adaptive* means something different in the streaming and batch schedulers:
- For *streaming*, it refers to the ability to adapt the job parallelism based on resource availability.
- For *batch*, it refers to the ability to adapt a stage's parallelism based on the output of the previous stage.

Should this be a concern?

Best,
D.

On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi Till & Zhu,

Thanks for your feedback, and for your comments and suggestions on the wiki, which are very helpful for perfecting the FLIP.

I also agree that we should provide our users with consistent and easy-to-understand deployment options. Regarding the three options proposed by Till, my opinion is the same as Zhu's: in the first version we can support only option 1, and options 2 and 3 can be future improvements.

Regarding the side note about abstracting subpartitions as splits: although it was not our original intention, I personally find it meaningful. It would also help users, who could use it for monitoring in order to track job progress in detail.

Best,
Lijie

Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021 at 3:47 PM:

Hi Till,

Thanks for the comments!

I agree with you that we should avoid a situation where an auto-scaled job cannot be scheduled in standalone/reactive mode. And I think it would be great if we could expose a deployment option that is consistent for streaming and batch jobs, which would be easier to understand. I am looking forward to the day when both adaptive schedulers are the default, so that most users do not need to care about job tuning while their jobs can still run well.

Regarding the three options, personally I prefer to take #1 as the first step, to limit the scope of this FLIP a bit; otherwise it may become too complicated. I think #3 is the final goal we need to target later, so that mixed bounded and unbounded workloads can be supported. Given that multiple stages can be scheduled at the same time, the design of the scheduling may not be very straightforward and needs some thorough consideration. #2 can be a very good improvement by itself: shuffles of batch jobs could automatically be made pipelined or blocking according to the available resources. But the changes may involve many components and can be large, so I think it can be a standalone future improvement.

Regarding the side note about abstracting subpartitions as splits, the idea is very interesting to me. Besides supporting auto-scaling, I think trackable produced splits can also help in troubleshooting and give some insights for future improvements. Collecting data sizes for the adaptive batch scheduler can be the first step, and we can further consider the abstraction afterwards.

Thanks,
Zhu

Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021 at 10:47 PM:

Hi Lijie,

Thanks for drafting this FLIP together with Zhu Zhu :-)

I like the idea of making the parallelism of the operators of a bounded job dependent on the data size. This makes the job adjust automatically when the data sources/sizes change. I can see this working well in combination with the active mode, where Flink can ask for more resources.

In the case of standalone mode, I think it can lead to situations where one and the same job can be scheduled or not depending on the input data. The problem is pipelined regions that contain more than a single operator instance (e.g. pipelined shuffles). We already have this problem when submitting a batch job with too high a parallelism onto a standalone cluster.
However, with the adaptive batch mode this problem might become a bit more prominent. So my question would be how we can solve this problem (potentially in a follow-up step). I can think of the following three alternatives at the moment:

1. Only allow blocking data exchanges: This limits the size of a pipelined region to a single operator instance. The downside is that we would no longer support pipelined execution of multiple operators (other than chained ones). Moreover, it requires the user to set all data exchanges to blocking, which cannot be enforced at the moment.

2. Introduce a pipelined-blocking data exchange hybrid that supports pipelined data exchanges but can also spill to disk if there is no consumer: This could allow a job to still make progress when a pipelined region requires more slots than we currently have.

3. Decide on the actual parallelism of a pipelined region only after having received the slots that are declared based on the data size per subtask. If the pipelined region contains an all-to-all connection, then the parallelism is the number of slots we currently have; if not, then the parallelism can be decided by the data volume. This would effectively mean enabling the existing AdaptiveScheduler to also run batch workloads.
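The decision rule of alternative 3 can be sketched as follows. This is a minimal illustration with hypothetical names (RegionParallelismDecider and its methods are not Flink API), just to make the two branches of the decision concrete:

```java
// Hypothetical sketch of alternative 3: decide a pipelined region's
// parallelism only after slots (declared based on data size per subtask)
// have been received. Names here are illustrative, not Flink API.
public final class RegionParallelismDecider {

    private final long bytesPerSubtask; // desired data volume each subtask processes

    public RegionParallelismDecider(long bytesPerSubtask) {
        this.bytesPerSubtask = bytesPerSubtask;
    }

    public int decide(boolean hasAllToAllEdge, int availableSlots,
                      long regionInputBytes, int maxParallelism) {
        if (hasAllToAllEdge) {
            // All subtasks of an all-to-all region must run concurrently,
            // so the parallelism is capped by the slots we currently hold.
            return Math.min(availableSlots, maxParallelism);
        }
        // Without all-to-all edges the region does not need all subtasks at
        // once, so the parallelism can be derived from data volume alone.
        long byVolume = (regionInputBytes + bytesPerSubtask - 1) / bytesPerSubtask; // ceiling
        return (int) Math.min(maxParallelism, Math.max(1, byVolume));
    }
}
```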
With either of these options, I believe we could provide somewhat consistent behaviour across the different deployment and execution modes w.r.t. scaling:

a) Active + streaming job using the AdaptiveScheduler: Can run with fewer slots than requested. Can ask for more slots. Once new slots arrive, it will make use of them.

b) Reactive + streaming job using the AdaptiveScheduler: Can run with fewer slots than requested. Once new slots arrive, it will make use of them.

c) Active + batch job using the adaptive batch scheduler + any of 1., 2. or 3.: Can run with fewer slots than requested (because it can complete the job with a single slot). Can ask for more slots. Once new slots arrive, it will make use of them.

d) Standalone + batch job using the adaptive batch scheduler + any of 1., 2. or 3.: Can run with fewer slots than requested (because it can complete the job with a single slot). Once new slots arrive, it will make use of them (up to the desired maximum parallelism).

If we decide to go with option 1. or 2., then we will only be able to run mixed workloads (a mixture of bounded and unbounded sources) in streaming mode. This might be ok for the time being.

This leads to my main concern, which is to give our users consistent and reasonably easy-to-understand deployment options. In order to achieve this, Flink should always be able to make progress unless the parallelism is explicitly configured (e.g. a very high parallelism in a pipelined region that cannot be fulfilled). Moreover, Flink should be able to make use of new resources if the job isn't already running at the maximum parallelism. Removing slots, as long as the minimum number of required slots remains available, should also be possible. Maybe one idea could be to eventually make the adaptive batch scheduler the default for batch jobs. For streaming jobs, we would ideally always use the AdaptiveScheduler to give consistent behaviour.

As a side note: creating as many subpartitions as the maximum parallelism will result in a one-to-one mapping between subpartitions and key groups. If we then also make the non-keyed operators work on a set of subpartitions that store the operator state, then the subpartitions could be seen as a logical work unit/split that is assigned to operators. Having such an abstraction could allow us to track which work units have completed, which helps with rescaling operators and maintaining ordering guarantees, for example.

I also left some smaller comments in the wiki.
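To make the side note concrete, here is a simplified sketch of the kind of key-group mapping the one-to-one subpartition/key-group relation relies on. This is not Flink's actual assignment code (which additionally applies murmur hashing to spread keys); it only illustrates why such a unit of work is stable under rescaling:

```java
// Simplified illustration of key-group style assignment. A key always maps
// to the same key group (for a fixed maxParallelism); only the key group ->
// subtask mapping changes when the parallelism changes.
public final class KeyGroupMapping {

    /** Key -> key group. Stable as long as maxParallelism is fixed. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Key group -> subtask index under the current parallelism. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Because the key-to-key-group mapping never changes, a subpartition that corresponds to exactly one key group is a trackable unit of work: rescaling only changes which subtask owns the unit, not the unit itself.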
Cheers,
Till

On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <wangdachui9...@gmail.com> wrote:

Hi all,

Zhu Zhu and I propose to introduce a new job scheduler to Flink: the adaptive batch job scheduler. The new scheduler can automatically decide the parallelism of each job vertex of a batch job, according to the size of the data volume the vertex needs to process.

The major benefits of this scheduler include:

1. Batch job users can be relieved from parallelism tuning.
2. Automatically tuned parallelisms are set per vertex and can better fit consumed datasets whose volume varies from day to day.
3. Vertices of SQL batch jobs can be assigned different, automatically tuned parallelisms.
4. It can be the first step towards enabling auto-rebalancing of task workloads.

You can find more details in FLIP-187 [1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler

Best,
Lijie
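The core derivation described in the proposal (vertex parallelism from consumed data volume) can be sketched as below. All names and parameters are hypothetical; the actual option names and normalization rules would be defined by the FLIP itself:

```java
import java.util.List;

// Minimal sketch of the FLIP's core idea, with hypothetical names: derive a
// vertex's parallelism from the total size of the results it consumes,
// clamped to a configured [min, max] range.
public final class VertexParallelismCalculator {

    public static int calculate(List<Long> consumedResultBytes,
                                long targetBytesPerTask,
                                int minParallelism,
                                int maxParallelism) {
        // Sum the sizes of all upstream results this vertex will read.
        long total = consumedResultBytes.stream().mapToLong(Long::longValue).sum();
        // Ceiling division: enough subtasks so each processes ~targetBytesPerTask.
        long ceil = (total + targetBytesPerTask - 1) / targetBytesPerTask;
        // Clamp into the configured bounds.
        return (int) Math.max(minParallelism, Math.min(maxParallelism, ceil));
    }
}
```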