Hi all,

Thanks for all the comments on this FLIP.
Based on the discussion in the mailing list and the comments in the wiki,
I have updated the FLIP doc. The main changes are:

1. Added the limitation that currently only ALL-EDGES-BLOCKING batch jobs
   are supported.
2. Adopted some suggestions from the wiki, including adjustments to the
   implementation plan.

Best,
Lijie

Till Rohrmann <trohrm...@apache.org> 于2021年11月3日周三 下午4:58写道:

> I have to admit that I cannot think of a better name for the adaptive
> batch scheduler atm. Maybe it is good enough to call the two schedulers
> AdaptiveBatchScheduler and AdaptiveStreamingScheduler to tell which
> scheduler is used for which execution mode. It is true, though, that the
> former is adaptive wrt the workload and the latter wrt the available
> resources.
>
> Cheers,
> Till
>
> On Wed, Nov 3, 2021 at 9:08 AM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi David,
> >
> > Thanks for your comments.
> >
> > I personally think that "Adaptive" means that Flink automatically
> > determines the appropriate scheduling and execution plan based on some
> > information. That information can include both resource information
> > and workload information, rather than being limited to a single
> > dimension.
> >
> > For the adaptive scheduler, it is currently resource-based, but in the
> > future I think it can also take the workload into account
> > (auto-scaling).
> >
> > For the batch adaptive scheduler, it is currently workload-based, but
> > as Till & Zhu mentioned, our ultimate goal is to also consider
> > resources.
> >
> > In short, "Adaptive" can cover multiple dimensions. Currently, the
> > adaptive scheduler and the batch adaptive scheduler each focus on only
> > one dimension, and we hope to combine more dimensions in the future in
> > order to produce better scheduling and execution plans.
> >
> > For now, in order not to confuse users, the documentation should state
> > which information each scheduler bases its decisions on.
> >
> > Best,
> > Lijie
> >
> > David Morávek <d...@apache.org> 于2021年11月2日周二 下午9:50写道:
> >
> > > Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already
> > > looks pretty solid and it will be a really great improvement to
> > > batch scheduling. I'd second Till's feedback, especially when it
> > > comes to consistent behavior between different deployment types /
> > > schedulers.
> > >
> > > What I'm a bit unsure about is the naming here. The word *Adaptive*
> > > means something different in the streaming and batch schedulers:
> > > - For *streaming* it refers to the ability to adapt the job
> > > parallelism based on resource availability.
> > > - For *batch* it refers to the ability to adapt the stage
> > > parallelism based on the output of the previous stage.
> > >
> > > Should this be a concern?
> > >
> > > Best,
> > > D.
> > >
> > > On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com>
> > > wrote:
> > >
> > > > Hi Till & Zhu,
> > > >
> > > > Thanks for your feedback, and also for your comments and
> > > > suggestions on the wiki, which are very helpful for perfecting
> > > > the FLIP.
> > > >
> > > > I also agree that we should provide our users with consistent and
> > > > easy-to-understand deployment options. Regarding the three
> > > > options proposed by Till, my opinion is the same as Zhu's: in the
> > > > first version we can support only option 1, and options 2 and 3
> > > > can be future improvements.
> > > >
> > > > Regarding the side note about abstracting subpartitions as
> > > > splits, although it was not our original intention, I personally
> > > > think it is meaningful. It is also helpful to users, who can use
> > > > it for monitoring in order to track the progress of jobs in
> > > > detail.
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > Zhu Zhu <reed...@gmail.com> 于2021年10月30日周六 下午3:47写道:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > Thanks for the comments!
> > > > >
> > > > > I agree with you that we should avoid a situation where an
> > > > > auto-scaled job cannot be scheduled in standalone/reactive
> > > > > mode. And I think it would be great if we can expose a
> > > > > deployment option that is consistent for streaming and batch
> > > > > jobs, which is easier to understand. I'm looking forward to the
> > > > > day when both adaptive schedulers are the default, so that most
> > > > > users do not need to care about job tuning while their jobs
> > > > > still run well.
> > > > >
> > > > > Regarding the three options, I personally prefer to take *#1*
> > > > > as the first step, to limit the scope of this FLIP a bit;
> > > > > otherwise it may become too complicated.
> > > > > I think *#3* is the final goal we should target later, so that
> > > > > mixed bounded and unbounded workloads can be supported. Given
> > > > > that multiple stages can be scheduled at the same time, the
> > > > > design of the scheduling may not be very straightforward and
> > > > > needs some thorough consideration.
> > > > > *#2* can be a very good improvement by itself. Shuffles of
> > > > > batch jobs could automatically be determined to be pipelined or
> > > > > blocking according to the available resources. But the changes
> > > > > may involve many components and can be large, so I think it can
> > > > > be a standalone future improvement.
> > > > >
> > > > > Regarding the side note about abstracting subpartitions as
> > > > > splits, the idea is very interesting to me. Besides supporting
> > > > > auto scaling, I think trackable produced splits can also help
> > > > > in troubleshooting and give some insights for future
> > > > > improvements.
> > > > > Collecting data sizes for the batch adaptive scheduler can be
> > > > > the first step, and we can further consider the abstraction
> > > > > afterwards.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Till Rohrmann <trohrm...@apache.org> 于2021年10月29日周五 下午10:47写道:
> > > > >
> > > > > > Hi Lijie,
> > > > > >
> > > > > > Thanks for drafting this FLIP together with Zhu Zhu :-)
> > > > > >
> > > > > > I like the idea of making the parallelism of the operators of
> > > > > > a bounded job dependent on the data size. This makes the job
> > > > > > adjust automatically when the data sources/sizes change.
> > > > > >
> > > > > > I can see this working well in combination with the active
> > > > > > mode, where Flink can ask for more resources.
> > > > > >
> > > > > > In the case of the standalone mode, I think it can lead to
> > > > > > situations where one and the same job can be scheduled or not
> > > > > > depending on the input data. The problem is pipelined regions
> > > > > > that contain more than a single operator instance (e.g.
> > > > > > pipelined shuffles). We already have this problem when
> > > > > > submitting a batch job with too high a parallelism onto a
> > > > > > standalone cluster. However, with the adaptive batch mode
> > > > > > this problem might become a bit more prominent. So my
> > > > > > question is how we can solve this problem (potentially in a
> > > > > > follow-up step). I can think of the following three
> > > > > > alternatives atm:
> > > > > >
> > > > > > 1. Only allow blocking data exchanges: This limits the size
> > > > > > of a pipelined region to a single operator instance. It has
> > > > > > the downside that we no longer support pipelined execution of
> > > > > > multiple operators (other than chained).
> > > > > > Moreover, it requires the user to set all data exchanges to
> > > > > > blocking, which cannot be enforced atm.
> > > > > > 2. Introduce a new pipelined-blocking data exchange hybrid
> > > > > > that supports pipelined data exchanges but can also spill to
> > > > > > disk if there is no consumer: This would allow us to still
> > > > > > make progress in case one has a pipelined region which
> > > > > > requires more slots than what we currently have.
> > > > > > 3. Decide on the actual parallelism of a pipelined region
> > > > > > after having received the slots that are declared based on
> > > > > > the data size per subtask. If the pipelined region contains
> > > > > > an all-to-all connection, then the parallelism is the number
> > > > > > of slots we currently have. If not, then the parallelism can
> > > > > > be decided by the data volume: This would effectively mean
> > > > > > enabling the existing AdaptiveScheduler to also run batch
> > > > > > workloads.
> > > > > >
> > > > > > With either of these options, I believe we could provide a
> > > > > > somewhat consistent behaviour across the different deployment
> > > > > > and execution modes wrt scaling:
> > > > > >
> > > > > > a) Active + streaming job that uses the AdaptiveScheduler:
> > > > > > Can run with fewer slots than requested. Can ask for more
> > > > > > slots. Once new slots arrive, it will make use of them.
> > > > > > b) Reactive + streaming job that uses the AdaptiveScheduler:
> > > > > > Can run with fewer slots than requested. Once new slots
> > > > > > arrive, it will make use of them.
> > > > > > c) Active + batch job that uses the adaptive batch scheduler
> > > > > > + any of 1., 2. or 3.: Can run with fewer slots than
> > > > > > requested (because it can complete the job with a single
> > > > > > slot). Can ask for more slots.
> > > > > > Once new slots arrive, it will make use of them.
> > > > > > d) Standalone + batch job that uses the adaptive batch
> > > > > > scheduler + any of 1., 2. or 3.: Can run with fewer slots
> > > > > > than requested (because it can complete the job with a single
> > > > > > slot). Once new slots arrive, it will make use of them (up to
> > > > > > the desired maximum parallelism).
> > > > > >
> > > > > > If we decide to go with option 1. or 2., then we will only be
> > > > > > able to run mixed workloads (a mixture of bounded and
> > > > > > unbounded sources) in streaming mode. This might be ok for
> > > > > > the time being.
> > > > > >
> > > > > > This actually leads to my main concern, which is to give our
> > > > > > users consistent and somewhat easy-to-understand deployment
> > > > > > options. In order to achieve this, Flink should always be
> > > > > > able to make progress unless the parallelism is explicitly
> > > > > > configured (e.g. a very high parallelism in a pipelined
> > > > > > region that cannot be fulfilled). Moreover, Flink should be
> > > > > > able to make use of new resources if the job isn't being run
> > > > > > at the maximum parallelism already. Removing slots, as long
> > > > > > as the minimum number of required slots is still available,
> > > > > > should also be possible. Maybe one idea could be to make the
> > > > > > adaptive batch scheduler the default for batch jobs
> > > > > > eventually. For streaming jobs, we would ideally always use
> > > > > > the AdaptiveScheduler to give consistent behaviour.
> > > > > >
> > > > > > As a side note: Creating as many subpartitions as the maximum
> > > > > > parallelism will result in a one-to-one mapping between
> > > > > > subpartitions and key groups.
> > > > > > If we then also make the non-keyed operators work on a set of
> > > > > > subpartitions that store the operator state, then the
> > > > > > subpartitions could be seen as a logical work unit/split that
> > > > > > is assigned to operators. Having such an abstraction could
> > > > > > allow us to track which work units have completed, which
> > > > > > helps with rescaling of operators and maintaining ordering
> > > > > > guarantees, for example.
> > > > > >
> > > > > > I also left some smaller comments in the wiki.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang
> > > > > > <wangdachui9...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Zhu Zhu and I propose to introduce a new job scheduler to
> > > > > > > Flink: the adaptive batch job scheduler. The new scheduler
> > > > > > > can automatically decide the parallelisms of job vertices
> > > > > > > for batch jobs, according to the size of the data volume
> > > > > > > each vertex needs to process.
> > > > > > >
> > > > > > > Major benefits of this scheduler include:
> > > > > > >
> > > > > > > 1. Batch job users can be relieved from parallelism tuning.
> > > > > > > 2. Automatically tuned parallelisms are set per vertex and
> > > > > > > can better fit consumed datasets whose volume varies from
> > > > > > > day to day.
> > > > > > > 3. Vertices of SQL batch jobs can be assigned different,
> > > > > > > automatically tuned parallelisms.
> > > > > > > 4. It can be the first step towards auto-rebalancing the
> > > > > > > workloads of tasks.
> > > > > > >
> > > > > > > You can find more details in FLIP-187 [1]. Looking forward
> > > > > > > to your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Lijie
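For readers skimming the thread: the core decision the proposed scheduler makes, picking a vertex's parallelism from the volume of data it consumes, can be sketched as follows. This is a minimal illustration only; the function name, parameters, and exact clamping rule are assumptions, not Flink's actual implementation.

```python
import math

def decide_parallelism(input_bytes, bytes_per_task,
                       min_parallelism, max_parallelism):
    """Illustrative sketch of a data-volume-based parallelism decision:
    divide the vertex's total consumed data by the desired data volume
    per task, then clamp to the configured bounds. Hypothetical names,
    not Flink's API."""
    if input_bytes <= 0:
        # Nothing to process: fall back to the configured minimum.
        return min_parallelism
    wanted = math.ceil(input_bytes / bytes_per_task)
    return max(min_parallelism, min(max_parallelism, wanted))

# Example: 10 GiB of input with ~512 MiB per task yields 20 subtasks.
print(decide_parallelism(10 * 1024**3, 512 * 1024**2, 1, 128))  # -> 20
```

This also shows why the standalone-mode concern raised above matters: the chosen parallelism depends on the input data, so the same job may demand different slot counts on different days.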