Maybe :)

Imagine a case where the producer and consumer have the same ResourceProfile, or at least one where the consumer's requirements are lower than the producer's. In this case, the scheduler can happily schedule consumers, because it knows it will get enough slots.
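
To make that condition concrete, here is a minimal sketch. The Profile class and the fits() helper are hypothetical names invented for this illustration; they are not Flink's actual ResourceProfile API, and the two resource dimensions are an assumption.

```java
public class ProfileFit {
    /** Hypothetical, simplified stand-in for a resource profile (not Flink's class). */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    /** True if a slot sized for 'offered' can host a task that needs 'required'. */
    static boolean fits(Profile required, Profile offered) {
        return required.cpuCores <= offered.cpuCores
                && required.memoryMb <= offered.memoryMb;
    }

    public static void main(String[] args) {
        Profile producer = new Profile(1.0, 1024);
        Profile smallConsumer = new Profile(0.5, 512);
        Profile bigConsumer = new Profile(2.0, 4096);

        // A consumer that needs no more than the producer can take over its slot.
        System.out.println(fits(smallConsumer, producer)); // true
        // A larger consumer cannot, so the scheduler has no such guarantee.
        System.out.println(fits(bigConsumer, producer));   // false
    }
}
```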

If the profiles are different, then the scheduler _may_ wait for numberOf(producer) slots; it _may_ also stick with the parallelism and schedule right away, in the worst case running the consumers in sequence. In fact, for batch jobs there is probably(?) never a reason for the scheduler to _reduce_ the parallelism; it can always try to run things in sequence if it doesn't get enough slots. Reducing the parallelism would just mean that you'd have to wait for more producers to finish.
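
As a rough sketch of the "run in sequence" idea: with a fixed parallelism and fewer slots, the consumers run in successive batches. The class and method names below are hypothetical, and the model assumes every subtask occupies exactly one slot, which is a simplification.

```java
public class SequentialWaves {
    /**
     * Number of sequential batches ("waves") needed to run 'parallelism'
     * subtasks when only 'slots' slots are available. Assumes one subtask
     * per slot; this is an illustrative model, not scheduler code.
     */
    static int waves(int parallelism, int slots) {
        if (parallelism < 0 || slots <= 0) {
            throw new IllegalArgumentException("parallelism must be >= 0 and slots > 0");
        }
        return (parallelism + slots - 1) / slots; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(waves(10, 10)); // 1: everything runs at once
        System.out.println(waves(10, 4));  // 3: batches of 4, 4 and 2
        System.out.println(waves(10, 1));  // 10: worst case, fully sequential
    }
}
```

The parallelism stays at 10 in all three cases; only the number of batches changes with the number of slots.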

The scope of this FLIP is just the protocol, without changes to the scheduler; in other words, it only changes how slots are acquired and changes nothing about the scheduling. That will be tackled in a follow-up FLIP.

On 28/08/2020 07:34, Zhu Zhu wrote:
Thanks for the response!

>> The scheduler doesn't have to wait for one stage to finish
Does it mean we will declare resources and decide the parallelism for a stage which is partially schedulable, i.e. when input data are ready just for part of the execution vertices?

>> This will get more complicated once we allow the scheduler to change the parallelism while the job is running
Agreed. Looks to me like it's a problem for batch jobs only and can be avoided for streaming jobs. Will this FLIP limit its scope to streaming jobs, with improvements for batch jobs to be done later?

Thanks,
Zhu

On Fri, Aug 28, 2020 at 2:27 AM Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    The scheduler doesn't have to wait for one stage to finish. It is
    still aware that the upstream execution vertex has finished, and
    can request/use slots accordingly to schedule the consumer.

    This will get more complicated once we allow the scheduler to
    change the parallelism while the job is running, for which we will
    need some enhancements to the network stack to allow the producer
    to run without knowing the consumer parallelism ahead of time. I'm
    not too clear on the details, but we'll need some form of keygroup-like
    approach for sub partitions (maxParallelism and all that).


    On 27/08/2020 20:05, Zhu Zhu wrote:
    Thanks Chesnay & Till for proposing this improvement.
    It's of good value to allow jobs to make best use of available
    resources adaptively, not to mention that it further supports
    reactive mode.
    So big +1 for it.

    I have a minor concern about possible regression in certain cases
    due to the proposed JobVertex-wise scheduling, which replaces the
    current ExecutionVertex-wise scheduling.
    In the proposal, it looks to me like a stage is required to finish
    before its consumer stage can be scheduled. This limitation,
    however, does not exist in the current scheduler. In the case that
    there exists a POINTWISE BLOCKING edge, the downstream execution
    region can be scheduled right after its connected upstream
    execution vertices finish, even before the whole upstream stage
    finishes. This allows the region to be launched earlier and make
    use of available resources.
    Do we need to let the new scheduler retain this property?

    Thanks,
    Zhu

    On Wed, Aug 26, 2020 at 6:59 PM Xintong Song <tonysong...@gmail.com
    <mailto:tonysong...@gmail.com>> wrote:

        Thanks for the quick response.

        *Job prioritization, Allocation IDs, Minimum resource
        requirements, SlotManager Implementation Plan:* Sounds good to me.

        *FLIP-56*
        Good point about the trade-off. I believe maximum resource
        utilization and quick deployment are desired in different
        scenarios. E.g., a long running streaming job deserves some
        deployment latency to improve the resource utilization, which
        benefits the entire lifecycle of the job. On the other hand, short
        batch queries may prefer quick deployment, otherwise the time for
        resource allocation might significantly increase the response time.
        It would be good enough for me to bring these questions to
        attention. Nothing that I'm aware of should block this FLIP.

        Thank you~

        Xintong Song



        On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler
        <ches...@apache.org <mailto:ches...@apache.org>> wrote:

        > Thank you Xintong for your questions!
        > Job prioritization
        > Yes, the job which declares its initial requirements first is
        > prioritized. This is very much for simplicity; for example, this
        > avoids the nasty case where all jobs get some resources, but none
        > get enough to actually run the job.
        > Minimum resource requirements
        >
        > My bad; at some point we want to allow the JobMaster to declare a
        > range of resources it could use to run a job, for example min=1,
        > target=10, max=+inf.
        >
        > With this model, the RM would then try to balance the resources
        > such that as many jobs as possible are as close to the target
        > state as possible.
        >
        > Currently, the minimum/target/maximum resources are all the same.
        > So the notification is sent whenever the current requirements
        > cannot be met.
        > Allocation IDs
        > We do intend to, at the very least, remove AllocationIDs on the
        > SlotManager side, as they are just not required there.
        >
        > On the SlotPool side we have to keep them around at least until
        > the existing SlotPool implementations are removed (not sure
        > whether we'll fully commit to this in 1.12), since the interfaces
        > use AllocationIDs, which also bleed into the JobMaster.
        > The TaskExecutor is in a similar position.
        > But in the long-term, yes they will be removed, and most usages
        > will probably be replaced by the SlotID.
        > FLIP-56
        >
        > Dynamic slot allocations are indeed quite interesting and raise a
        > few questions; for example, the main purpose of it is to ensure
        > maximum resource utilization. In that case, should the JobMaster
        > be allowed to re-use a slot if the task requires fewer resources
        > than the slot provides, or should it always request a new slot
        > that exactly matches?
        >
        > There is a trade-off to be made between maximum resource
        > utilization (request exactly matching slots, and only re-use exact
        > matches) and quicker job deployment (re-use slots even if they
        > don't exactly match, skip the round-trip to the RM).
        >
        > As for how to handle the lack of preemptively known SlotIDs, that
        > should be fine in and of itself; we already handle a similar case
        > when we request a new TaskExecutor to be started. So long as there
        > is some way to know how many resources the TaskExecutor has in
        > total I do not see a problem at the moment. We will get the SlotID
        > eventually by virtue of the heartbeat SlotReport.
        > Implementation plan (SlotManager)
        > You are on the right track. The SlotManager tracks the declared
        > resource requirements, and if the requirements increase it
        > creates a SlotRequest, which then goes through similar code paths
        > as we have at the moment (try to find a free slot; if found, tell
        > the TM; otherwise try to request a new TM).
        > The SlotManager changes are not that substantial to get a working
        > version; we have a PoC and most of the work went into refactoring
        > the SlotManager into a more manageable state (split into several
        > components, stricter and simplified Slot life-cycle, ...).
        > Offer/free slots between JM/TM
        > Gotta run, but that's a good question and I'll think about it.
        > But I think it comes down to making fewer changes, and being able
        > to leverage existing reconciliation protocols.
        > Do note that TaskExecutors also explicitly inform the RM about
        > freed slots; the heartbeat slot report is just a safety net.
        > I'm not sure whether slot requests are able to overtake a slot
        > release; @till do you have thoughts on that?
        > As for the race condition between the requirements reduction and
        > slot release, if we run into problems we have the backup plan of
        > only releasing the slot after the requirement reduction has been
        > acknowledged.
        >
        > On 26/08/2020 10:31, Xintong Song wrote:
        >
        > Thanks for preparing the FLIP and driving this discussion,
        > @Chesnay & @Till.
        >
        > I really like the idea. I see great value in the proposed
        > declarative resource management, in terms of flexibility,
        > usability and efficiency.
        >
        > I have a few comments and questions regarding the FLIP design. In
        > general, the protocol design makes good sense to me. My main
        > concern is that it is not very clear to me what changes are
        > required from the Resource/SlotManager side to adapt to the new
        > protocol.
        >
        > *1. Distributed slots across different jobs*
        >
        > > Jobs which register their requirements first, will have
        > > precedence over other jobs also if the requirements change
        > > during the runtime.
        >
        > Just trying to understand, does this mean jobs are prioritized by
        > the order of their first resource declaration?
        >
        > *2. AllocationID*
        >
        > Is this FLIP suggesting to completely remove AllocationID?
        >
        > I'm fine with this change. It seems that wherever AllocationID is
        > used, it can either be removed or be replaced by JobID. This
        > reflects the concept that slots are now assigned to a job instead
        > of its individual slot requests.
        >
        > I would like to bring to attention that this also requires changes
        > on the TM side, with respect to FLIP-56[1].
        >
        > In the context of dynamic slot allocation introduced by FLIP-56,
        > slots do not pre-exist on the TM and are dynamically created when
        > the RM calls TaskExecutorGateway.requestSlot. Since the slots do
        > not pre-exist, nor do their SlotIDs, the RM requests slots from
        > the TM with a special SlotID (negative slot index). The semantics
        > change from "requesting the slot identified by the given SlotID"
        > to "requesting a slot with the given resource profile". The
        > AllocationID is used for identifying the dynamic slots in such
        > cases.
        >
        > From the perspective of FLIP-56 and fine grained resource
        > management, I'm fine with removing AllocationID. In the meantime,
        > we would need the TM to recognize the special negative indexed
        > SlotID and generate a new unique SlotID for identifying the slot.
        >
        > *3. Minimum resource requirement*
        >
        > > However, we can let the JobMaster know if we cannot fulfill the
        > > minimum resource requirement for a job after
        > > resourcemanager.standalone.start-up-time has passed.
        >
        > What is the "minimum resource requirement for a job"? Did I
        > overlook anything?
        >
        > *4. Offer/free slots between JM/TM*
        >
        > This probably deserves a separate discussion thread. Just want to
        > bring it up.
        >
        > The idea has been coming to me for quite some time. Is this
        > design, that JM requests resources from RM while
        > accepting/releasing resources from/to TM, the right thing?
        >
        > The pain point is that events of JM's activities
        > (requesting/releasing resources) arrive at RM out of order. This
        > leads to several problems.
        >
        >    - When a job fails and task cancelation takes long, some of the
        >    slots might be released from the slot pool due to being unused
        >    for a while. Then the job restarts and requests these slots
        >    again. At this time, RM may receive slot requests before
        >    noticing from TM heartbeats that previous slots are released,
        >    thus requesting new resources. I've seen many times that the
        >    Yarn cluster has a heavy load and is not allocating resources
        >    quickly enough, which leads to slot request timeout and job
        >    failover, and during the failover more resources are requested
        >    which adds more load to the Yarn cluster. Happily, this should
        >    be improved with the declarative resource management. :)
        >    - As described in this FLIP, it is possible that RM learns of
        >    the release of slots from the TM heartbeat before noticing the
        >    decrease in resource requirements; it may then allocate more
        >    resources which need to be released soon.
        >    - It complicates the ResourceManager/SlotManager, by requiring
        >    an additional slot state PENDING, which means the slot is
        >    assigned by RM but is not yet confirmed as successfully ordered
        >    by TM.
        >
        > Why not just make RM offer the allocated resources (TM address,
        > SlotID, etc.) to JM, and JM release resources to RM? So that for
        > all the resource management JM talks to RM, and for the task
        > deployment and execution it talks to TM?
        >
        > I tried to understand the benefits of having the current design,
        > and found the following in FLIP-6[2].
        >
        > > All that the ResourceManager does is negotiate between the
        > > cluster-manager, the JobManager, and the TaskManagers. Its state
        > > can hence be reconstructed from re-acquiring containers and
        > > re-registration from JobManagers and TaskManagers
        >
        > Correct me if I'm wrong, it seems the original purpose is to make
        > sure the assignment between jobs and slots is confirmed between JM
        > and TMs, so that failures of RM will not lead to any
        > inconsistency. However, this only benefits scenarios where RM
        > fails while JM and TMs live. Currently, JM and RM are in the same
        > process. We do not really have any scenario where RM fails alone.
        > We might separate JM and RM to different processes in future, but
        > as far as I can see we don't have such requirements at the moment.
        > It seems to me that we are suffering from the current problems for
        > the sake of potential future benefits.
        >
        > Maybe I overlooked something.
        >
        > *5. Implementation Plan*
        >
        > For SlotPool, it sounds quite straightforward to "aggregate
        > individual slot requests".
        >
        > For Resource/SlotManager, it seems there are quite a lot of
        > changes needed, with the removal of individual slot requests and
        > AllocationID. It's not clear to me what the first step plan is for
        > RM/SM. Do we internally treat the resource requirements as
        > individual slot requests as the first step, so only the interfaces
        > are changed? Or do we actually change (practically re-write) the
        > slot allocation logic?
        >
        > Thank you~
        >
        > Xintong Song
        >
        >
        >
        > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
        > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
        >
        > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler
        > <ches...@apache.org <mailto:ches...@apache.org>> wrote:
        >
        >
        > Hello,
        >
        > in FLIP-138 we want to rework the way the JobMaster acquires
        > slots, such that required resources are declared before a job is
        > scheduled and the job execution is adjusted according to the
        > provided resources (e.g., reducing parallelism), instead of asking
        > for a fixed number of resources during scheduling and failing
        > midway through if not enough resources are available.
        >
        > This is a stepping stone towards reactive mode, where Flink will
        > automatically make use of new TaskExecutors being started.
        >
        > More details can be found here
        > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.
        >
        >
        >


