Hi all,

Thanks for all the comments on this FLIP.

Based on the discussion on the mailing list and the comments in the wiki, I
have updated the FLIP doc. The main changes include:

1. Added the limitation that currently only ALL-EDGES-BLOCKING batch jobs
are supported.

2. Adopted some suggestions from the wiki, including adjustments to the
implementation plan.

Best,

Lijie
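[Editor's illustration] The core mechanism discussed throughout this thread is deriving each vertex's parallelism from the volume of data it has to process. A rough sketch of such a decision rule follows. This is a hypothetical illustration, not the actual FLIP-187 implementation; the class name, method name, and clamping bounds are all made up:

```java
// Hypothetical sketch of data-volume-based parallelism selection.
// Not the actual FLIP-187 code: names and defaults are invented.
public final class ParallelismSketch {

    /**
     * Derive a vertex parallelism from the total bytes produced by its
     * upstream (blocking) inputs, clamped to a configured range.
     */
    static int decideParallelism(long totalBytes, long bytesPerTask,
                                 int minParallelism, int maxParallelism) {
        int p = (int) Math.ceil((double) totalBytes / bytesPerTask);
        return Math.max(minParallelism, Math.min(maxParallelism, p));
    }

    public static void main(String[] args) {
        // e.g. 10 GiB of input, 1 GiB per task, clamped to [1, 128]
        System.out.println(decideParallelism(10L << 30, 1L << 30, 1, 128)); // prints 10
    }
}
```

Note that such a rule can only run after the upstream stage has finished writing its blocking results, which matches the ALL-EDGES-BLOCKING limitation mentioned above.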

Till Rohrmann <trohrm...@apache.org> wrote on Wed, Nov 3, 2021, at 4:58 PM:

> I have to admit that I cannot think of a better name for the adaptive batch
> scheduler atm. Maybe it is good enough to call the two schedulers
> AdaptiveBatchScheduler and AdaptiveStreamingScheduler to tell which
> scheduler is used for which execution mode. It is true, though, that the
> former is adaptive wrt the workload and the latter wrt the available
> resources.
>
> Cheers,
> Till
>
> On Wed, Nov 3, 2021 at 9:08 AM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
> > Hi David,
> >
> >
> > Thanks for your comments.
> >
> >
> > I personally think that "Adaptive" means: Flink automatically determines
> > the appropriate scheduling and execution plan based on some information.
> > The information can include both resource information and workload
> > information, rather than being limited to a certain dimension.
> >
> >
> >
> > For the adaptive scheduler, it is currently resource-based, but in the
> > future I think it can also take the workload into account (auto-scaling).
> >
> >
> > For the batch adaptive scheduler, it's currently workload-based, but as
> > Till & Zhu mentioned, our ultimate goal is to also consider resources.
> >
> > In short, "Adaptive" can cover multiple dimensions. Currently, both the
> > adaptive scheduler and the batch adaptive scheduler focus on only one
> > dimension, and we hope to combine more dimensions in order to produce a
> > better scheduling and execution plan in the future.
> >
> >
> > At present, in order not to confuse users, the documentation should
> > indicate which information each scheduler bases its decisions on.
> >
> >
> > Best,
> >
> > Lijie
> >
> >
> >
> > David Morávek <d...@apache.org> wrote on Tue, Nov 2, 2021, at 9:50 PM:
> >
> > > Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks
> > > pretty solid and will be a really great improvement to batch
> > > scheduling. I'd second Till's feedback, especially when it comes to
> > > consistent behavior between different deployment types / schedulers.
> > >
> > > What I'm a bit unsure about is the naming here. The word *Adaptive*
> > > means something different in the streaming and batch schedulers:
> > > - For *streaming* it refers to the ability to adapt the job
> > > parallelism based on resource availability.
> > > - For *batch* it refers to the ability to adapt the stage parallelism
> > > based on the output of the previous stage.
> > >
> > > Should this be a concern?
> > >
> > > Best,
> > > D.
> > >
> > >
> > >
> > > On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com>
> > > wrote:
> > >
> > > > Hi, Till & Zhu
> > > >
> > > > Thanks for your feedback. Also thanks for your comments and
> > > > suggestions on the wiki, which are very helpful for perfecting the
> > > > FLIP.
> > > >
> > > > I also agree to provide our users with consistent and
> > > > easy-to-understand deployment options. Regarding the three options
> > > > proposed by Till, my opinion is the same as Zhu's. In the first
> > > > version, we can support only "option1"; "option2" and "option3" can
> > > > be future improvements.
> > > >
> > > > Regarding the side note to abstract subpartitions as splits,
> > > > although it is not our original intention, I personally find it
> > > > meaningful. It is also helpful to users, who can use it for
> > > > monitoring to track the progress of jobs in detail.
> > > >
> > > > Best,
> > > >
> > > > Lijie
> > > >
> > > > Zhu Zhu <reed...@gmail.com> wrote on Sat, Oct 30, 2021, at 3:47 PM:
> > > >
> > > > > Hi Till,
> > > > >
> > > > > Thanks for the comments!
> > > > >
> > > > > I agree with you that we should avoid situations where an
> > > > > auto-scaled job cannot be scheduled in standalone/reactive mode.
> > > > > And I think it's great if we can expose a deployment option that is
> > > > > consistent for streaming and batch jobs, which will be easier to
> > > > > understand. I'm looking forward to the day when both adaptive
> > > > > schedulers are the default, so that most users do not need to care
> > > > > about job tuning while their jobs still run well.
> > > > >
> > > > > Regarding the three options, personally I prefer to take *#1* as
> > > > > the first step, to limit the scope of this FLIP a bit; otherwise it
> > > > > may be too complicated.
> > > > > I think *#3* is the final goal we need to target later, so that
> > > > > mixed bounded and unbounded workloads can be supported. Given that
> > > > > multiple stages can be scheduled at the same time, the design of
> > > > > the scheduling may not be very straightforward and needs some
> > > > > thorough consideration.
> > > > > *#2* can be a very good improvement by itself. Shuffles of batch
> > > > > jobs can be auto-determined to be pipelined or blocking according
> > > > > to available resources. But the changes may involve many components
> > > > > and can be large, so I think it can be a standalone future
> > > > > improvement.
> > > > >
> > > > > Regarding the side note to abstract subpartitions as splits, the
> > > > > idea is very interesting to me. Besides supporting auto-scaling, I
> > > > > think trackable produced splits can also help in troubleshooting
> > > > > and give some insights for future improvements. Collecting data
> > > > > sizes for the batch adaptive scheduler can be the first step, and
> > > > > we can further consider the abstraction of it.
> > > > >
> > > > > Thanks,
> > > > > Zhu
> > > > >
> > > > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Oct 29, 2021, at 10:47 PM:
> > > > >
> > > > > > Hi Lijie,
> > > > > >
> > > > > > Thanks for drafting this FLIP together with Zhu Zhu :-)
> > > > > >
> > > > > > I like the idea of making the parallelism of operators of a
> > > > > > bounded job dependent on the data size. This makes the job
> > > > > > adjust automatically when the data sources/sizes change.
> > > > > >
> > > > > > I can see this working well in combination with the active mode,
> > > > > > where Flink can ask for more resources.
> > > > > >
> > > > > > In the case of the standalone mode, I think it can lead to
> > > > > > situations where one and the same job can be scheduled or not
> > > > > > depending on the input data. The problem is pipelined regions
> > > > > > that contain more than a single operator instance (e.g.
> > > > > > pipelined shuffles). We already have this problem when submitting
> > > > > > a batch job with too high a parallelism onto a standalone
> > > > > > cluster. However, with the adaptive batch mode this problem might
> > > > > > become a bit more prominent. So my question would be how we can
> > > > > > solve this problem (potentially in a follow-up step). I could
> > > > > > think of the following three alternatives atm:
> > > > > >
> > > > > > 1. Only allow blocking data exchanges: This will limit the size
> > > > > > of a pipelined region to a single operator instance. It has the
> > > > > > downside that we no longer support pipelined execution of
> > > > > > multiple operators (other than chained ones). Moreover, it
> > > > > > requires the user to set all data exchanges to blocking, which
> > > > > > cannot be enforced atm.
> > > > > > 2. Introduce a new pipelined-blocking data exchange hybrid that
> > > > > > supports pipelined data exchanges but can also spill to disk if
> > > > > > there is no consumer: This could allow us to still make progress
> > > > > > in case one has a pipelined region which requires more slots
> > > > > > than what we currently have.
> > > > > > 3. Decide on the actual parallelism of a pipelined region after
> > > > > > having received the slots that are declared based on the data
> > > > > > size per subtask. If the pipelined region contains an all-to-all
> > > > > > connection, then the parallelism is how many slots we currently
> > > > > > have; if not, then the parallelism can be decided by the data
> > > > > > volume: This would effectively mean enabling the existing
> > > > > > AdaptiveScheduler to also run batch workloads.
> > > > > >
> > > > > > With either of these options, I believe that we could provide a
> > > > > > somewhat consistent behaviour across the different deployment
> > > > > > and execution modes wrt scaling:
> > > > > >
> > > > > > a) Active + streaming job that uses the AdaptiveScheduler: Can
> > > > > > run with fewer slots than requested. Can ask for more slots.
> > > > > > Once new slots arrive it will make use of them.
> > > > > > b) Reactive + streaming job that uses the AdaptiveScheduler: Can
> > > > > > run with fewer slots than requested. Once new slots arrive it
> > > > > > will make use of them.
> > > > > > c) Active + batch job that uses the batch adaptive scheduler +
> > > > > > any of 1., 2. or 3.: Can run with fewer slots than requested
> > > > > > (because it can complete the job with a single slot). Can ask
> > > > > > for more slots. Once new slots arrive it will make use of them.
> > > > > > d) Standalone + batch job that uses the batch adaptive scheduler
> > > > > > + any of 1., 2. or 3.: Can run with fewer slots than requested
> > > > > > (because it can complete the job with a single slot). Once new
> > > > > > slots arrive it will make use of them (up to the desired maximum
> > > > > > parallelism).
> > > > > >
> > > > > > If we decide to go with option 1. or 2., then we will only be
> > > > > > able to run mixed workloads (a mixture of bounded and unbounded
> > > > > > sources) in streaming mode. This might be ok for the time being.
> > > > > >
> > > > > > This actually leads to my main concern, which is to give our
> > > > > > users consistent and somewhat easy-to-understand deployment
> > > > > > options. In order to achieve this, Flink should always be able
> > > > > > to make progress unless the parallelism is explicitly configured
> > > > > > (e.g. a very high parallelism in a pipelined region that cannot
> > > > > > be fulfilled). Moreover, Flink should be able to make use of new
> > > > > > resources if the job isn't already running at the maximum
> > > > > > parallelism. Removing slots, so that the minimum number of
> > > > > > required slots is still available, should also be possible.
> > > > > > Maybe one idea could be to eventually make the adaptive batch
> > > > > > scheduler the default for batch jobs. For streaming jobs, we
> > > > > > would ideally always use the AdaptiveScheduler to give a
> > > > > > consistent behaviour.
> > > > > >
> > > > > > As a side note: Creating as many subpartitions as the maximum
> > > > > > parallelism will result in a one-to-one mapping between
> > > > > > subpartitions and key groups. If we then also make the non-keyed
> > > > > > operators work on a set of subpartitions that store the operator
> > > > > > state, then the subpartitions could be seen as some logical work
> > > > > > unit/split that is assigned to operators. Having such an
> > > > > > abstraction could allow us to track which work unit has
> > > > > > completed, which helps with rescaling of operators and
> > > > > > maintaining ordering guarantees, for example.
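[Editor's illustration] Till's side note about a one-to-one subpartition/key-group mapping can be made concrete with a small sketch. The following is a hedged illustration of Flink-style key-group assignment, not Flink's actual KeyGroupRangeAssignment code; the hashing here is deliberately simplified:

```java
// Simplified illustration of key-group assignment (assumed model, not
// Flink's real implementation): a key hashes into one of maxParallelism
// key groups, and each key group belongs to exactly one operator instance.
// If as many subpartitions are created as the maximum parallelism, each
// subpartition corresponds to exactly one key group.
public final class KeyGroupSketch {

    static int keyGroupFor(Object key, int maxParallelism) {
        // Flink spreads hashes with murmur hashing; plain hashCode here.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        // Contiguous ranges of key groups are assigned to operator instances.
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        // With maxParallelism 128 and parallelism 4, key groups 0..31 go to
        // instance 0, 32..63 to instance 1, and so on.
        System.out.println(operatorIndexFor(40, 128, 4)); // prints 1
    }
}
```

Under this model, tracking which key groups (subpartitions) an operator has finished consuming is what makes the "completed work unit" bookkeeping for rescaling possible.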
> > > > > >
> > > > > > I also left some smaller comments in the wiki.
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang
> > > > > > <wangdachui9...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > >
> > > > > > > Zhu Zhu and I propose to introduce a new job scheduler to
> > > > > > > Flink: the adaptive batch job scheduler. The new scheduler can
> > > > > > > automatically decide the parallelism of each job vertex of a
> > > > > > > batch job, according to the volume of data the vertex needs to
> > > > > > > process.
> > > > > > >
> > > > > > > Major benefits of this scheduler include:
> > > > > > >
> > > > > > >    1. Batch job users can be relieved from parallelism tuning
> > > > > > >    2. Automatically tuned parallelisms can be set per vertex
> > > > > > >    and can better fit consumed datasets whose volume varies
> > > > > > >    from day to day
> > > > > > >    3. Vertices from SQL batch jobs can be assigned different,
> > > > > > >    automatically tuned parallelisms
> > > > > > >    4. It can be the first step towards auto-rebalancing the
> > > > > > >    workloads of tasks
> > > > > > >
> > > > > > > You can find more details in FLIP-187 [1]. Looking forward to
> > > > > > > your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Lijie
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
