Weihua Hu created FLINK-32045:
---------------------------------

             Summary: Optimize task deployment performance for large-scale jobs
                 Key: FLINK-32045
                 URL: https://issues.apache.org/jira/browse/FLINK-32045
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: Weihua Hu


h1. Background

In FLINK-21110, we cache shuffle descriptors on the job manager side and 
support using the blob server to offload these descriptors in order to reduce 
the cost of task deployment.

I think there are further improvements we could make for large-scale jobs.
 # The default minimum size to enable distribution via the blob server is 1 MB. 
But for a large WordCount job with 20000 parallelism, the serialized shuffle 
descriptors are only about 300 KB. This means users need to lower 
"blob.offload.minsize", but the right value is hard for users to decide.
 # The task executor side still needs to load blob files and deserialize 
shuffle descriptors for each task. Since these operations run in the main 
thread, they may block other RPCs from the job manager.

h1. Proposal
 # Enable distributing shuffle descriptors via the blob server automatically. 
This could be decided by the number of edges of the current shuffle descriptor: 
blob offloading is enabled when the edge count exceeds an internal threshold.
 # Introduce a cache of deserialized shuffle descriptors on the task executor 
side. This would reduce the cost of reading local blob files and of 
deserialization. Of course, the cache should have a TTL to avoid occupying too 
much memory, and it should use the same switch mechanism as blob server 
offloading.
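To make the second item concrete, the task-executor-side cache could look 
roughly like the following. This is only an illustrative sketch, not Flink's 
actual implementation; the class name {{ShuffleDescriptorCache}}, its generics, 
and the loader callback are all hypothetical placeholders for the real 
descriptor types and blob-loading path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of a TTL cache for deserialized shuffle descriptors on the
// task executor. K would be a descriptor/blob key, V the deserialized
// shuffle descriptors. Names here are hypothetical, not Flink APIs.
public class ShuffleDescriptorCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;

        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public ShuffleDescriptorCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Returns the cached descriptors for {@code key}, or invokes
     * {@code loader} (e.g. read the local blob file and deserialize)
     * exactly once and caches the result with a TTL.
     */
    public V get(K key, Function<K, V> loader) {
        long now = System.currentTimeMillis();
        Entry<V> e = cache.get(key);
        if (e != null && e.expiresAtMillis > now) {
            return e.value; // hit: skip blob read and deserialization
        }
        V value = loader.apply(key); // miss: load and deserialize once
        cache.put(key, new Entry<>(value, now + ttlMillis));
        return value;
    }

    /** Drops expired entries so the cache does not grow unbounded. */
    public void evictExpired() {
        long now = System.currentTimeMillis();
        cache.entrySet().removeIf(en -> en.getValue().expiresAtMillis <= now);
    }
}
```

With this shape, every task of the same job vertex on one task executor would 
share a single deserialization, and the TTL plus explicit eviction bound the 
memory held after the tasks finish.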



--
This message was sent by Atlassian Jira
(v8.20.10#820010)