[
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-20038:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
> Key: FLINK-20038
> URL: https://issues.apache.org/jira/browse/FLINK-20038
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination, Runtime / Network
> Reporter: Jin Xing
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> After "FLIP-31: Pluggable Shuffle Service", users can extend Flink and plug in
> new shuffle manners to benefit different scenarios. A new shuffle manner tends
> to bring new abilities that the scheduling layer could leverage to provide
> better performance.
> From my understanding, the characteristics of a shuffle manner are exposed by
> ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...) and
> leveraged by the scheduling layer to conduct the job. But it seems that Flink
> doesn't provide a way to describe new characteristics of a plugged-in shuffle
> manner. I also find that the scheduling layer makes some weak assumptions
> about ResultPartitionType, which I detail with the example below.
> In our internal Flink, we developed a new shuffle manner for batch jobs. Its
> characteristics can be summarized briefly as follows:
> 1. The upstream task writes its shuffle data to DISK;
> 2. The upstream task commits data while producing and notifies downstream
> that the data is "consumable" BEFORE the task finishes;
> 3. Downstream is notified when upstream data is consumable and can be
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted,
> because upstream data is written to disk and is replayable.
> We can describe the characteristics of this new shuffle manner as:
> a. isPipelined=true – the downstream task can consume data before the
> upstream task finishes;
> b. hasBackPressure=false – the upstream task writes shuffle data to disk and
> can finish by itself, regardless of whether a downstream task consumes the
> data in time.
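As a minimal illustration of characteristics (a) and (b), the two flags can be modeled as independent properties of a partition type. This is only a sketch: `PartitionTraits` and `DISK_PIPELINED` are hypothetical names invented for this example and do not correspond to Flink's actual `ResultPartitionType` constants or constructor.

```java
// Hypothetical sketch: all names here are illustrative, not Flink's actual API.
final class ShuffleTraitsSketch {

    /** Stand-in for a partition type described by two independent flags. */
    enum PartitionTraits {
        BLOCKING(false, false),
        PIPELINED(true, true),
        // Assumed name for the disk-backed shuffle manner described above.
        DISK_PIPELINED(true, false);

        private final boolean pipelined;
        private final boolean backPressure;

        PartitionTraits(boolean pipelined, boolean backPressure) {
            this.pipelined = pipelined;
            this.backPressure = backPressure;
        }

        /** True if downstream can consume data before upstream finishes. */
        boolean isPipelined() {
            return pipelined;
        }

        /** True if a slow consumer can block the producer. */
        boolean hasBackPressure() {
            return backPressure;
        }
    }

    public static void main(String[] args) {
        for (PartitionTraits t : PartitionTraits.values()) {
            System.out.println(t + ": isPipelined=" + t.isPipelined()
                    + ", hasBackPressure=" + t.hasBackPressure());
        }
    }
}
```

The point of the sketch is that `DISK_PIPELINED` is a combination the other two constants do not cover: pipelined consumption without back pressure.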
> But the above new ResultPartitionType (isPipelined=true, hasBackPressure=false)
> seems to contradict the partition lifecycle management in the current
> scheduling layer:
> 1. The above new shuffle manner needs the partition tracker for lifecycle
> management, but current Flink assumes that ALL "isPipelined=true" result
> partitions are released on consumption and will not be taken care of by
> the partition tracker
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
> – this assumption does not hold for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether
> data can be consumed while it is being produced, which is orthogonal to
> whether the partition is released on consumption. I propose to fix this and
> fully respect the original meaning of ResultPartitionType#isPipelined().
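One way to picture the proposal is to let the tracking decision depend on a lifecycle trait that is separate from isPipelined. The sketch below only illustrates that idea under stated assumptions: `Traits`, `releasedOnConsumption`, and both methods are hypothetical names, and the "current assumption" method paraphrases the behavior described in the issue rather than reproducing Flink's code.

```java
// Hypothetical sketch: names are illustrative and not part of Flink's API.
final class PartitionTrackingSketch {

    /** Assumed descriptor that keeps consumption and lifecycle traits apart. */
    static final class Traits {
        final boolean pipelined;             // consumable while being produced
        final boolean releasedOnConsumption; // freed as soon as it is consumed

        Traits(boolean pipelined, boolean releasedOnConsumption) {
            this.pipelined = pipelined;
            this.releasedOnConsumption = releasedOnConsumption;
        }
    }

    /** Assumption described in the issue: pipelined partitions are never tracked. */
    static boolean trackedUnderCurrentAssumption(Traits t) {
        return !t.pipelined;
    }

    /** Proposed direction: track whenever the partition needs an explicit release. */
    static boolean trackedUnderProposal(Traits t) {
        return !t.releasedOnConsumption;
    }

    public static void main(String[] args) {
        // The disk-backed shuffle manner: pipelined, but not released on consumption.
        Traits diskPipelined = new Traits(true, false);
        System.out.println("current assumption tracks it: "
                + trackedUnderCurrentAssumption(diskPipelined));
        System.out.println("proposal tracks it: "
                + trackedUnderProposal(diskPipelined));
    }
}
```

For the disk-backed shuffle manner, the first check skips tracking while the second keeps the partition under the tracker's lifecycle management, which is the mismatch the issue reports.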
--
This message was sent by Atlassian Jira
(v8.3.4#803005)