[https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228962#comment-17228962]
Yuan Mei commented on FLINK-20038:
----------------------------------
Redirect from FLINK-19693:
Hey, [[email protected]], these are great points! I had similar
feelings/considerations when I introduced the new ResultPartitionType
PIPELINED_APPROXIMATE and its corresponding `reconnectable` attribute in
FLINK-19632.
Some thoughts here:
Each shuffle mode may require a subtly different
# scheduling strategy
# failover strategy
# lifecycle management
# runtime implementation
To some extent, the four items above are correlated with one another.
For example, PIPELINED_APPROXIMATE is *pipelined* in the sense that downstream
tasks can start consuming data before upstream tasks finish. However, it is
*blocking* in the sense that its result partitions are re-connectable (though
not re-consumable, so strictly speaking it is not fully blocking either).
PIPELINED_APPROXIMATE's runtime implementation also differs from the pipelined
one in that it has to handle partial records, as addressed in FLINK-19547. It
needs a dedicated failover strategy that restarts only the failed tasks, and
whether the existing scheduling strategy can be reused is still open
(FLINK-20048). This is what I mean by *"correlated with one another"*.
Hence, I do not think those four items can be treated as completely
independent of each other. Yet today it seems quite difficult to extend some
of them (if not all) so that 1-2-3-4 evolve together as a whole; this is one
of the most valuable lessons I learned from implementing approximate local
recovery.
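To make that mix of traits concrete, here is a minimal sketch of how one
partition type can combine a pipelined consumption trait with a blocking-style
reconnectable trait. All names below are illustrative assumptions, not Flink's
actual ResultPartitionType API:

```java
// Hypothetical sketch (NOT Flink's real enum): modeling a shuffle mode
// as a bundle of independent attributes instead of a single pipelined/blocking
// dichotomy.
public class PartitionTypeSketch {
    enum PartitionType {
        PIPELINED(true, false, false),
        BLOCKING(false, true, true),
        // pipelined consumption, but reconnectable like a blocking partition,
        // yet not re-consumable -- the in-between case discussed above
        PIPELINED_APPROXIMATE(true, true, false);

        final boolean consumableWhileProducing; // the "pipelined" trait
        final boolean reconnectable;            // blocking-like trait
        final boolean reconsumable;             // full blocking trait

        PartitionType(boolean consumableWhileProducing,
                      boolean reconnectable,
                      boolean reconsumable) {
            this.consumableWhileProducing = consumableWhileProducing;
            this.reconnectable = reconnectable;
            this.reconsumable = reconsumable;
        }
    }

    public static void main(String[] args) {
        PartitionType t = PartitionType.PIPELINED_APPROXIMATE;
        // pipelined AND reconnectable, but not re-consumable
        System.out.println(
            t.consumableWhileProducing && t.reconnectable && !t.reconsumable);
    }
}
```

With independent attributes, a scheduler can ask the specific question it
cares about (e.g. "is this reconnectable?") instead of inferring it from a
coarse pipelined/blocking label.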
So, my question is:
Do we have plans to expose more interfaces to ease such extension? Here are
some immature thoughts, which would probably also be useful if we later want
to support storing channel data in DSTL:
# a user-defined/configurable ResultPartitionType with configurable attributes
# lifecycle management for each ResultPartitionType, registerable with the
JobMaster
# a user-defined scheduling strategy based on ResultPartitionType
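As a rough illustration of the second thought, a hypothetical registry could
map each partition type to a lifecycle handler that the JobMaster consults.
Every name here is invented for illustration; none of this is Flink API:

```java
// Hypothetical sketch: per-partition-type lifecycle handlers registered with
// the JobMaster. Illustrative only, not a real Flink extension point.
import java.util.HashMap;
import java.util.Map;

public class LifecycleRegistrySketch {
    interface PartitionLifecycle {
        // whether the JobMaster must track and explicitly release partitions
        // of this type (as opposed to release-on-consumption)
        boolean trackedByJobMaster();
    }

    static final Map<String, PartitionLifecycle> REGISTRY = new HashMap<>();

    public static void main(String[] args) {
        // a disk-backed, consumable-while-producing partition still needs
        // explicit tracking so its files can be cleaned up later
        REGISTRY.put("DISK_PIPELINED", () -> true);
        // a classic in-memory pipelined partition is released on consumption
        REGISTRY.put("PIPELINED", () -> false);

        System.out.println(
            REGISTRY.get("DISK_PIPELINED").trackedByJobMaster());
    }
}
```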
> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Network
> Reporter: Jin Xing
> Priority: Major
>
> After "FLIP-31: Pluggable Shuffle Service", users can extend and plug in new
> shuffle modes to benefit different scenarios. A new shuffle mode tends to
> bring new abilities that the scheduling layer could leverage for better
> performance.
> From my understanding, the characteristics of a shuffle mode are exposed via
> ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure, ...) and
> leveraged by the scheduling layer to conduct the job. But it seems that
> Flink doesn't provide a way to describe the new characteristics of a
> plugged-in shuffle mode. I also find that the scheduling layer has some weak
> assumptions about ResultPartitionType. I illustrate with the example below.
> In our internal Flink, we developed a new shuffle mode for batch jobs. Its
> characteristics can be briefly summarized as:
> 1. The upstream task shuffle-writes its data to DISK;
> 2. The upstream task commits data while producing and notifies downstream
> that the data is "consumable" BEFORE the task finishes;
> 3. Downstream tasks are notified once upstream data is consumable and can be
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted,
> because the upstream data is written to disk and is replayable.
> We can characterize this new shuffle mode as:
> a. isPipelined=true – a downstream task can consume data before the upstream
> task finishes;
> b. hasBackPressure=false – the upstream task shuffle-writes its data to disk
> and can finish on its own, regardless of whether a downstream task consumes
> the data in time.
> But such a new ResultPartitionType(isPipelined=true, hasBackPressure=false)
> seems to contradict the partition lifecycle management in the current
> scheduling layer:
> 1. The new shuffle mode needs the partition tracker for lifecycle
> management, but current Flink assumes that ALL "isPipelined=true" result
> partitions are released on consumption and thus are not taken care of by the
> partition tracker
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
> – that assumption does not hold in this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether
> data can be consumed while it is being produced, which is orthogonal to
> whether the partition is released on consumption. I propose to fix this and
> fully respect the original meaning of ResultPartitionType#isPipelined().
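A hedged sketch of the decoupling proposed above: the tracker would gate on an
explicit released-on-consumption attribute instead of isPipelined(). Field and
method names are illustrative, not the actual Flink code:

```java
// Sketch of the proposed fix: separate "consumable while producing" from
// "released on consumption" so disk-backed pipelined partitions get tracked.
// Names are illustrative assumptions, not the real Flink implementation.
public class TrackerCheckSketch {
    static final class PartitionType {
        final boolean isPipelined;            // consumable while producing
        final boolean releasedOnConsumption;  // lifecycle, orthogonal to above

        PartitionType(boolean isPipelined, boolean releasedOnConsumption) {
            this.isPipelined = isPipelined;
            this.releasedOnConsumption = releasedOnConsumption;
        }
    }

    // before: tracked only if not pipelined -- wrong for a disk-backed,
    // consumable-while-producing shuffle mode
    static boolean trackedBefore(PartitionType t) {
        return !t.isPipelined;
    }

    // after: tracked whenever the partition is NOT released on consumption
    static boolean trackedAfter(PartitionType t) {
        return !t.releasedOnConsumption;
    }

    public static void main(String[] args) {
        // the new shuffle mode: pipelined consumption, disk-backed, replayable
        PartitionType diskPipelined = new PartitionType(true, false);
        System.out.println(trackedBefore(diskPipelined)); // wrongly untracked
        System.out.println(trackedAfter(diskPipelined));  // correctly tracked
    }
}
```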
--
This message was sent by Atlassian Jira
(v8.3.4#803005)