[ 
https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-10653:
-----------------------------
    Description: 
This is the umbrella issue for improving shuffle for batch jobs.

The shuffle behavior in Flink can be improved along two dimensions:

*1. ResultPartition:* There are two types of subpartitions for the sender’s outputs: the {{PipelinedSubpartition}} is used for stream jobs and the {{SpillableSubpartition}} is used for batch jobs in blocking mode.
 * *Single file mode*: The spillable subpartition generates one separate persistent file for each subpartition. This partition-hash mode is not IO-friendly in some scenarios, e.g. a large number of subpartitions with little data in each: a job with 1,000 producers, each writing 1,000 subpartitions, creates 1,000,000 small files. It may even exceed the inode limits for large-scale jobs (see the single-file sketch after this list).
 * *Failover case*: The persistent files are deleted as soon as their data has been read into the transport layer. If a downstream task fails during consumption, all the related upstream tasks have to be restarted to reproduce the outputs.
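
The earlier revision below mentions sort&merge output as a remedy for the single-file-mode problem: all subpartitions of one result partition are written into a single data file, with a small index mapping each subpartition to its region. A minimal, hypothetical sketch of the read side, assuming an in-memory (offset, length) index; none of these class or method names exist in Flink:

{code:java}
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical single-file layout: one merged data file per result
// partition, plus an index with one (offset, length) entry per
// subpartition, instead of one persistent file per subpartition.
public class MergedPartitionReader {

    /** Region of one subpartition inside the merged data file. */
    static final class IndexEntry {
        final long offset;
        final long length;

        IndexEntry(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    private final RandomAccessFile dataFile;
    private final IndexEntry[] index; // one entry per subpartition

    public MergedPartitionReader(String dataPath, IndexEntry[] index) throws IOException {
        this.dataFile = new RandomAccessFile(dataPath, "r");
        this.index = index;
    }

    /** Reads one subpartition's bytes; assumes the region fits in a single array for brevity. */
    public byte[] readSubpartition(int subpartitionIndex) throws IOException {
        IndexEntry entry = index[subpartitionIndex];
        byte[] buffer = new byte[(int) entry.length];
        dataFile.seek(entry.offset);
        dataFile.readFully(buffer);
        return buffer;
    }
}
{code}

With such a layout, a producer writing 1,000 subpartitions creates 2 files (data + index) instead of 1,000.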

*2. ShuffleService:* The shuffle service transports the sender’s data to the receiver side. In Flink, the {{TaskExecutor}} currently takes the role of a unified shuffle service for both stream and batch jobs.
 * *Resource utilization*: In blocking mode, the output becomes consumable only after the sender’s task finishes. The {{TaskExecutor}} cannot exit to release its resources even when no tasks are running, because it still has to serve the produced data via the shuffle service (see the lifecycle sketch after this list). This behavior hurts scenarios where resources are dynamic or scarce.
 * *IO performance*: Many {{TaskExecutor}}s deployed on the same node may issue disk IO for the shuffle service at the same time. Without global coordination, this can degrade IO performance in IO-sensitive scenarios.
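
To make the resource-utilization point concrete, here is a minimal sketch of the lifecycle constraint; the names are illustrative only, not Flink’s actual classes:

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of why a worker that doubles as the shuffle
// service stays pinned after its tasks finish in blocking mode.
public class WorkerLifecycle {

    private final Set<String> runningTasks = new HashSet<>();
    private final Set<String> servedPartitions = new HashSet<>();

    /** A finished blocking task still pins the worker through its output. */
    public void taskFinished(String taskId, String producedPartitionId) {
        runningTasks.remove(taskId);
        // Blocking outputs must stay available for downstream consumers.
        servedPartitions.add(producedPartitionId);
    }

    /** Called once all consumers have read the partition. */
    public void partitionConsumed(String partitionId) {
        servedPartitions.remove(partitionId);
    }

    /** The worker may only exit when it neither runs tasks nor serves data. */
    public boolean canRelease() {
        return runningTasks.isEmpty() && servedPartitions.isEmpty();
    }
}
{code}

Handing {{servedPartitions}} off to a node-level external shuffle service would let {{canRelease()}} depend on the running tasks alone.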

We propose three overall changes:
 * Define a pluggable {{ShuffleManager}} interface (a rough sketch follows this list)
 * A mechanism for an external shuffle service
 * Clean up persistent files via notification or TTL (see the cleanup sketch at the end of this description)
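
As a rough illustration of the first change, one hypothetical shape for the pluggable interface; the real interface is left to the design doc, and every name here is an assumption:

{code:java}
// Placeholder descriptor identifying one produced result partition.
final class PartitionDescriptor {
    final String partitionId;
    final int numberOfSubpartitions;

    PartitionDescriptor(String partitionId, int numberOfSubpartitions) {
        this.partitionId = partitionId;
        this.numberOfSubpartitions = numberOfSubpartitions;
    }
}

// Producer-side handle (placeholder).
interface PartitionWriter extends AutoCloseable {
    void write(int subpartitionIndex, byte[] record) throws Exception;
}

// Consumer-side handle (placeholder).
interface SubpartitionReader extends AutoCloseable {
    byte[] next() throws Exception; // null once the subpartition is exhausted
}

// The pluggable seam: an internal implementation keeps today's
// TaskExecutor-based transport, while an external implementation can
// hand persistent files to a node-level service so the worker may exit.
interface ShuffleManager {

    PartitionWriter createWriter(PartitionDescriptor descriptor) throws Exception;

    SubpartitionReader createReader(PartitionDescriptor descriptor, int subpartitionIndex) throws Exception;

    /** Cleanup hook, driven by consumption notifications or a TTL. */
    void releasePartition(PartitionDescriptor descriptor);
}
{code}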

 

Refer to the design doc for more details.
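
For the third change, a minimal sketch of TTL-driven cleanup of persistent output files; the flat directory layout, class name, and scheduling are assumptions for illustration:

{code:java}
import java.io.File;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical janitor that deletes partition files once their TTL
// expires, as a fallback when no consumption notification arrives.
public class PartitionFileJanitor {

    private final File shuffleDir;
    private final long ttlMillis;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public PartitionFileJanitor(File shuffleDir, Duration ttl) {
        this.shuffleDir = shuffleDir;
        this.ttlMillis = ttl.toMillis();
    }

    /** Periodically sweeps the shuffle directory for expired files. */
    public void start(Duration checkInterval) {
        long periodMillis = checkInterval.toMillis();
        scheduler.scheduleAtFixedRate(this::sweep, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void sweep() {
        File[] files = shuffleDir.listFiles();
        if (files == null) {
            return; // directory vanished or is unreadable
        }
        long now = System.currentTimeMillis();
        for (File file : files) {
            if (now - file.lastModified() > ttlMillis) {
                file.delete(); // expired: no consumer claimed it in time
            }
        }
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
{code}

Notification-based cleanup would delete eagerly on a consumed signal; the TTL only bounds how long orphaned files survive.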

  was:
This is the umbrella issue for improving batch shuffle.

 

The network shuffle behavior for batch jobs in Flink can be improved along two dimensions:

 

*1. ShuffleService:* The shuffle service transports the upstream’s outputs to the downstream side. The {{TaskExecutor}} currently takes the role of a unified shuffle service for both stream and batch jobs.
 * The output is consumable only when the upstream task finishes in blocking mode. The {{TaskExecutor}} cannot exit to release resources even though there are no tasks running, because it is responsible for transferring data via the shuffle service. This has bad effects in scenarios where resources are dynamic or scarce.

 

*2. ResultPartition:* There are currently two types of subpartitions for the outputs. The {{PipelinedSubpartition}} is used for stream jobs and the {{SpillableSubpartition}} is used for batch jobs in blocking mode.
 * The {{SpillableSubpartition}} generates one separate persistent file for every subpartition. This partition-hash mode is not IO-friendly in some scenarios, e.g. a large number of subpartitions with little data in each. It may also exceed the inode limits for large-scale jobs.
 * The persistent files are deleted as soon as their data has been read into the transport layer. If the downstream task fails during consumption, all the related upstream tasks have to be restarted to reproduce the outputs.

 

We propose three overall changes:
 * Mechanism and settings of an external shuffle service
 * Output sort & merge of persistent files for a new {{ResultPartitionType}}
 * Delete persistent output files via notification or TTL

 

The detailed design doc will be submitted later.


> Fine-grained Shuffle System
> ---------------------------
>
>                 Key: FLINK-10653
>                 URL: https://issues.apache.org/jira/browse/FLINK-10653
>             Project: Flink
>          Issue Type: New Feature
>          Components: Network
>    Affects Versions: 2.0.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
