Thanks Chesnay&Till for proposing this improvement.
It's of good value to allow jobs to make best use of available
resources adaptively. Not
to mention it further supports reactive mode.
So big +1 for it.
I have a minor concern about possible regression in certain cases
due to the proposed
JobVertex-wise scheduling which replaces current
ExecutionVertex-wise scheduling.
In the proposal, looks to me it requires a stage to finish before
its consumer stage can be
scheduled. This limitation, however, does not exist in current
scheduler. In the case that there
exists a POINTWISE BLOCKING edge, the downstream execution region
can be scheduled
right after its connected upstream execution vertices finishes,
even before the whole upstream
stage finishes. This allows the region to be launched earlier and
make use of available resources.
Do we need to let the new scheduler retain this property?
Thanks,
Zhu
Xintong Song <tonysong...@gmail.com
<mailto:tonysong...@gmail.com>> 于2020年8月26日周三 下午6:59写道:
Thanks for the quick response.
*Job prioritization, Allocation IDs, Minimum resource
requirements, SlotManager Implementation Plan:* Sounds good
to me.
*FLIP-56*
Good point about the trade-off. I believe maximum resource
utilization and
quick deployment are desired in different scenarios. E.g., a
long running
streaming job deserves some deployment latency to improve the
resource
utilization, which benefits the entire lifecycle of the job.
On the other
hand, short batch queries may prefer quick deployment,
otherwise the time
for resource allocation might significantly increase the
response time.
It would be good enough for me to bring these questions to
attention.
Nothing that I'm aware of should block this FLIP.
Thank you~
Xintong Song
On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>> wrote:
> Thank you Xintong for your questions!
> Job prioritization
> Yes, the job which declares it's initial requirements first
is prioritized.
> This is very much for simplicity; for example this avoids
the nasty case
> where all jobs get some resources, but none get enough to
actually run the
> job.
> Minimum resource requirements
>
> My bad; at some point we want to allow the JobMaster to
declare a range of
> resources it could use to run a job, for example min=1,
target=10,
> max=+inf.
>
> With this model, the RM would then try to balance the
resources such that
> as many jobs as possible are as close to the target state
as possible.
>
> Currently, the minimum/target/maximum resources are all the
same. So the
> notification is sent whenever the current requirements
cannot be met.
> Allocation IDs
> We do intend to, at the very least, remove AllocationIDs on the
> SlotManager side, as they are just not required there.
>
> On the slotpool side we have to keep them around at least
until the
> existing Slotpool implementations are removed (not sure
whether we'll fully
> commit to this in 1.12), since the interfaces use
AllocationIDs, which also
> bleed into the JobMaster.
> The TaskExecutor is in a similar position.
> But in the long-term, yes they will be removed, and most
usages will
> probably be replaced by the SlotID.
> FLIP-56
>
> Dynamic slot allocations are indeed quite interesting and
raise a few
> questions; for example, the main purpose of it is to ensure
maximum
> resource utilization. In that case, should the JobMaster be
allowed to
> re-use a slot it if the task requires less resources than
the slot
> provides, or should it always request a new slot that
exactly matches?
>
> There is a trade-off to be made between maximum resource
utilization
> (request exactly matching slots, and only re-use exact
matches) and quicker
> job deployment (re-use slot even if they don't exactly
match, skip
> round-trip to RM).
>
> As for how to handle the lack of a preemptively known
SlotIDs, that should
> be fine in and of itself; we already handle a similar case
when we request
> a new TaskExecutor to be started. So long as there is some
way to know how
> many resources the TaskExecutor has in total I do not see a
problem at the
> moment. We will get the SlotID eventually by virtue of the
heartbeat
> SlotReport.
> Implementation plan (SlotManager)
> You are on the right track. The SlotManager tracks the
declared resource
> requirements, and if the requirements increased it creates
a SlotRequest,
> which then goes through similar code paths as we have at
the moment (try to
> find a free slot, if found tell the TM, otherwise try to
request new TM).
> The SlotManager changes are not that substantial to get a
working version;
> we have a PoC and most of the work went into refactoring
the SlotManager
> into a more manageable state. (split into several
components, stricter and
> simplified Slot life-cycle, ...).
> Offer/free slots between JM/TM
> Gotta run, but that's a good question and I'll think about.
But I think it
> comes down to making less changes, and being able to
leverage existing
> reconciliation protocols.
> Do note that TaskExecutor also explicitly inform the RM
about freed slots;
> the heartbeat slot report is just a safety net.
> I'm not sure whether slot requests are able to overtake a
slot release;
> @till do you have thoughts on that?
> As for the race condition between the requirements
reduction and slot
> release, if we run into problems we have the backup plan of
only releasing
> the slot after the requirement reduction has been acknowledged.
>
> On 26/08/2020 10:31, Xintong Song wrote:
>
> Thanks for preparing the FLIP and driving this discussion,
@Chesnay & @Till.
>
> I really like the idea. I see a great value in the proposed
declarative
> resource management, in terms of flexibility, usability and
efficiency.
>
> I have a few comments and questions regarding the FLIP
design. In general,
> the protocol design makes good sense to me. My main concern
is that it is
> not very clear to me what changes are required from the
> Resource/SlotManager side to adapt to the new protocol.
>
> *1. Distributed slots across different jobs*
>
> Jobs which register their requirements first, will have
precedence over
>
> other jobs also if the requirements change during the runtime.
>
> Just trying to understand, does this mean jobs are
prioritized by the order
> of their first resource declaring?
>
> *2. AllocationID*
>
> Is this FLIP suggesting to completely remove AllocationID?
>
> I'm fine with this change. It seems where AllocationID is
used can either
> be removed or be replaced by JobID. This reflects the
concept that slots
> are now assigned to a job instead of its individual slot
requests.
>
> I would like to bring to attention that this also requires
changes on the
> TM side, with respect to FLIP-56[1].
>
> In the context of dynamic slot allocation introduced by
FLIP-56, slots do
> not pre-exist on TM and are dynamically created when RM calls
> TaskExecutorGateway.requestSlot. Since the slots do not
pre-exist, nor
> their SlotIDs, RM requests slots from TM with a special
SlotID (negative
> slot index). The semantic changes from "requesting the slot
identified by
> the given SlotID" to "requesting a slot with the given
resource profile".
> The AllocationID is used for identifying the dynamic slots
in such cases.
>
> >From the perspective of FLIP-56 and fine grained resource
management, I'm
> fine with removing AllocationID. In the meantime, we would
need TM to
> recognize the special negative indexed SlotID and generate
a new unique
> SlotID for identifying the slot.
>
> *3. Minimum resource requirement*
>
> However, we can let the JobMaster know if we cannot fulfill
the minimum
>
> resource requirement for a job after
> resourcemanager.standalone.start-up-time has passed.
>
> What is the "minimum resource requirement for a job"? Did I
overlook
> anything?
>
> *4. Offer/free slots between JM/TM*
>
> This probably deserves a separate discussion thread. Just
want to bring it
> up.
>
> The idea has been coming to me for quite some time. Is this
design, that JM
> requests resources from RM while accepting/releasing
resources from/to TM,
> the right thing?
>
> The pain point is that events of JM's activities
(requesting/releasing
> resources) arrive at RM out of order. This leads to several
problems.
>
> - When a job fails and task cancelation takes long, some
of the slots
> might be released from the slot pool due to being unused
for a while. Then
> the job restarts and requests these slots again. At this
time, RM may
> receive slot requests before noticing from TM heartbeats
that previous
> slots are released, thus requesting new resources. I've
seen many times
> that the Yarn cluster has a heavy load and is not
allocating resources
> quickly enough, which leads to slot request timeout and
job failover, and
> during the failover more resources are requested which
adds more load to
> the Yarn cluster. Happily, this should be improved with
the declarative
> resource management. :)
> - As described in this FLIP, it is possible that RM
learns the releasing
> of slots from TM heartbeat before noticing the resource
requirement
> decreasing, it may allocate more resources which need to
be released soon.
> - It complicates the ResourceManager/SlotManager, by
requiring an
> additional slot state PENDING, which means the slot is
assigned by RM but
> is not confirmed successfully ordered by TM.
>
> Why not just make RM offer the allocated resources (TM
address, SlotID,
> etc.) to JM, and JM release resources to RM? So that for
all the resource
> management JM talks to RM, and for the task deployment and
execution it
> talks to TM?
>
> I tried to understand the benefits for having the current
design, and found
> the following in FLIP-6[2].
>
>
> All that the ResourceManager does is negotiate between the
> cluster-manager, the JobManager, and the TaskManagers. Its
state can hence
> be reconstructed from re-acquiring containers and
re-registration from
> JobManagers and TaskManagers
>
> Correct me if I'm wrong, it seems the original purpose is
to make sure the
> assignment between jobs and slots are confirmed between JM
and TMs, so that
> failures of RM will not lead to any inconsistency. However,
this only
> benefits scenarios where RM fails while JM and TMs live.
Currently, JM and
> RM are in the same process. We do not really have any
scenario where RM
> fails alone. We might separate JM and RM to different
processes in future,
> but as far as I can see we don't have such requirements at
the moment. It
> seems to me that we are suffering the current problems,
complying to
> potential future benefits.
>
> Maybe I overlooked something.
>
> *5. Implementation Plan*
>
> For SlotPool, it sounds quite straightforward to "aggregate
individual slot
> requests".
>
> For Resource/SlotManager, it seems there are quite a lot
changes needed,
> with the removal of individual slot requests and
AllocationID. It's not
> clear to me what is the first step plan for RM/SM? Do we
internally treat
> the resource requirements as individual slot requests as
the first step, so
> only the interfaces are changed? Or do we actually change
(practically
> re-write) the slot allocation logics?
>
> Thank you~
>
> Xintong Song
>
>
>
[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>
[2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler
<ches...@apache.org <mailto:ches...@apache.org>>
<ches...@apache.org <mailto:ches...@apache.org>> wrote:
>
>
> Hello,
>
> in FLIP-138 we want to rework the way the JobMaster
acquires slots, such
> that required resources are declared before a job is
scheduled and th
> job execution is adjusted according to the provided
resources (e.g.,
> reducing parallelism), instead of asking for a fixed number
of resources
> during scheduling and failing midway through if not enough
resources are
> available.
>
> This is a stepping stone towards reactive mode, where Flink
will
> automatically make use of new TaskExecutors being started.
>
> More details can be found here
>
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
>
> .
>
>
>