[ https://issues.apache.org/jira/browse/FLINK-10653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhijiang updated FLINK-10653:
-----------------------------
Description:
The network shuffle behavior for batch jobs in Flink can be improved in two
dimensions:
*1. ShuffleService:* The shuffle service transports the upstream outputs to the
downstream side. Currently the {{TaskExecutor}} acts as the shuffle service for
both streaming and batch jobs.
* In blocking mode, an output becomes consumable only after the upstream task
finishes. The {{TaskExecutor}} cannot exit to release its resources even when no
tasks are running, because it remains responsible for transferring that data via
the shuffle service. This is harmful in resource-dynamic or resource-sensitive
scenarios.
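The lifecycle constraint above can be modeled in a few lines. The following Java sketch is hypothetical: {{ReleaseCheck}}, {{BlockingPartition}} and {{canRelease}} are illustrative names, not Flink classes. It assumes an idle executor may be released only when no blocking partition it produced still has to be served by it, which is the condition an external shuffle service would remove.

```java
import java.util.List;

// Hypothetical model, not a Flink API: may an idle TaskExecutor be released?
class ReleaseCheck {

    // A finished blocking result partition produced on this executor.
    record BlockingPartition(String id, boolean fullyConsumed) {}

    // Returns true if the executor can exit without making outputs unavailable.
    static boolean canRelease(int runningTasks,
                              List<BlockingPartition> producedPartitions,
                              boolean externalShuffleService) {
        if (runningTasks > 0) {
            return false; // still executing tasks
        }
        if (externalShuffleService) {
            return true; // a separate service keeps serving the outputs
        }
        // Built-in shuffle: the executor must stay alive until every
        // blocking partition it produced has been fully consumed.
        return producedPartitions.stream().allMatch(BlockingPartition::fullyConsumed);
    }

    public static void main(String[] args) {
        List<BlockingPartition> outputs = List.of(
                new BlockingPartition("p0", true),
                new BlockingPartition("p1", false));
        System.out.println(canRelease(0, outputs, false)); // false: p1 not yet consumed
        System.out.println(canRelease(0, outputs, true));  // true
    }
}
```

With the built-in shuffle, a single unconsumed partition pins the whole executor; moving the serving role elsewhere makes the release decision depend only on running tasks.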
*2. ResultPartition:* There are currently two types of subpartitions for the
outputs: {{PipelinedSubpartition}} is used for streaming jobs and
{{SpillableSubpartition}} is used for batch jobs in blocking mode.
* {{SpillableSubpartition}} generates a separate persistent file for every
subpartition. This partition-hash layout is not IO-friendly in some scenarios,
e.g. a large number of subpartitions with little data in each, and the number of
files may exceed the inode limit for large-scale jobs.
* The persistent files are deleted as soon as they have been read into the
transport layer. If a downstream task fails during consumption, all the related
upstream tasks have to be restarted to reproduce their outputs.
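To make the partition-hash problem concrete, here is a minimal Java sketch of the sort&merge alternative listed among the proposed changes. It assumes records are tagged with their target subpartition, sorted by subpartition index, and appended to a single data file, with a per-subpartition index of (offset, length). All names are hypothetical, and an in-memory buffer stands in for the files. With N subpartitions the hash layout needs N files, while this layout needs one data file plus one index regardless of N.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not Flink code. A ByteArrayOutputStream stands in for
// the single data file; the Region[] array stands in for the index file.
class SortMergeSketch {

    // A serialized record tagged with its target subpartition
    // (assumed 0 <= subpartition < numSubpartitions).
    record Rec(int subpartition, String payload) {}

    // Byte range of one subpartition inside the shared data file.
    record Region(int offset, int length) {}

    static Region[] writeSorted(List<Rec> records, int numSubpartitions,
                                ByteArrayOutputStream data) {
        List<Rec> sorted = new ArrayList<>(records);
        // List.sort is stable, so per-subpartition record order is preserved.
        sorted.sort(Comparator.comparingInt(Rec::subpartition));

        Region[] index = new Region[numSubpartitions];
        int i = 0;
        for (int sp = 0; sp < numSubpartitions; sp++) {
            int start = data.size();
            // Append all records of subpartition sp contiguously.
            while (i < sorted.size() && sorted.get(i).subpartition() == sp) {
                data.writeBytes(sorted.get(i).payload().getBytes(StandardCharsets.UTF_8));
                i++;
            }
            index[sp] = new Region(start, data.size() - start);
        }
        return index;
    }

    // A consumer of one subpartition reads only its own region.
    static String readSubpartition(byte[] file, Region r) {
        return new String(file, r.offset(), r.length(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
                new Rec(2, "c"), new Rec(0, "a1"), new Rec(1, "b"), new Rec(0, "a2"));
        ByteArrayOutputStream data = new ByteArrayOutputStream();
        Region[] index = writeSorted(records, 3, data);
        byte[] file = data.toByteArray();
        for (int sp = 0; sp < index.length; sp++) {
            System.out.println(sp + " -> " + readSubpartition(file, index[sp]));
        }
        // prints three lines: "0 -> a1a2", "1 -> b", "2 -> c"
    }
}
```

A real implementation would spill sorted buffers to disk and merge them when memory runs out; the sketch keeps everything in memory to show only the resulting file layout.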
We propose three overall changes:
* A mechanism and configuration for an external shuffle service
* Sort-merged persistent output files for a new {{ResultPartitionType}}
* Deletion of persistent output files via notification or TTL
The detailed design doc will be submitted later.
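The deletion change could look roughly like the following Java sketch, assuming a registry on the shuffle-service side that tracks each persistent file together with its registration time. Reading no longer deletes a file, so a restarted downstream task can re-read it instead of forcing the upstream tasks to rerun; files are removed either on an explicit release notification or by a periodic TTL sweep. {{PartitionFileRegistry}} and its methods are hypothetical names, not Flink APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch; class and method names are illustrative, not Flink APIs.
class PartitionFileRegistry {

    private record Entry(Path file, long registeredAtMillis) {}

    private final Map<String, Entry> files = new HashMap<>();
    private final long ttlMillis;

    PartitionFileRegistry(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void register(String partitionId, Path file, long nowMillis) {
        files.put(partitionId, new Entry(file, nowMillis));
    }

    // Reading does not delete the file, so a restarted consumer can read it again.
    byte[] read(String partitionId) throws IOException {
        return Files.readAllBytes(files.get(partitionId).file());
    }

    // Path 1: explicit notification that the partition is no longer needed.
    void release(String partitionId) throws IOException {
        Entry e = files.remove(partitionId);
        if (e != null) {
            Files.deleteIfExists(e.file());
        }
    }

    // Path 2: sweep deleting files whose TTL has expired; returns how many were deleted.
    int expire(long nowMillis) throws IOException {
        int deleted = 0;
        Iterator<Entry> it = files.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (nowMillis - e.registeredAtMillis() >= ttlMillis) {
                Files.deleteIfExists(e.file());
                it.remove();
                deleted++;
            }
        }
        return deleted;
    }

    boolean isRegistered(String partitionId) {
        return files.containsKey(partitionId);
    }

    public static void main(String[] args) throws IOException {
        PartitionFileRegistry registry = new PartitionFileRegistry(60_000);
        Path a = Files.createTempFile("partition-a", ".data");
        Path b = Files.createTempFile("partition-b", ".data");
        Files.writeString(a, "output-a");
        registry.register("a", a, 0);
        registry.register("b", b, 30_000);

        registry.read("a");  // first consumer attempt
        registry.read("a");  // a retry after a consumer failure still works
        registry.release("b");                                  // notification path
        System.out.println(registry.expire(60_000));            // 1: "a" reached its TTL
        System.out.println(Files.exists(a) || Files.exists(b)); // false
    }
}
```

Passing the current time explicitly keeps the TTL logic deterministic and easy to test; a long-running service would call {{expire}} from a scheduled task.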
> Proposal for an external shuffle service for batch jobs
> -------------------------------------------------------
>
> Key: FLINK-10653
> URL: https://issues.apache.org/jira/browse/FLINK-10653
> Project: Flink
> Issue Type: New Feature
> Components: Network
> Affects Versions: 2.0.0
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)