Hi all,

So far, I think we have reached an agreement on this FLIP. I have started
the voting thread [1]. Please cast your vote there or ask additional
questions here.

[1] https://lists.apache.org/thread/58yyxvnygw7oy5t556g8rm9y2xzb1l66

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> 于2021年11月5日周五 下午4:04写道:

> Hi all,
>
> Thanks for all the comments on this FLIP.
>
> Based on the discussion in the mailing list and comments in the wiki, I
> updated the FLIP doc, the mainly changes include:
>
> 1. Added the limitation that currently only supports ALL-EDGES-BLOCKING
> batch jobs*.*
>
> 2. Adopted some suggestions in the wiki, including adjustments to the
> implementation plan, etc.
>
> Best,
>
> Lijie
>
> Till Rohrmann <trohrm...@apache.org> 于2021年11月3日周三 下午4:58写道:
>
>> I have to admit that I cannot think of a better name for the adaptive
>> batch
>> scheduler atm. Maybe it is good enough to call the two schedulers
>> AdaptiveBatchScheduler and AdaptiveStreamingScheduler to tell which
>> scheduler is used for which execution mode. It is true, though, that the
>> former is adaptive wrt the workload and the latter wrt the available
>> resources.
>>
>> Cheers,
>> Till
>>
>> On Wed, Nov 3, 2021 at 9:08 AM Lijie Wang <wangdachui9...@gmail.com>
>> wrote:
>>
>> > Hi David,
>> >
>> >
>> > Thanks for your comments.
>> >
>> >
>> > I personally think that "Adaptive" means: Flink automatically determines
>> > the appropriate scheduling and execution plan based on some information.
>> > The information can include both resource information and workload
>> > information, rather than being limited to a certain dimension.
>> >
>> >
>> >
>> > For the adaptive scheduler, it is currently resource-based, but in the
>> > future I think it can combine the workload (auto-scaling).
>> >
>> >
>> > For the batch adaptive scheduler, it's currently workload-based, but as
>> > Till & Zhu mentioned, our ultimate goal will also consider resources.
>> >
>> > Short summary, the "Adaptive" can include multiple dimensions.
>> Currently,
>> > both the adaptive scheduler and batch adaptive scheduler only focus on
>> one
>> > dimension, and we hope to combine more dimensions in order to give a
>> better
>> > scheduling and execution plan in the future.
>> >
>> >
>> > At present, in order not to confuse users, it is necessary to indicate
>> the
>> > information different schedulers based on in the document.
>> >
>> >
>> > Best,
>> >
>> > Lijie
>> >
>> >
>> >
>> > David Morávek <d...@apache.org> 于2021年11月2日周二 下午9:50写道:
>> >
>> > > Hi, thanks for drafting the FLIP, Lijie and Zhu Zhu. It already looks
>> > > pretty solid and it will be a really great improvement to the batch
>> > > scheduling. I'd second to the Till's feedback, especially when it
>> comes
>> > to
>> > > the consistent behavior between different deployment types /
>> schedulers.
>> > >
>> > > What I'm bit unsure about is the naming here. The word *Adaptive*
>> means
>> > > something different in the streaming and batch scheduler:
>> > > - For *streaming* it refers to the ability to adapt the job
>> parallelism
>> > > based on the resource availability.
>> > > - For *batch* it refers to the ability to adapt the stage parallelism
>> > based
>> > > on the output of the previous stage.
>> > >
>> > > Should this be a concern?
>> > >
>> > > Best,
>> > > D.
>> > >
>> > >
>> > >
>> > > On Sun, Oct 31, 2021 at 8:21 AM Lijie Wang <wangdachui9...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi, Till & Zhu
>> > > >
>> > > > Thanks for your feedback. Also thanks for your comments and
>> suggestions
>> > > >
>> > > > on wiki, which are very helpful for perfecting the FLIP.
>> > > >
>> > > >
>> > > > I also agree to provide our users with consistent and
>> > easy-to-understand
>> > > >
>> > > > deployment options. Regarding the three options proposed by Till, my
>> > > > opinion
>> > > >
>> > > > is the same as Zhu's. In the first version, we can only support
>> > > "option1",
>> > > > and
>> > > > then the
>> > > >
>> > > > "option2" and "option3" can be the future improvements.
>> > > >
>> > > > Regarding the side note to abstract subpartitions as splits,
>> > > >
>> > > > although it is not our original intention, I personally feel it's
>> > > > meaningful.
>> > > >
>> > > > This is also helpful to users, users can use it to do some
>> monitoring
>> > > work,
>> > > >
>> > > > in order to get the progress of jobs in detail.
>> > > >
>> > > > Best,
>> > > >
>> > > > Lijie
>> > > >
>> > > > Zhu Zhu <reed...@gmail.com> 于2021年10月30日周六 下午3:47写道:
>> > > >
>> > > > > Hi Till,
>> > > > >
>> > > > > Thanks for the comments!
>> > > > >
>> > > > > I agree with you that we should avoid an auto-scaled job not able
>> to
>> > be
>> > > > > scheduled
>> > > > > in standalone/reactive mode. And I think it's great if we can
>> expose
>> > a
>> > > > > deployment
>> > > > > option that is consistent for streaming and batch jobs, which can
>> be
>> > > > easier
>> > > > > to
>> > > > > understand. Just looking to the day to make both adaptive
>> schedulers
>> > > > > default, so
>> > > > > that most users do not need to care about job tuning while the job
>> > can
>> > > > run
>> > > > > well.
>> > > > >
>> > > > > Regarding the three options, personally I prefer to take *#1* as
>> the
>> > > > first
>> > > > > step, to
>> > > > > limit the scope of this FLIP a bit, otherwise it may be too
>> > > complicated.
>> > > > > I think *#3* is the final goal we need to target later, so that
>> mixed
>> > > > > bounded and
>> > > > > unbounded workloads can be supported. Given that there can be
>> > multiple
>> > > > > stages scheduled at the same time, the design of the scheduling
>> may
>> > not
>> > > > be
>> > > > > very straightforward and needs some thorough consideration.
>> > > > > *#2* can be a very good improvement itself. Shuffles of batch jobs
>> > can
>> > > be
>> > > > > auto-determined to be pipelined or blocking according to available
>> > > > > resources.
>> > > > > But the changes may involve many components and can be large. So I
>> > > think
>> > > > > it can be a standalone future improvement.
>> > > > >
>> > > > > Regarding the side note to abstract subpartitions as splits, the
>> idea
>> > > is
>> > > > > very
>> > > > > interesting to me. Besides supporting auto scaling, I think
>> trackable
>> > > > > produced
>> > > > > splits can also help in troubleshooting and give some insights for
>> > > future
>> > > > > improvements. Collecting data sizes for batch adaptive scheduler
>> can
>> > be
>> > > > the
>> > > > > first step and we can further consider the abstraction of it.
>> > > > >
>> > > > > Thanks,
>> > > > > Zhu
>> > > > >
>> > > > > Till Rohrmann <trohrm...@apache.org> 于2021年10月29日周五 下午10:47写道:
>> > > > >
>> > > > > > Hi Lijie,
>> > > > > >
>> > > > > > Thanks for drafting this FLIP together with Zhu Zhu :-)
>> > > > > >
>> > > > > > I like the idea of making the parallelism of operators of a
>> bounded
>> > > job
>> > > > > > dependent on the data size. This makes the job adjust
>> automatically
>> > > > when
>> > > > > > the data sources/sizes change.
>> > > > > >
>> > > > > > I can see this work well in combination with the active mode
>> where
>> > > > Flink
>> > > > > > can ask for more resources.
>> > > > > >
>> > > > > > In the case of the standalone mode, I think it can lead to
>> > situations
>> > > > > where
>> > > > > > one and the same job can be scheduled or not depending on the
>> input
>> > > > data.
>> > > > > > The problem is pipelined regions that contain more than a single
>> > > > operator
>> > > > > > instance (e.g. pipelined shuffles). We already have this problem
>> > when
>> > > > > > submitting a batch job with too high parallelism onto a
>> standalone
>> > > > > cluster.
>> > > > > > However, with the adaptive batch mode this problem might become
>> a
>> > bit
>> > > > > more
>> > > > > > present. So my question would be how can we solve this problem
>> > > > > (potentially
>> > > > > > in a follow up step). I could think of the following three
>> > > alternatives
>> > > > > > atm:
>> > > > > >
>> > > > > > 1. Only allow blocking data exchanges: This will limit the size
>> of
>> > a
>> > > > > > pipelined region to a single operator instance. This has the
>> > downside
>> > > > > that
>> > > > > > we no longer support pipelined execution of multiple operators
>> > (other
>> > > > > than
>> > > > > > chained). Moreover, it requires the user to set all data
>> exchanges
>> > to
>> > > > > > blocking which cannot be enforced atm.
>> > > > > > 2. Introduce a new pipelined-blocking data exchange hybrid that
>> > > > supports
>> > > > > > pipelined data exchanges but can also spill to disk if there is
>> no
>> > > > > > consumer: This could allow to still make progress in case that
>> one
>> > > has
>> > > > a
>> > > > > > pipelined region which requires more slots than what we
>> currently
>> > > have.
>> > > > > > 3. Decide on the actual parallelism of a pipelined region after
>> > > having
>> > > > > > received the slots that are declared based on the data size per
>> > > > subtask.
>> > > > > If
>> > > > > > the pipelined region contains an all-to-all connection, then the
>> > > > > > parallelism is how many slots we currently have. If not, then
>> the
>> > > > > > parallelism can be decided by the data volume: This would
>> > effectively
>> > > > > mean
>> > > > > > to enable the existing AdaptiveScheduler to also run batch
>> > workloads.
>> > > > > >
>> > > > > > With either of these options, I believe that we could provide a
>> > > > somewhat
>> > > > > > consistent behaviour across the different deployment and
>> execution
>> > > > modes
>> > > > > > wrt to scaling:
>> > > > > >
>> > > > > > a) Active + streaming job that uses AdaptiveScheduler: Can run
>> with
>> > > > fewer
>> > > > > > slots than requested. Can ask for more slots. Once new slots
>> arrive
>> > > it
>> > > > > will
>> > > > > > make use of it.
>> > > > > > b) Reactive + streaming job that uses AdaptiveScheduler: Can run
>> > with
>> > > > > fewer
>> > > > > > slots than requested. Once new slots arrive it will make use of
>> it.
>> > > > > > c) Active + batch job that uses batch adaptive scheduler + any
>> of
>> > 1.,
>> > > > 2.
>> > > > > or
>> > > > > > 3.: Can run with fewer slots than requested (because it can
>> > complete
>> > > > the
>> > > > > > job with a single slot). Can ask for more slots. Once new slots
>> > > arrive
>> > > > it
>> > > > > > will make use of it.
>> > > > > > b) Standalone + batch job that uses batch adaptive scheduler +
>> any
>> > of
>> > > > 1.,
>> > > > > > 2. or 3.: Can run with fewer slots than requested (because it
>> can
>> > > > > complete
>> > > > > > the job with a single slot). Once new slots arrive it will make
>> use
>> > > of
>> > > > it
>> > > > > > (up to the desired maximum parallelism).
>> > > > > >
>> > > > > > If we decide to go with option 1. or 2., then we will only be
>> able
>> > to
>> > > > run
>> > > > > > mixed workloads (mixture of bounded and unbounded sources) in
>> > > streaming
>> > > > > > mode. This might be ok for the time being.
>> > > > > >
>> > > > > > This actually leads to my main concern, which is to give our
>> users
>> > > > > > consistent and somewhat easy to understand deployment options.
>> In
>> > > order
>> > > > > to
>> > > > > > achieve this Flink should always be able to make progress unless
>> > the
>> > > > > > parallelism is explicitly configured (e.g. a very high
>> parallelism
>> > > in a
>> > > > > > pipelined region that cannot be fulfilled). Moreover, Flink
>> should
>> > be
>> > > > > able
>> > > > > > to make use of new resources if the job isn't being run at the
>> > > maximum
>> > > > > > parallelism already. Removing slots so that the minimum number
>> of
>> > > > > required
>> > > > > > slots is still available should also be possible. Maybe one idea
>> > > could
>> > > > be
>> > > > > > to make the adaptive batch scheduler the default for batch jobs
>> > > > > eventually.
>> > > > > > For streaming jobs, we would ideally always use the
>> > AdaptiveScheduler
>> > > > > > to give a consistent behaviour.
>> > > > > >
>> > > > > > As a side note: Creating as many subpartitions as the maximum
>> > > > parallelism
>> > > > > > is will result in a one-to-one mapping between sub partitions
>> and
>> > key
>> > > > > > groups. If we then also make the non keyed operators work on a
>> set
>> > of
>> > > > sub
>> > > > > > partitions that store the operator state, then the sub
>> partitions
>> > > could
>> > > > > be
>> > > > > > seen as some logical work unit/split that is assigned to
>> operators.
>> > > > > Having
>> > > > > > such an abstraction could allow us to track which work unit has
>> > > > completed
>> > > > > > which helps with rescaling of operators and maintaining order
>> > > > guarantees,
>> > > > > > for example.
>> > > > > >
>> > > > > > I also left some smaller comments in the wiki.
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Till
>> > > > > >
>> > > > > > On Wed, Oct 20, 2021 at 8:52 AM Lijie Wang <
>> > wangdachui9...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > >
>> > > > > > > Zhu Zhu and I propose to introduce a new job scheduler to
>> Flink:
>> > > > > adaptive
>> > > > > > > batch job scheduler. The new scheduler can automatically
>> decide
>> > > > > > > parallelisms of job vertices for batch jobs, according to the
>> > size
>> > > of
>> > > > > > data
>> > > > > > > volume each vertex needs to process.
>> > > > > > >
>> > > > > > > Major benefits of this scheduler includes:
>> > > > > > >
>> > > > > > >    1. Batch job users can be relieved from parallelism tuning
>> > > > > > >    2. Automatically tuned parallelisms can be vertex level and
>> > can
>> > > > > better
>> > > > > > >    fit consumed datasets which have a varying volume size
>> every
>> > day
>> > > > > > >
>> > > > > > >
>> > > > > > >    1. Vertices from SQL batch jobs can be assigned with
>> different
>> > > > > > >    parallelisms which are automatically tuned
>> > > > > > >    2. It can be the first step towards enabling
>> auto-rebalancing
>> > > > > > workloads
>> > > > > > >    of tasks
>> > > > > > >
>> > > > > > > You can find more details in the FLIP-187[1]. Looking forward
>> to
>> > > your
>> > > > > > > feedback.
>> > > > > > >
>> > > > > > > [1]
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>> > > > > > >
>> > > > > > > Best,
>> > > > > > >
>> > > > > > > Lijie
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to