[ https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-20038:
-----------------------------------
    Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20038
>                 URL: https://issues.apache.org/jira/browse/FLINK-20038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Runtime / Network
>            Reporter: Jin Xing
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> After "FLIP-31: Pluggable Shuffle Service", users can extend Flink and plug in
> new shuffle manners to benefit different scenarios. A new shuffle manner tends
> to bring new capabilities that the scheduling layer could leverage to provide
> better performance.
> From my understanding, the characteristics of a shuffle manner are exposed by
> ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...) and
> are leveraged by the scheduling layer to run the job. However, Flink does not
> seem to provide a way to describe the new characteristics of a plugged-in
> shuffle manner. I also find that the scheduling layer makes some weak
> assumptions about ResultPartitionType. I detail this with the example below.
> In our internal Flink, we developed a new shuffle manner for batch jobs. Its
> characteristics can be briefly summarized as follows:
> 1. The upstream task shuffle-writes data to DISK;
> 2. The upstream task commits data while producing it and notifies downstream
> that the data is "consumable" BEFORE the task finishes;
> 3. The downstream task is notified when upstream data is consumable and can be
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted,
> because the upstream data is written to disk and is replayable.
> We can describe this new shuffle manner as:
> a. isPipelined=true: the downstream task can consume data before the upstream
> task finishes;
> b. hasBackPressure=false: the upstream task shuffle-writes data to disk and can
> finish by itself, regardless of whether a downstream task consumes the data in
> time.
> However, the above new ResultPartitionType (isPipelined=true,
> hasBackPressure=false) seems to contradict the partition lifecycle management
> in the current scheduling layer:
> 1. The above new shuffle manner needs the partition tracker for lifecycle
> management, but current Flink assumes that ALL "isPipelined=true" result
> partitions are released on consumption and will not be taken care of by the
> partition tracker
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66]);
> this limitation is incorrect for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether
> data can be consumed while being produced, and it is orthogonal to whether the
> partition is released on consumption. I propose to fix this and fully respect
> the original meaning of ResultPartitionType#isPipelined().

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
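To make the proposal concrete, here is a minimal Java sketch of the idea that "consumable while being produced" and "released on consumption" are two independent properties. It is a hypothetical model, not Flink's actual ResultPartitionType or JobMasterPartitionTrackerImpl: the enum `Kind`, its constants, the extra `isReleasedOnConsumption()` flag and the `Tracker` class are all assumptions made for illustration. `shouldTrackCurrent` is only a simplification of the rule the issue describes (pipelined partitions are never tracked), and `shouldTrackProposed` tracks any partition that is not released on consumption.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical sketch, NOT Flink's ResultPartitionType or JobMasterPartitionTrackerImpl.
 * It only illustrates keeping "pipelined" separate from "released on consumption".
 */
public class PartitionTrackingSketch {

    /** Hypothetical partition characteristics; all names are illustrative assumptions. */
    public enum Kind {
        // Streaming-style: consumable while produced, back-pressured, released once consumed.
        CLASSIC_PIPELINED(true, true, true),
        // Batch-style: consumable only after the producer finishes, kept until explicitly released.
        CLASSIC_BLOCKING(false, false, false),
        // The disk-based shuffle from the issue: consumable early, no back pressure,
        // replayable, so it must outlive consumption until explicitly released.
        DISK_PIPELINED(true, false, false);

        private final boolean pipelined;
        private final boolean backPressure;
        private final boolean releasedOnConsumption;

        Kind(boolean pipelined, boolean backPressure, boolean releasedOnConsumption) {
            this.pipelined = pipelined;
            this.backPressure = backPressure;
            this.releasedOnConsumption = releasedOnConsumption;
        }

        /** Can downstream consume data before the producer finishes? */
        public boolean isPipelined() { return pipelined; }

        /** Does a slow consumer throttle the producer? */
        public boolean hasBackPressure() { return backPressure; }

        /** Assumed extra flag: is the partition freed as soon as it has been consumed? */
        public boolean isReleasedOnConsumption() { return releasedOnConsumption; }
    }

    /** Simplified form of the rule described in the issue: pipelined implies "not tracked". */
    public static boolean shouldTrackCurrent(Kind kind) {
        return !kind.isPipelined();
    }

    /** Proposed rule: track every partition whose lifecycle does not end with consumption. */
    public static boolean shouldTrackProposed(Kind kind) {
        return !kind.isReleasedOnConsumption();
    }

    /** Hypothetical tracker that records which partitions the scheduler must release later. */
    public static final class Tracker {
        private final Set<String> tracked = new LinkedHashSet<>();
        private final boolean useProposedRule;

        public Tracker(boolean useProposedRule) {
            this.useProposedRule = useProposedRule;
        }

        public void startTracking(String partitionId, Kind kind) {
            boolean track = useProposedRule ? shouldTrackProposed(kind) : shouldTrackCurrent(kind);
            if (track) {
                tracked.add(partitionId);
            }
        }

        public Set<String> trackedPartitions() {
            return Collections.unmodifiableSet(tracked);
        }
    }

    public static void main(String[] args) {
        // Print both rules side by side for each kind of partition.
        for (Kind kind : Kind.values()) {
            System.out.printf("%-17s current=%-5b proposed=%b%n",
                    kind, shouldTrackCurrent(kind), shouldTrackProposed(kind));
        }
    }
}
```

In this model only `DISK_PIPELINED` changes behaviour: the simplified current rule skips it because it is pipelined, while the proposed rule tracks it because consumption does not end its lifecycle. Classic pipelined and blocking partitions are treated the same under both rules.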