I have updated the FLIP.

- consistently use "local"/"global" terminology; this incidentally should make it easier to update the terminology if we decide on other names
- inform RM via heartbeats from TE about available global partitions
- add dedicated method for releasing global partitions
- add dedicated section for required changes to the ShuffleMaster (mostly clarification)
- added some items to the "Rejected Alternatives" section
- updated discussion link


While writing the ShuffleMaster section I noticed the following:

If, at any point, the JM/RM are moved into dedicated processes we either
a) have multiple ShuffleMaster instances for the same shuffle service active
b) require a single ShuffleMaster on the RM, to which JM calls are being forwarded.

Neither of these are without pain-points;
a) introduces additional constraints on ShuffleMaster implementations in that no local state must be kept b) again forces the JM to regularly be in touch with the RM, and limits the ShuffleMaster interface to being RPC-friendly.

I'm wondering whether this issue was already an anyone's radar.


On 04/10/2019 14:12, Till Rohrmann wrote:


On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    /Till: In the FLIP you wrote "The set of partitions to release may
    contain local and/or global partitions; the promotion set must
    only refer to local partitions." to describe the
    `releasePartitions`. I think the JM should never be in the
    situation to release a global partition. Moreover, I believe we
    should have a separate RPC to release global result partitions
    which might come from the RM./

    We can certainly add a separate RPC method for explicitly releasing global 
partitions.
    You are correct that the JM should not be able to release those, just like 
the RM should not be able to release non-global partitions.

    /Till: Once the JM has obtained the required slots to run a job,
    it no longer needs to communicate with the RM. Hence, a lost RM
    connection won't interfere with the job. I would like to keep it
    like this by letting the TE announce global result partitions to
    the RM and not to introduce another communication roundtrip. /Agreed, this 
is a nice property to retain.

    /Till: How big do you expect the payload to become? /I don't know, which is 
precisely why I want to be cautious about it.
    The last time I made a similar assumption I didn't expect anyone to have 
hundreds of thousands of metrics on a single TM, which turned out to be wrong.
    I wouldn't exclude the possibility of a similar number of partitions being 
hosted on a single TE.


    One problem we have to solve with the heartbeat-based approach is that 
partitions may be lost without the TE noticing, due to disk-failures or 
external delete operations.
    Currently, for scheduling purposes we rely on information stored in the JM, 
and update said information if a job fails due to a missing partition. However, 
IIRC the JM is informed about with an exception that is thrown by the consumer 
of said partition, not the producer. As far as the producing TM is concerned, 
it is still hosting that partition.
    This means we have to forward errors for missing partitions from the 
network stack on the producers side to the TE, so that it can inform the RM 
about it.


Yes, I think you are right Chesnay. This would also be a good addition for the local result partitions.

Cheers,
Till

    On 02/10/2019 16:21, Till Rohrmann wrote:
    Thanks for addressing our comments Chesnay. See some comments inline.

    On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler<ches...@apache.org>  
<mailto:ches...@apache.org>  wrote:

    Thank you for your comments; I've aggregated them a bit and added
    comments to each of them.

    1) Concept name (proposal: persistent)

    I agree that "global" is rather undescriptive, particularly so since we
    never had a notion of "local" partitions.
    I'm not a fan of "persistent"; as to me this always implies reliable
    long-term storage which as I understand we aren't shooting for here.

    I was thinking of "cached" partitions.

    To Zhijiangs point, we should of course make the naming consistent
    everywhere.

    2) Naming of last parameter of TE#releasePartitions (proposal:
    partitionsToRetain / partitionsToPersistent)

    I can see where you're coming from ("promote" is somewhat abstract), but
    I think both suggestions have downsides.

    "partitionsToPersistent" to me implies an additional write operation to
    somewhere, but we aren't doing that.
    "partitionsToRetain" kind of results in a redundancy with the other
    argument since retaining is the opposite to releasing a partition; if I
    want to retain a partition, why am I not just excluding it from the set
    to release?

    I quite like "promote" personally; we fundamentally change how the
    lifecycle for these partitions work, and introducing new keywords isn't
    a inherently a bad choice.

    3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
    Note: addition of "OrPromote" is dependent on 2) )

    Good point.

    4) /Till: I'm not sure whether partitionsToRelease should contain a//
    //global/persistent result partition id. I always thought that the user
    will//
    //be responsible for managing the lifecycle of a global/persistent//
    //result partition./

    @Till Please elaborate; which method/argument are you referring to?

    In the FLIP you wrote "The set of partitions to release may contain local
    and/or global partitions; the promotion set must only refer to local
    partitions." to describe the `releasePartitions`. I think the JM should
    never be in the situation to release a global partition. Moreover, I
    believe we should have a separate RPC to release global result partitions
    which might come from the RM.

    4)/Dedicated PartitionTable for global partitions/

    Since there is only one RM for each TE a PartitionTable is unnecessary;
    a simple set will suffice.
    Alternatively, we could introduce such a dedicated set into the
    PartitionTable to keep these data-structures close.

    5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
    retain global partitions for successful jobs"/

    Will fix it.

    6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and
    expected to interactive with JM before. Now the RM also needs to
    interactive with ShuffleMaster to release global partitions. Then it
    might be better to move ShuffleMaster outside of JM, and the lifecycle
    of ShuffleMaster should be consistent with RM./

    Yes, I alluded to this in the FLIP but should've been more explicit; the
    shuffle master must outlive the JM. This is somewhat tricky when
    considering the future a bit; if we assume that different jobs or even a
    single one can use different shuffle services, then we need a way to
    associate the partitions with the corresponding shuffle master. This
    will likely require the introduction of a ShuffleMasterID that is
    included in the ShuffleDescriptor.

    7) Handover

    /Till: The handover logic between the JM and the RM for the
    global/persistent//
    //result partitions seems a bit brittle to me. What will happen if the JM//
    //cannot reach the RM? I think it would be better if the TM announces the//
    //global/persistent result partitions to the RM via its heartbeats. That
    way//
    //we don't rely on an established connection between the JM and RM and we//
    //keep the TM as the ground of truth. Moreover, the RM should simply
    forward//
    //the release calls to the TM without much internal logic./

    As for your question, if the JM cannot reach the RM the handover will
    fail, the JM will likely shutdown without promoting any partition and
    the TE will release all partitions.
    What is the defined behavior for the JM in case of the RM disconnect
    after a job has finished? Does it always/sometimes/never shutdown
    with/-out communicating the result to the client / updating HA data;
    or simply put, does the JM behave to the user as if nothing has happened
    in all cases?

    Once the JM has obtained the required slots to run a job, it no longer
    needs to communicate with the RM. Hence, a lost RM connection won't
    interfere with the job. I would like to keep it like this by letting the TE
    announce global result partitions to the RM and not to introduce another
    communication roundtrip.

    A heartbeat-based approach is useful and can alleviate some failure
    cases (see below); but we need to make sure we don't exceed the akka
    framesize or otherwise interfere with the heartbeat mechanism (like we
    did with metrics in the past). Ideally we would only submit updates to
    the partition set (added/removed partitions), but I'm not sure if the
    heartbeats are reliable enough for this to work.

    How big do you expect the payload to become?

    8. Failure cases:
    /Becket:/
    /a) The TEs may remove the result partition while the RM does not//
    //know. In this case, the client will receive a runtime error and submit
    the//
    //full DAG to recompute the missing result partition. In this case, RM
    should//
    //release the incomplete global partition. How would RM be notified to do//
    //that?//
    //b) Is it possible the RM looses global partition metadata while//
    //the TE still host the data? For example, RM deletes the global
    partition//
    //entry while the release partition call to TE failed.//
    //c) What would happen if the JM fails before the global partitions//
    //are registered to RM? Are users exposed to resource leak if JM does not//
    //have HA?//
    //d) What would happen if the RM fails? Will TE release the//
    //partitions by themselves?/

    1.a) This is a good question that I haven't considered. This will likely
    require a heartbeat-like report of available partitions.

    The hearbeat based synchronization approach seems to crystalize as the way
    to go forward with this FLIP.


    1.b) RM should only delete entries if it received an ack from the TE;
    otherwise we could easily end up leaking partitions. I believe I forgot
    writing this down.
    1.c) As described in the FLIP the handoff to the RM must occur before
    partitions are promoted.
          If the JM fails during the handoff then the TE will cleanup all
    partitions since it lost the connection to the JM, and partitions
    weren't promoted yet.
          If the JM fails after the handoff but before the promotion, same as
    above. The RM would contain invalid entries in this case; see 1.a) .
          If the JM fails after the handoff and promotion partitions we don't
    leak anything since the RM is now fully responsible.
    1.d) yes; if the connection to the RM is disrupted the TE will cleanup
    all global partitions, similar to how it cleans up all partitions
    associated with a given job if the connection to the corresponding JM is
    disrupted.

    9. /Becket: It looks that TE should be the source of truth of the result
    partition//
    //existence. Does it have to distinguish between global and local result//
    //partitions? If TE does not need to distinguish them, it seems the the//
    //releasePartition() method in TE could just provide the list of
    partitions//
    //to release, without the partitions to promote./

    The promotion is a hard requirement, as this is the signal to the TE
    that this partition is no longer bound to the life-cycle of a job.
    Without the promotion the TE would delete the partition once the JM has
    shutdown; this is a safety net to ensure cleanup of partitions in case
    of a disconnect.

    10. /In the current design, RM should be able to release result//
    //partitions using ShuffleService. Will RM do this by sending RPC to the
    TEs?//
    //Or will the RM do it by itself?/

    The RM will send a release call to each TM and issue a release call to
    the ShuffleMaster, just like the JobMaster handles partition releases.

    11. /Becket: How do we plan to handle the case when there are different
    shuffle//
    //services in the same Flink cluster? For example, a shared standalone//
    //cluster./

    This case is not considered; there are so many changes necessary in
    other parts of the runtime that we would jump the gun in addressing it
    here.
    Ultimately though, I would think that that the addition of a shuffle
    master instance ID and shuffle service identifier should suffice.

    The identifier is used in subsequent jobs to load the appropriate
    shuffle service for a given partition (think of it like a class name),
    while the shuffle master instance ID is used to differentiate between
    the different shuffle master instances running in the cluster (which
    partitions have to be associated with so we can issue the correct
    release calls).

    12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is
    there a//
    //reason we use `:` instead?/

    That's netty syntax for path parameters.

    On 30/09/2019 08:34, Becket Qin wrote:
    Forgot to say that I agree with Till that it seems a good idea to let TEs
    register the global partitions to the RM instead of letting JM do it.
    This
    simplifies quite a few things.

    Thanks,

    Jiangjie (Becket) Qin

    On Sun, Sep 29, 2019 at 11:25 PM Becket Qin<becket....@gmail.com>  
<mailto:becket....@gmail.com>
    wrote:
    Hi Chesnay,

    Thanks for the proposal. My understanding of the entire workflow step by
    step is following:

         - JM maintains the local and global partition metadata when the task
    runs to create result partitions. The tasks themselves does not
    distinguish
    between local / global partitions. Only the JM knows that.
         - JM releases the local partitions as the job executes. When a job
    finishes successfully, JM registers the global partitions to the RM. The
    global partition IDs are set on the client instead of randomly
    generated,
    so the client can release global partitions using them. (It would be
    good
    to have some human readable string associated with the global result
    partitions).
         - Client issues REST call to list / release global partitions.

    A few thoughts / questions below:
    1. Failure cases:
                * The TEs may remove the result partition while the RM does
    not
    know. In this case, the client will receive a runtime error and submit
    the
    full DAG to recompute the missing result partition. In this case, RM
    should
    release the incomplete global partition. How would RM be notified to do
    that?
                * Is it possible the RM looses global partition metadata
    while
    the TE still host the data? For example, RM deletes the global partition
    entry while the release partition call to TE failed.
                * What would happen if the JM fails before the global
    partitions
    are registered to RM? Are users exposed to resource leak if JM does not
    have HA?
                * What would happen if the RM fails? Will TE release the
    partitions by themselves?

    2. It looks that TE should be the source of truth of the result
    partition
    existence. Does it have to distinguish between global and local result
    partitions? If TE does not need to distinguish them, it seems the the
    releasePartition() method in TE could just provide the list of
    partitions
    to release, without the partitions to promote.

    3. In the current design, RM should be able to release result
    partitions using ShuffleService. Will RM do this by sending RPC to the
    TEs?
    Or will the RM do it by itself?

    4. How do we plan to handle the case when there are different shuffle
    services in the same Flink cluster? For example, a shared standalone
    cluster.

    5. Minor: usually REST API uses `?` to pass the parameters. Is there a
    reason we use `:` instead?

    Thanks,

    Jiangjie (Becket) Qin

    On Tue, Sep 17, 2019 at 3:22 AM zhijiang
    <wangzhijiang...@aliyun.com.invalid>  
<mailto:wangzhijiang...@aliyun.com.invalid>  wrote:

    Thanks Chesnay for this FLIP and sorry for touching it a bit delay on
    my
    side.

    I also have some similar concerns which Till already proposed before.

    1. The consistent terminology in different components. On JM side,
    PartitionTracker#getPersistedBlockingPartitions is defined for getting
    global partitions. And on RM side, we define the method of
    #registerGlobalPartitions correspondingly for handover the partitions
    from
    JM. I think it is better to unify the term in different components for
    for
    better understanding the semantic. Concering whether to use global or
    persistent, I prefer the "global" term personally. Because it
    describes the
    scope of partition clearly, and the "persistent" is more like the
    partition
    storing way or implementation detail. In other words, the global
    partition
    might also be cached in memory of TE, not must persist into files from
    semantic requirements. Whether memory or persistent file is just the
    implementation choice.

    2. On TE side, we might rename the method #releasePartitions to
    #releaseOrPromotePartitions which describes the function precisely and
    keeps consistent with
    PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().

    3. Very agree with Till's suggestions of global PartitionTable on TE
    side
    and sticking to TE's heartbeat report to RM for global partitions.

    4. Considering ShuffleMaster, it was built inside JM and expected to
    interactive with JM before. Now the RM also needs to interactive with
    ShuffleMaster to release global partitions. Then it might be better to
    move
    ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should
    be
    consistent with RM.

    5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
    partitions for successful jobs"

    Best,
    Zhijiang


    ------------------------------------------------------------------
    From:Till Rohrmann<trohrm...@apache.org>  <mailto:trohrm...@apache.org>
    Send Time:2019年9月10日(星期二) 10:10
    To:dev<dev@flink.apache.org>  <mailto:dev@flink.apache.org>
    Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle

    Thanks Chesnay for drafting the FLIP and starting this discussion.

    I have a couple of comments:

    * I know that I've also coined the terms global/local result partition
    but
    maybe it is not the perfect name. Maybe we could rethink the
    terminology
    and call them persistent result partitions?
    * Nit: I would call the last parameter of void releasePartitions(JobID
    jobId, Collection<ResultPartitionID> partitionsToRelease,
    Collection<ResultPartitionID> partitionsToPromote) either
    partitionsToRetain or partitionsToPersistent.
    * I'm not sure whether partitionsToRelease should contain a
    global/persistent result partition id. I always thought that the user
    will
    be responsible for managing the lifecycle of a global/persistent
    result partition.
    * Instead of extending the PartitionTable to be able to store
    global/persistent and local/transient result partitions, I would rather
    introduce a global PartitionTable to store the global/persistent result
    partitions explicitly. I think there is a benefit in making things as
    explicit as possible.
    * The handover logic between the JM and the RM for the
    global/persistent
    result partitions seems a bit brittle to me. What will happen if the JM
    cannot reach the RM? I think it would be better if the TM announces the
    global/persistent result partitions to the RM via its heartbeats. That
    way
    we don't rely on an established connection between the JM and RM and we
    keep the TM as the ground of truth. Moreover, the RM should simply
    forward
    the release calls to the TM without much internal logic.

    Cheers,
    Till

    On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler<ches...@apache.org>  
<mailto:ches...@apache.org>
    wrote:

    Hello,

    FLIP-36 (interactive programming)
    <

    
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
    proposes a new programming paradigm where jobs are built incrementally
    by the user.

    To support this in an efficient manner I propose to extend partition
    life-cycle to support the notion of /global partitions/, which are
    partitions that can exist beyond the life-time of a job.

    These partitions could then be re-used by subsequent jobs in a fairly
    efficient manner, as they don't have to persisted to an external
    storage
    first and consuming tasks could be scheduled to exploit data-locality.

    The FLIP outlines the required changes on the JobMaster, TaskExecutor
    and ResourceManager to support this from a life-cycle perspective.

    This FLIP does /not/ concern itself with the /usage/ of global
    partitions, including client-side APIs, job-submission, scheduling and
    reading said partitions; these are all follow-ups that will either be
    part of FLIP-36 or spliced out into separate FLIPs.




Reply via email to