[ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227720#comment-17227720
 ] 

Jin Xing commented on FLINK-20038:
----------------------------------

Gentle ping [~zhuzh] and [~azagrebin]: could one of you shepherd this issue? Do
you think the concern I raised in the description is valid?

There are several possible ways to fix or improve this, and I'm not sure which
one is preferred:

1. Make a minor change to this
[snippet|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66]
in the partition tracker, as below:

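{code:java}
// Skip tracking only for the partition types that are known to be released
// on consumption; all other types (including pipelined types coming from a
// plugged-in shuffle) continue to be tracked.
ResultPartitionType rpType =
    resultPartitionDeploymentDescriptor.getPartitionType();
if (rpType == ResultPartitionType.PIPELINED
        || rpType == ResultPartitionType.PIPELINED_BOUNDED) {
    return;
}
{code}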
2. Add a *_releaseOnConsumption_* attribute to ResultPartitionType, with
releaseOnConsumption=true for PIPELINED and PIPELINED_BOUNDED and
*_releaseOnConsumption=false_* for BLOCKING, then check
*_releaseOnConsumption_* in the partition tracker.

3. Currently, the partition tracker tracks a partition only if it is not
released on consumption. Should we consider removing this limitation? I don't
see any critical issue with the partition tracker tracking all kinds of result
partitions.
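The three options can be compared side by side as tracking predicates. This is a minimal sketch under stated assumptions: PartitionKind is a hypothetical stand-in for ResultPartitionType, PIPELINED_DISK is a hypothetical constant modeling the internal shuffle mode from the description (isPipelined=true, hasBackPressure=false), and the releaseOnConsumption field is the attribute proposed in option 2, not an existing Flink field.

```java
/**
 * Sketch comparing the tracking conditions discussed in this comment.
 * PartitionKind is HYPOTHETICAL (not Flink's ResultPartitionType), and
 * PIPELINED_DISK is a HYPOTHETICAL type for the disk-based pipelined shuffle.
 */
public class TrackingConditions {

    enum PartitionKind {
        // isPipelined, hasBackPressure, releaseOnConsumption (option 2's attribute)
        BLOCKING(false, false, false),
        PIPELINED(true, true, true),
        PIPELINED_BOUNDED(true, true, true),
        PIPELINED_DISK(true, false, false); // hypothetical new type

        final boolean isPipelined;
        final boolean hasBackPressure;
        final boolean releaseOnConsumption;

        PartitionKind(boolean isPipelined, boolean hasBackPressure, boolean releaseOnConsumption) {
            this.isPipelined = isPipelined;
            this.hasBackPressure = hasBackPressure;
            this.releaseOnConsumption = releaseOnConsumption;
        }
    }

    // Behavior as described in the issue: every pipelined partition is skipped.
    static boolean trackCurrent(PartitionKind k) {
        return !k.isPipelined;
    }

    // Option 1: skip only the two types known to be released on consumption.
    static boolean trackOption1(PartitionKind k) {
        return k != PartitionKind.PIPELINED && k != PartitionKind.PIPELINED_BOUNDED;
    }

    // Option 2: skip based on the explicit releaseOnConsumption attribute.
    static boolean trackOption2(PartitionKind k) {
        return !k.releaseOnConsumption;
    }

    // Option 3: track every partition regardless of its type.
    static boolean trackOption3(PartitionKind k) {
        return true;
    }

    public static void main(String[] args) {
        for (PartitionKind k : PartitionKind.values()) {
            System.out.printf("%-18s current=%b option1=%b option2=%b option3=%b%n",
                k, trackCurrent(k), trackOption1(k), trackOption2(k), trackOption3(k));
        }
    }
}
```

For the four types modeled here, all three options track PIPELINED_DISK, while the isPipelined()-based check skips it. Option 1 still hard-codes a list of types, whereas option 2 lets each (possibly plugged-in) type declare its own release behavior; option 3 differs from option 2 only in also tracking PIPELINED and PIPELINED_BOUNDED.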


> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20038
>                 URL: https://issues.apache.org/jira/browse/FLINK-20038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Jin Xing
>            Priority: Major
>
> Since "FLIP-31: Pluggable Shuffle Service", users can extend Flink and plug in
> new shuffle modes to benefit different scenarios. A new shuffle mode tends to
> bring in new abilities that the scheduling layer could leverage to provide
> better performance.
> From my understanding, the characteristics of a shuffle mode are exposed by
> ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...) and
> leveraged by the scheduling layer to run the job. However, Flink does not seem
> to provide a way to describe the new characteristics of a plugged-in shuffle
> mode. I also find that the scheduling layer makes some weak assumptions about
> ResultPartitionType, as the example below shows.
> In our internal Flink, we developed a new shuffle mode for batch jobs. Its
> characteristics can be briefly summarized as follows:
> 1. The upstream task writes shuffle data to DISK;
> 2. The upstream task commits data while producing it and notifies downstream
> that the data is "consumable" BEFORE the task finishes;
> 3. The downstream task is notified when upstream data is consumable and can be
> scheduled depending on available resources;
> 4. When a downstream task fails over, only that task needs to be restarted,
> because the upstream data has been written to disk and is replayable.
> The characteristics of this new shuffle mode can be described as:
> a. isPipelined=true: the downstream task can consume data before the upstream
> task finishes;
> b. hasBackPressure=false: the upstream task writes shuffle data to disk and can
> finish on its own, regardless of whether the downstream task consumes the data
> in time.
> However, the above new ResultPartitionType (isPipelined=true,
> hasBackPressure=false) seems to contradict the partition lifecycle management
> in the current scheduling layer:
> 1. The new shuffle mode needs the partition tracker for lifecycle management,
> but current Flink assumes that ALL "isPipelined=true" result partitions are
> released on consumption and are therefore not handled by the partition tracker
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66]).
> This assumption does not hold for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether
> data can be consumed while it is being produced, which is orthogonal to
> whether the partition is released on consumption. I propose fixing this so
> that the partition tracker fully respects the original meaning of
> ResultPartitionType#isPipelined().
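To make the four steps of the internal shuffle mode concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not Flink code: DiskPartition and its methods are hypothetical, and an ArrayList stands in for the on-disk file. It shows data becoming consumable before the producer finishes, a restarted consumer replaying committed data without re-running the producer, and why the data is only freed by an explicit release call, which is the responsibility the partition tracker would take on.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of a disk-based pipelined partition. All names are HYPOTHETICAL;
 * an ArrayList stands in for the data written to disk.
 */
public class DiskPipelinedPartitionModel {

    static class DiskPartition {
        private final List<String> committed = new ArrayList<>(); // "on disk"
        private boolean consumable = false;
        private boolean producerFinished = false;
        private boolean released = false;

        // Steps 1-2: the upstream task writes and commits a record; the first
        // commit makes the partition consumable before the producer finishes.
        void writeAndCommit(String record) {
            committed.add(record);
            consumable = true;
        }

        void finishProducing() {
            producerFinished = true;
        }

        boolean isConsumable() {
            return consumable && !released;
        }

        // Steps 3-4: a reader can start from any offset; reading does not remove
        // data, so a restarted consumer can replay from offset 0.
        List<String> readFrom(int offset) {
            if (released) {
                throw new IllegalStateException("partition already released");
            }
            return new ArrayList<>(committed.subList(offset, committed.size()));
        }

        // Consumption never frees the data, so release must be triggered
        // explicitly, e.g. by a partition tracker.
        void release() {
            released = true;
            committed.clear();
        }
    }

    public static void main(String[] args) {
        DiskPartition p = new DiskPartition();
        p.writeAndCommit("r1");
        p.writeAndCommit("r2");
        System.out.println("consumable before producer finished: " + p.isConsumable());

        // The downstream task consumes while the upstream task is still running.
        List<String> firstAttempt = p.readFrom(0);

        p.writeAndCommit("r3");
        p.finishProducing();

        // The downstream task fails over: only it restarts and replays from the start.
        List<String> replay = p.readFrom(0);
        System.out.println(firstAttempt + " -> " + replay);

        p.release();
        System.out.println("consumable after release: " + p.isConsumable());
    }
}
```

Running main prints `[r1, r2] -> [r1, r2, r3]` for the two consumer attempts. In this model nothing is freed until release() is called, which is why such a partition cannot be treated as released on consumption even though it is pipelined.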



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
