[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721632#comment-17721632
 ] 

Zhilong Hong edited comment on FLINK-32045 at 5/11/23 5:18 AM:
---------------------------------------------------------------

Thank you for proposing these optimizations, Weihua!

??1. Add a configuration to enable the distribution of shuffle descriptors via 
the blob server according to the parallelism.??

For the threshold to enable the distribution of shuffle descriptors via the 
blob server, I was originally thinking about adding a new configuration called 
something like "blob.deployment.offload.minsize" (I forgot the original name). 
This configuration was eventually dropped because we didn't want to introduce a 
new configuration that requires users to have advanced knowledge before 
configuring it.

However, I think enabling the distribution of shuffle descriptors via the blob 
server based on the parallelism is a better solution for this situation. It's 
easier to understand and easier to configure, and we can also set a large 
default value for this configuration. What do you think [~zhuzh]?
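
To make the idea concrete, here is a minimal sketch (not actual Flink code; the 
class name, default threshold, and the producer/consumer-parallelism heuristic 
are all hypothetical) of deciding the offload based on the number of edges 
rather than the serialized byte size:

{code:java}
/**
 * Hypothetical sketch: decide whether to distribute serialized
 * ShuffleDescriptors via the blob server based on the number of edges
 * (channels) instead of the serialized size.
 */
public final class ShuffleDescriptorOffloadDecider {

    /** Hypothetical large default so that only truly large jobs offload. */
    public static final long DEFAULT_EDGE_THRESHOLD = 100_000L;

    private final long edgeThreshold;

    public ShuffleDescriptorOffloadDecider(long edgeThreshold) {
        this.edgeThreshold = edgeThreshold;
    }

    /**
     * For an all-to-all edge the channel count grows with
     * producerParallelism * consumerParallelism, which is what dominates
     * the size of the serialized descriptors.
     */
    public boolean shouldOffloadToBlobServer(int producerParallelism, int consumerParallelism) {
        long edgeCount = (long) producerParallelism * (long) consumerParallelism;
        return edgeCount >= edgeThreshold;
    }
}
{code}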

??2. Introduce a cache for shuffle descriptors in the TaskManager??

We thought about introducing a cache for shuffle descriptors in the TaskManager 
earlier. Users usually don't set a large value for the configuration 
"taskmanager.numberOfTaskSlots", which means there are only a few slots in a 
TaskManager (for example, 8). So there won't be a lot of deserialization work 
on the TaskManager side, and I'm wondering how much performance a cache for 
shuffle descriptors in the TaskManager would actually gain.

Also, another question arises for the cache: how do we update it? Currently, 
the cache in the JobManager is cleared in two scenarios: (1) a 
ConsumedPartitionGroup is released, and (2) the producer of an 
IntermediateResult encounters a failover. To clear the caches in the 
TaskManager at the same time, we may need to introduce a few complicated RPC 
calls between the JobManager and the TaskManager. In my opinion, that's a bit 
complicated.
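
Just to illustrate what such invalidation might look like (this interface is 
purely hypothetical and not part of the actual TaskExecutor gateway), the 
JobManager would need something along these lines:

{code:java}
import java.util.Collection;

/**
 * Hypothetical RPC surface a TaskManager-side ShuffleDescriptor cache would
 * need, mirroring the two scenarios in which the JobManager clears its own
 * cache today.
 */
public interface ShuffleDescriptorCacheGateway {

    /** Called when a ConsumedPartitionGroup is released. */
    void releaseCachedShuffleDescriptors(Collection<String> partitionGroupIds);

    /** Called when the producer of an IntermediateResult encounters a failover. */
    void invalidateCachedShuffleDescriptors(String intermediateResultId);
}
{code}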

The third concern is about session mode. If users submit a lot of jobs to a 
session cluster in rapid succession, the cache would fill up the heap memory in 
a short time and have an unexpected impact on users' tasks. We could use an LRU 
or FIFO cache for this situation. However, it's not easy for us to decide the 
size of the cache, because we don't know how large the TaskManager heap would 
be.
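
For reference, the bounded LRU cache itself is simple (plain-Java sketch, not 
Flink code); the hard part is exactly the one above, i.e. choosing maxEntries 
without knowing the TaskManager's heap size:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal LRU cache sketch; picking maxEntries sensibly is the open question. */
public class LruShuffleDescriptorCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    public LruShuffleDescriptorCache(int maxEntries) {
        // accessOrder = true makes the map reorder entries on access,
        // which turns it into an LRU structure.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Evict the least recently used entry once the bound is exceeded.
        return size() > maxEntries;
    }
}
{code}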

In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager 
may require more discussion. Please correct me if I missed anything or got 
anything wrong. Thank you.



> optimize task deployment performance for large-scale jobs
> ---------------------------------------------------------
>
>                 Key: FLINK-32045
>                 URL: https://issues.apache.org/jira/browse/FLINK-32045
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Weihua Hu
>            Priority: Major
>
> h1. Background
> In FLINK-21110, we cache shuffle descriptors on the job manager side and 
> support using the blob server to offload these descriptors in order to reduce 
> the cost of task deployment.
> I think there are also some improvements we could make for large-scale jobs.
>  # The default min size to enable distribution via the blob server is 1 MB. But 
> for a large WordCount job with a parallelism of 20000, the size of the 
> serialized shuffle descriptors is only 300 KB. It means users need to lower 
> "blob.offload.minsize", but the value is hard for users to decide.
>  # The task executor side still needs to load blob files and deserialize the 
> shuffle descriptors for each task. Since these operations run in the 
> main thread, they may block other RPCs from the job manager.
> h1. Propose
>  # Enable distributing shuffle descriptors via the blob server automatically. This 
> could be decided by the number of edges of the current shuffle descriptors: the 
> blob offload will be enabled when the edge count exceeds an internal 
> threshold.
>  # Introduce a cache of deserialized shuffle descriptors on the task executor 
> side. This could reduce the cost of reading local blob files and of 
> deserialization. Of course, the cache should have a TTL to avoid occupying too 
> much memory, and the cache should have the same switch mechanism as the blob 
> server offload (a rough sketch follows after this description).
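
A rough sketch of the TTL-bounded cache proposed in item 2 above (Guava's 
CacheBuilder is used here only for illustration; the key/value types, TTL, and 
size bound are hypothetical and not an actual Flink design):

{code:java}
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of a task-executor-side cache of deserialized
 * ShuffleDescriptors, bounded by both a TTL and a maximum size.
 */
public class DeserializedShuffleDescriptorCache {

    private final Cache<String, Object> cache =
            CacheBuilder.newBuilder()
                    // TTL: drop entries that have not been accessed for a while.
                    .expireAfterAccess(5, TimeUnit.MINUTES)
                    // Safety net against unbounded growth in session clusters.
                    .maximumSize(1_000)
                    .build();

    /** Returns the cached descriptors, or null so the caller falls back to the blob file. */
    public Object getIfPresent(String blobKey) {
        return cache.getIfPresent(blobKey);
    }

    /** Caches the result of reading and deserializing a blob file once per TaskManager. */
    public void put(String blobKey, Object deserializedDescriptors) {
        cache.put(blobKey, deserializedDescriptors);
    }
}
{code}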


