[ https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17228731#comment-17228731 ]

Till Rohrmann commented on FLINK-20038:
---------------------------------------

Thanks for starting this discussion. I think you are right that Flink does not
make it easy to add new result partition types with a different set of
properties. What you are describing would be something like a
{{PIPELINED_PERSISTED}} or {{PIPELINED_REPLAYABLE}} result partition, with
"pipelined" meaning that it can be consumed while it is being produced. I think
the term "pipelined" has evolved a bit, and some people understand it to mean
that producers and consumers have to run at the same time.

Looking at the {{ResultPartition}} properties, it seems that some of them are
outdated and no longer really used (e.g. {{hasBackPressure}}).

I think this is not only about tracking the result partitions but also about
enabling the scheduler to schedule consumers once the result partitions are
consumable (or at least once they are deployed). At the moment, the scheduler
differentiates between blocking and pipelined partitions. For the former, the
producer needs to complete before the consumer is scheduled. For the latter,
the consumer is scheduled at the same time as the producer because both are
part of the same pipelined region.
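
The scheduling rule described above can be sketched as a small predicate. This
is a simplified, hypothetical model: ConsumerSchedulingSketch, PartitionKind,
ProducerState and canScheduleConsumer are illustrative names, not Flink's
scheduler API, and real region scheduling involves much more (resource
allocation, failover handling).

```java
/**
 * Hypothetical sketch (not Flink code): consumers of a blocking partition wait
 * for the producer to finish, while consumers of a pipelined partition are
 * scheduled together with the producer's pipelined region.
 */
public final class ConsumerSchedulingSketch {

    /** Simplified partition kinds; not Flink's ResultPartitionType. */
    enum PartitionKind { BLOCKING, PIPELINED }

    /** Simplified producer lifecycle; not Flink's ExecutionState. */
    enum ProducerState { CREATED, RUNNING, FINISHED }

    static boolean canScheduleConsumer(PartitionKind kind, ProducerState producer) {
        switch (kind) {
            case BLOCKING:
                // The producer must complete before its consumer is scheduled.
                return producer == ProducerState.FINISHED;
            case PIPELINED:
                // Producer and consumer are in the same pipelined region and are
                // scheduled together, so the producer's state does not gate the consumer.
                return true;
            default:
                throw new IllegalArgumentException("Unknown partition kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(canScheduleConsumer(PartitionKind.BLOCKING, ProducerState.RUNNING));  // false
        System.out.println(canScheduleConsumer(PartitionKind.PIPELINED, ProducerState.CREATED)); // true
    }
}
```

The shuffle described in this issue fits neither branch cleanly: its consumers
may start before the producer finishes, yet its data outlives consumption.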

Before making any changes in this area, I think we would need a proper design
for what else we want to support and how it could be mapped to properties.
Moreover, we need to properly define the behaviour of the scheduling layer and
the result partition lifecycle management.

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20038
>                 URL: https://issues.apache.org/jira/browse/FLINK-20038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Runtime / Network
>            Reporter: Jin Xing
>            Priority: Major
>
> After "FLIP-31: Pluggable Shuffle Service", users can extend Flink with new
> shuffle implementations to benefit different scenarios. New shuffle
> implementations tend to bring in new capabilities that the scheduling layer
> could leverage to provide better performance.
> From my understanding, the characteristics of a shuffle implementation are
> exposed by ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure,
> ...) and leveraged by the scheduling layer to run the job. However, Flink does
> not seem to provide a way to describe the new characteristics of a plugged-in
> shuffle implementation. I also found that the scheduling layer makes some weak
> assumptions about ResultPartitionType, as detailed in the example below.
> In our internal Flink, we developed a new shuffle implementation for batch
> jobs. Its characteristics can be briefly summarized as follows:
> 1. The upstream task writes shuffle data to DISK;
> 2. The upstream task commits data while producing it and notifies the
> downstream that the data is "consumable" BEFORE the task finishes;
> 3. The downstream is notified when upstream data is consumable and can be
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted,
> because the upstream data is written to disk and is replayable;
> We can characterize this new shuffle implementation as:
> a. isPipelined=true – downstream tasks can consume data before the upstream
> task finishes;
> b. hasBackPressure=false – the upstream task writes shuffle data to disk and
> can finish on its own, regardless of whether downstream tasks consume the data
> in time.
> However, the above new ResultPartitionType (isPipelined=true,
> hasBackPressure=false) seems to contradict the partition lifecycle management
> in the current scheduling layer:
> 1. The above new shuffle implementation needs the partition tracker for
> lifecycle management, but current Flink assumes that ALL "isPipelined=true"
> result partitions are released on consumption and are therefore not handled
> by the partition tracker
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
>  – this assumption does not hold for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether
> data can be consumed while it is being produced, which is orthogonal to
> whether the partition is released on consumption. I propose fixing this so
> that the original meaning of ResultPartitionType#isPipelined() is fully
> respected.
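
The proposal can be illustrated by modeling partition properties as independent
flags. This is a minimal, hypothetical sketch: PartitionProperties,
releasedOnConsumption, needsTrackingCurrent and needsTrackingProposed are
illustrative names, not Flink API, and needsTrackingCurrent is only a
simplified reading of the behaviour described in the issue, not the actual code
of JobMasterPartitionTrackerImpl.

```java
/**
 * Hypothetical sketch (not Flink code): "consumable while produced" and
 * "released on consumption" are modeled as separate, orthogonal flags.
 */
public final class PartitionTrackingSketch {

    /** Hypothetical partition properties; not Flink's ResultPartitionType. */
    static final class PartitionProperties {
        final boolean pipelined;             // data can be consumed while it is being produced
        final boolean backPressure;          // a slow consumer can block the producer
        final boolean releasedOnConsumption; // partition is freed once it has been consumed

        PartitionProperties(boolean pipelined, boolean backPressure, boolean releasedOnConsumption) {
            this.pipelined = pipelined;
            this.backPressure = backPressure;
            this.releasedOnConsumption = releasedOnConsumption;
        }
    }

    /** Simplified reading of the rule criticized in the issue: pipelined implies untracked. */
    static boolean needsTrackingCurrent(PartitionProperties p) {
        return !p.pipelined;
    }

    /** Proposed rule: track every partition that is not released on consumption. */
    static boolean needsTrackingProposed(PartitionProperties p) {
        return !p.releasedOnConsumption;
    }

    public static void main(String[] args) {
        // The disk-based shuffle from the issue: pipelined, no back pressure, replayable.
        PartitionProperties diskShuffle = new PartitionProperties(true, false, false);
        System.out.println(needsTrackingCurrent(diskShuffle));  // false: tracker ignores it
        System.out.println(needsTrackingProposed(diskShuffle)); // true: tracker manages it
    }
}
```

Keeping the flags separate lets such a shuffle report isPipelined=true while its
partitions are still managed by the partition tracker.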



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
