[ 
https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721774#comment-17721774
 ] 

Weihua Hu commented on FLINK-32045:
-----------------------------------

Thanks [~Thesharing] for your reply, your comments are very meaningful and 
valuable. Let me try to answer them one by one.
h3. Distribution of shuffle descriptors via blob server.
IMO, there are two things that should be considered when deciding whether to 
enable distribution of shuffle descriptors via the blob server:
 # The size of the shuffle descriptors. This is related to the parallelism of 
the producers for a single consumer.
 # How many times these shuffle descriptors have to be transported to the 
TaskManagers. This is related to the parallelism of the consumers of this 
producer.

So I think it is better to use the number of edges in the ConsumedPartitionGroup 
to decide whether to enable blob server offload. And I'd like to keep this logic 
internal (with a proper default value, for example 1000*1000, to be decided 
after some benchmarks), since it requires rather advanced knowledge for users to 
figure out how to set it themselves. A rough sketch of this heuristic is shown 
below.
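To make this concrete, here is a minimal sketch of the heuristic. The class, method, and constant names (ShuffleDescriptorOffloadHeuristic, shouldOffloadToBlobServer, DEFAULT_EDGE_THRESHOLD) are illustrative assumptions rather than actual Flink APIs, and the 1000*1000 default would still have to be validated by benchmarks:
{code:java}
// Minimal sketch of the proposed heuristic (names are illustrative, not actual
// Flink APIs): offload the serialized shuffle descriptors of a
// ConsumedPartitionGroup to the blob server when its number of edges
// (producer parallelism * consumer parallelism) exceeds an internal threshold.
public final class ShuffleDescriptorOffloadHeuristic {

    // Hypothetical default (1000 * 1000); the real value should be decided by benchmarks.
    private static final long DEFAULT_EDGE_THRESHOLD = 1_000_000L;

    private ShuffleDescriptorOffloadHeuristic() {
    }

    /**
     * @param numProducers number of partitions in the ConsumedPartitionGroup (producer parallelism)
     * @param numConsumers number of consumer tasks reading this group (consumer parallelism)
     * @return true if the shuffle descriptors should be distributed via the blob server
     */
    public static boolean shouldOffloadToBlobServer(int numProducers, int numConsumers) {
        long numEdges = (long) numProducers * numConsumers;
        return numEdges > DEFAULT_EDGE_THRESHOLD;
    }
}
{code}
For example, the 20000-parallelism WordCount mentioned below has 20000 * 20000 = 400,000,000 edges on its all-to-all edge, so it would be offloaded automatically, while small point-wise edges would keep using the normal akka message path.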
h3. How much performance would a cache for shuffle descriptors in the TaskManager improve?
I ran tests in the following environment: a YARN cluster with 2000 TaskManagers, 
each with 6 cores, 16 GB memory, and "taskmanager.numberOfTaskSlots" set to 10. 
I submitted a simple WordCount job with a parallelism of 20000.
 * Without blob server offload, the job failed with a submitTask RPC timeout. 
All the JobManager's CPU was spent serializing the submitTask RPCs.
 * With blob server offload but no TaskExecutor cache, deploying all tasks took 25s.
 * With blob server offload and the TaskExecutor cache, deploying all tasks took 15s.

h3. How to update the cache?
As you mentioned, it's too complicated to keep the caches in the JobManager and 
the TaskExecutor consistent. So we will add some constraints to the cache (see 
the sketch after this list):
 # The cache will only be enabled when necessary (under the same conditions as 
the distribution of shuffle descriptors via the blob server). In most cases the 
serialized shuffle descriptors are small and are transported in the akka 
message; the cost of deserialization is very small, so they do not need to be 
cached.
 # The cache of a job will be cleared when the task executor disconnects from 
the job master.
 # The cache has a TTL. We should configure a proper default TTL value, for 
example 3 minutes (some batch jobs may deploy lazily).
 # The cache has a max size. As you mentioned, an LRUCache or FIFOCache is 
reasonable. Since the number of slots of a TaskManager won't be too large, the 
cache size won't be too large either.
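
Below is a minimal sketch of what such a bounded, TTL-based cache on the TaskExecutor side could look like, built on an access-ordered LinkedHashMap for LRU eviction. The class name, default values, and methods are illustrative assumptions, not the actual Flink implementation:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of a per-job cache of deserialized shuffle descriptors on the
 * TaskExecutor side. LRU eviction comes from an access-ordered LinkedHashMap,
 * and expired entries are dropped lazily on access. Names and defaults are
 * illustrative, not the actual Flink implementation.
 */
public class ShuffleDescriptorCache<K, V> {

    // Hypothetical defaults: the size bound stays small because it is effectively
    // bounded by the slot count; the TTL follows item 3 above (3 minutes).
    private static final int DEFAULT_MAX_ENTRIES = 100;
    private static final long DEFAULT_TTL_MILLIS = 3 * 60 * 1000L;

    private static final class TimestampedValue<T> {
        final T value;
        final long expireAtMillis;

        TimestampedValue(T value, long expireAtMillis) {
            this.value = value;
            this.expireAtMillis = expireAtMillis;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, TimestampedValue<V>> entries;

    public ShuffleDescriptorCache() {
        this(DEFAULT_MAX_ENTRIES, DEFAULT_TTL_MILLIS);
    }

    public ShuffleDescriptorCache(final int maxEntries, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // 'true' switches the map to access order, so the eldest entry is the
        // least recently used one and removeEldestEntry gives us LRU eviction.
        this.entries =
                new LinkedHashMap<K, TimestampedValue<V>>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<K, TimestampedValue<V>> eldest) {
                        return size() > maxEntries;
                    }
                };
    }

    /** Returns the cached descriptor, or null if it is absent or its TTL has expired. */
    public synchronized V get(K key) {
        TimestampedValue<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (System.currentTimeMillis() > entry.expireAtMillis) {
            entries.remove(key);
            return null;
        }
        return entry.value;
    }

    public synchronized void put(K key, V value) {
        entries.put(key, new TimestampedValue<>(value, System.currentTimeMillis() + ttlMillis));
    }

    /** Called when the task executor disconnects from the job master (item 2 above). */
    public synchronized void clearAll() {
        entries.clear();
    }
}
{code}
The key of get()/put() would presumably be something that identifies the serialized shuffle descriptor group (for example the blob key), there would be one such cache per job, and clearAll() corresponds to the clean-up in item 2 when the task executor disconnects from the job master.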

For session mode (more exactly, OLAP), IMO most of the scenarios are a lot of 
small queries. As mentioned above, they won't use the cache in most cases. And 
the cache will be removed when the job is finished (the task executor 
disconnects from the job master), so the cache won't occupy too much memory, 
and only for a short time.
 
 
Thanks again. Also thanks to [~zhuzh], [~wanglijie], [~Weijie Guo] for the 
previous offline discussions.
I'd be glad to hear any suggestions.

> optimize task deployment performance for large-scale jobs
> ---------------------------------------------------------
>
>                 Key: FLINK-32045
>                 URL: https://issues.apache.org/jira/browse/FLINK-32045
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Weihua Hu
>            Priority: Major
>
> h1. Background
> In FLINK-21110, we cache shuffle descriptors on the job manager side and 
> support using the blob server to offload these descriptors in order to reduce 
> the cost of task deployment.
> I think there are also some improvements we could make for large-scale jobs.
>  # The default min size to enable distribution via the blob server is 1MB. But 
> for a large WordCount job with 20000 parallelism, the size of the serialized 
> shuffle descriptors is only 300KB. It means users need to lower 
> "blob.offload.minsize", but the value is hard for users to decide.
>  # The task executor side still needs to load blob files and deserialize 
> shuffle descriptors for each task. Since these operations run in the 
> main thread, they may block other RPCs from the job manager.
> h1. Proposal
>  # Enable distribution of shuffle descriptors via the blob server 
> automatically. This could be decided by the edge number of the current shuffle 
> descriptor. The blob offload will be enabled when the edge number exceeds an 
> internal threshold.
>  # Introduce a cache of deserialized shuffle descriptors on the task executor 
> side. This could reduce the cost of reading from local blob files and 
> deserialization. Of course, the cache should have a TTL to avoid occupying too 
> much memory. And the cache should have the same switch mechanism as the blob 
> server offload.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
