Yes, since we're only operating within the scheduler, which exists separately for each job, we don't have to worry about collisions with other jobs.

On 1/27/2021 11:08 AM, Yangze Guo wrote:
Thanks for preparing the FLIP and driving the discussion, Till. All of
my questions have already been answered in the previous discussion.

I just have one minor reminder regarding using ExecutionVertexID as
the identificator. The JobVertexID is generated according to the
topology instead of generated randomly. Thus, it's not guaranteed to
be unique across different jobs and different execution. This
characteristic is also inherited by the ExecutionVertexID. UUIC, the
ParallelismAndResourceAssignments is a job-level concept so it will
work well.

Best,
Yangze Guo

On Wed, Jan 27, 2021 at 10:37 AM Xintong Song <tonysong...@gmail.com> wrote:
Thanks for the explanations, Till.
Keeping the initial design as simple as possible sounds good to me.
There's no further concern from my side.

Thank you~

Xintong Song



On Tue, Jan 26, 2021 at 9:56 PM Zhu Zhu <reed...@gmail.com> wrote:

Thanks Till for the explanation and the follow up actions.
That sounds good to me.

Thanks,
Zhu

Till Rohrmann <trohrm...@apache.org> 于2021年1月26日周二 下午7:28写道:

Thanks a lot for all your feedback. Let me try to answer it.

# ScaleUpController vs. RescaleController

At the moment, the idea of the declarative scheduler is to run a job with
a parallelism which is as close to the target value as possible but to
never exceed it. Since the target value is currently fixed (this will
change once auto-scaling is supported), we never have to scale down. In
fact, scale down events only happen if the system loses slots and then
you
cannot choose whether to scale down or not. This and the idea to keep the
initial design as simple as possible motivated the naming of
ScaleUpController. Once we support auto-scaling, we might have to rename
this interface. However, since I don't fully know how things will look
like
with auto-scaling, I would like to refrain from this change now.

# Relationship between declarative scheduler and existing implementations

The declarative scheduler will implement the SchedulerNG interface. At
the
moment I am not aware of any required changes to this interface. Also the
way the JobMaster interacts with the scheduler won't change to the best
of
my knowledge.

# Awareness of SlotAllocator of the ExecutionGraph

The idea is to make the SlotAllocator not aware of the ExecutionGraph
because it can only be created after the SlotAllocator has decided on the
parallelism the job can be run with. Moreover, the idea is that the
JobInformation instance contains all the required information of a job to
make this decision.

The SlotAllocator also reserves the slots for a job before the
ExecutionGraph is created. Consequently, we need a way to associate the
slots with the Executions of the ExecutionGraph. Here we have decided to
use the ExecutionVertexID as the identificator. We could also introduce a
new identificator as long as we can map this one to the
ExecutionVertexID.
We could for example use JobVertexID and subtask id as the identificator
but this is exactly what the ExecutionVertexID is. That's why we decided
to
reuse it for the time being.

# ScaleUpController

## Cumulative parallelism

Yes, the cumulative parallelism is the sum of all tasks.

## When to call the ScaleUpController

Yes, the idea is that the scheduler will check whether one can run the
job
with an increased parallelism if there are new slots available. If this
is
the case, then we will ask the ScaleUpController, whether we actually
should scale up (e.g. whether the increase is significant enough or
whether
enough time has passed between the last scale up operation).

## Do we provide enough information to the ScaleUpController

I am pretty sure that we don't provide the ScaleUpController enough
information to make the best possible decision. We wanted to keep it
simple
in the first version and iterate on the interface with the help of user
feedback. I think that this interface won't fundamentally change the
scheduler and, hence, it shouldn't block future extensions by starting
with
a simple interface.

# Cluster vs. job configuration

I think you are right that it would be most flexible if one could select
a
scheduler on a per job basis with falling back to the cluster
configuration
if nothing has been specified. For the sake of simplicity and narrowing
down the scope I would consider this as a follow up.

# Performance on failover

I agree that the performance won't be optimal in the failover case
because
of 1) having to recreate the EG and 2) redundant IO operations. For 1) I
agree that FLINK-21110 will help a lot. For 2) moving the IO operations
out
of the EG could be a good improvement. With the same argument as above, I
would consider this a follow up because I would like to keep the initial
design as simple as possible and I think that these performance
optimizations are not required to make this feature work.

# Lost execution history

You are right Zhu Zhu that recreating the ExecutionGraph causes the loss
of information. We are currently investigating which information is
strictly required and needs to be maintained across ExecutionGraph
creations (mainly for tests to succeed). One idea is to either store this
information outside of the ExecutionGraph or to initialize the
ExecutionGraph with the information from the previous instance. Most
likely, we won't give a guarantee that all counters, timestamps and
metrics
are correctly maintained across failovers in the first version, though.
It
is a bit the same argument as above that this is not strictly required to
make this feature work. Maybe this means that this feature is not
production ready for some users, but I think this is ok.

In general, to fully integrate the declarative scheduler with the web ui
we have to be able to display changing ExecutionGraphs. Ideally, we would
have something like a timeline where one can select the time for which to
display the state of the job execution. If one goes all the way to the
right, the system shows the live state and all other positions are the
present time minus some offset.

I will add the follow ups to the FLIP as potential improvements in order
to keep track of them.

Cheers,
Till

On Tue, Jan 26, 2021 at 9:53 AM Zhu Zhu <reed...@gmail.com> wrote:

Thanks for the proposal! @Till Rohrmann <trohrm...@apache.org>

The design looks generally good to me. I have 2 concerns though:

# performance on failover
I can see is that an ExecutionGraph will be built and initialized on
each
task failure. The process is possibly to be slow:
1. the topology building can be very slow (much slower than the
scheduling
or restarting process) when the job scale becomes large (see
FLINK-20612).
Luckily FLINK-21110 will improve it.
2. the input/output format can be slow due to possible IO work to
external services.
Maybe we should extract it out from ExecutionGraph building?

# execution history lost
After building and using a new ExecutionGraph, the execution history in
the
old graph will be lost, including status timestamps of the job, prior
execution
attempts and their failures. Should we let the new EG inherit these
information
from prior EG? Maybe as a future plan with further discussions regarding
the
varying parallelism.

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> 于2021年1月24日周日 上午11:55写道:

Thanks for preparing the FLIP and starting the discussion, Till.

I have a few questions trying to understand the design.

## What is the relationship between the current and new schedulers?
IIUC, the new declarative scheduler will coexist with the current
scheduler, as an alternative that the user needs to explicitly switch
to.
Then does it require any changes to the scheduler interfaces and how
the
JobMaster interacts with it?

## Is `SlotAllocator` aware of `ExecutionGraph`?
Looking at the interfaces, it seems to me that `SlotAllocator` only
takes
`JobInformation` and `VertexInformation` as topology inputs. However,
it
generates `ParallelismAndResourceAssignments` which maps slots to
`ExecutionVertexID`s. It's a bit confusing that `ExecutionGraph` is
generated outside `SlotAllocator` while `ExecutionVertexID`s are
generated
inside `SlotAllocator`.

## About `ScaleUpController#canScaleUp`

### What is cumulative parallelism?
The interface shows that the cumulative parallelism is a single number
per
job graph. I assume this is the sum of parallelism of all vertices?

### Is this interface expected to be called before or after
`SlotAllocator#determineParallelism`?
IIUC, on new resources appear, the scheduler always calls
`SlotAllocator#determineParallelism` to generate a new plan based on
the
available resources, and `ScaleUpController#canScaleUp` is then called
to
decide whether to apply the new plan, according to whether the
increasement
is significant, how long the job has been running since last restart,
etc.
Is that correct?

### The cumulative parallelism may not be enough for deciding whether
the
job should be scaled up.
I'm assuming my understanding for the above questions are correct.
Please
correct me if not.

I noticed Chesnay's comment on the wiki page about making the decision
based on when the job entered the execution state last time.

In addition to that, there could also be situations where jobs may not
benefit much an increment in cumulative parallelism.
E.g., for a topology A -> B, where A and B are in different slot
sharing
groups, and the current parallelism for both A and B are 2. When 1 new
slot
appears, `SlotAllocator` may suggest increasing parallelism of A to 3.
But
this may not be a significant beneficial change for the job because the
overall performance is still bottlenecked by the parallelism of B.

## Cluster vs. Job configuration
I'm not entirely sure about specifying which scheduler to be used via a
cluster level configuration option. Each job in a Flink cluster has its
own
JobMaster, and which scheduler to use is internal to that JobMaster. I
understand that the declarative scheduler requires the cluster to use
declarative resource management. However, other jobs should still be
allowed to use other scheduler implementations that also support
declarative resource management (e.g. the current `DefaultScheduler`).
Maybe we should consider the cluster level configuration option as a
default scheduler, and allow the job to specify a different scheduler
in
its execution config. This is similar to how we specify which state
backend
to be used.

## Minor: It might be better to also show in the state machine figure
that
it can go from `Executing` to `Restarting` when new resources appear.

Thank you~

Xintong Song



On Sat, Jan 23, 2021 at 6:04 AM Steven Wu <stevenz...@gmail.com>
wrote:
Till, thanks a lot for the proposal.

Even if the initial phase is only to support scale-up, maybe the
"ScaleUpController" interface should be called "RescaleController" so
that
in the future scale-down can be added.

On Fri, Jan 22, 2021 at 7:03 AM Till Rohrmann <trohrm...@apache.org>
wrote:

Hi everyone,

I would like to start a discussion about adding a new type of
scheduler
to
Flink. The declarative scheduler will first declare the required
resources
and wait for them before deciding on the actual parallelism of a
job.
Thereby it can better handle situations where resources cannot be
fully
fulfilled. Moreover, it will act as a building block for the
reactive
mode
where Flink should scale to the maximum of the currently available
resources.

Please find more details in the FLIP wiki document [1]. Looking
forward
to
your feedback.

[1] https://cwiki.apache.org/confluence/x/mwtRCg

Cheers,
Till


Reply via email to