Thesharing opened a new pull request #16314:
URL: https://github.com/apache/flink/pull/16314
## What is the purpose of the change
*This pull request optimizes the performance of task deployment.
The main idea is to introduce a cache used when creating
TaskDeploymentDescriptors: the compressed serialized value of the
ShuffleDescriptors is computed once and then reused. If the compressed value
exceeds a size threshold, it is offloaded to the blob server and distributed
from there. For more details, see FLINK-23005.*
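
A minimal, self-contained sketch of the "serialize, compress, then offload if large" idea, using only JDK Java serialization and Deflate. `FakeShuffleDescriptor`, `Placement`, the helper methods and the 1 MiB threshold are all hypothetical. This is not the Flink implementation, and no real blob server is involved; the sketch only decides where the bytes would go.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressAndOffloadSketch {

    /** Hypothetical stand-in for a ShuffleDescriptor; not Flink's class. */
    static final class FakeShuffleDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final String host;
        final int port;
        final int partitionIndex;

        FakeShuffleDescriptor(String host, int port, int partitionIndex) {
            this.host = host;
            this.port = port;
            this.partitionIndex = partitionIndex;
        }
    }

    /** Where the serialized value would travel: inside the TDD or via the blob server. */
    enum Placement { INLINE_IN_TDD, OFFLOAD_TO_BLOB_SERVER }

    /** Plain Java serialization, used here only to compare sizes. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Java serialization piped through a Deflate compressor. */
    static byte[] serializeAndCompress(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream also finishes and closes the Deflate stream.
        try (ObjectOutputStream out =
                new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Inverse of serializeAndCompress, as the receiving side would apply it. */
    static Object decompressAndDeserialize(byte[] compressed)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(
                new InflaterInputStream(new ByteArrayInputStream(compressed)))) {
            return in.readObject();
        }
    }

    /** Offload only when the compressed value is strictly larger than the threshold. */
    static Placement choosePlacement(byte[] compressed, int offloadThresholdBytes) {
        return compressed.length > offloadThresholdBytes
                ? Placement.OFFLOAD_TO_BLOB_SERVER
                : Placement.INLINE_IN_TDD;
    }

    /** Builds n fake descriptors spread over 100 hypothetical TaskManager hosts. */
    static ArrayList<FakeShuffleDescriptor> descriptors(int n) {
        ArrayList<FakeShuffleDescriptor> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add(new FakeShuffleDescriptor("taskmanager-" + (i % 100), 6121, i));
        }
        return list;
    }

    public static void main(String[] args) throws Exception {
        ArrayList<FakeShuffleDescriptor> all = descriptors(8000);
        byte[] raw = serialize(all);
        byte[] compressed = serializeAndCompress(all);
        System.out.println("raw: " + raw.length + " bytes, compressed: " + compressed.length + " bytes");
        // Hypothetical 1 MiB threshold, chosen only for illustration.
        System.out.println("placement: " + choosePlacement(compressed, 1024 * 1024));
    }
}
```

Descriptor lists are highly repetitive (same hosts, same class metadata), which is why compressing before deciding on offloading can keep many values small enough to ship inline.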
## Brief change log
- *Pass ConsumedPartitionGroup to TaskDeploymentDescriptorFactory as the
cache key.*
- *Introduce CompressedSerializedValue. SerializedValue and
CompressedSerializedValue now both extend SerializedValueBase.*
- *Refactor BlobWriter#serializeAndTryOffload so that it returns
SerializedValueBase instead of SerializedValue.*
- *Cache the compressed serialized value of ShuffleDescriptors and
distribute it via the blob server when it exceeds the size threshold.*
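
A rough sketch of how the change-log items fit together: a common base type with plain and compressed subclasses, a serialize method whose callers only see the base type, and a cache that serializes each partition group once. Every class here (`SketchSerializedValueBase`, `PartitionGroup`, `DescriptorCache`, etc.) is a hypothetical stand-in loosely modelled on the names in the PR, not Flink's code. Keying by object identity is also an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;

public class ShuffleDescriptorCacheSketch {

    /** Common base type, loosely modelled on the SerializedValueBase named in the PR. */
    abstract static class SketchSerializedValueBase {
        private final byte[] bytes;
        SketchSerializedValueBase(byte[] bytes) { this.bytes = bytes; }
        byte[] getBytes() { return bytes; }
        abstract boolean isCompressed();
    }

    static final class SketchSerializedValue extends SketchSerializedValueBase {
        SketchSerializedValue(byte[] bytes) { super(bytes); }
        @Override boolean isCompressed() { return false; }
    }

    static final class SketchCompressedSerializedValue extends SketchSerializedValueBase {
        SketchCompressedSerializedValue(byte[] bytes) { super(bytes); }
        @Override boolean isCompressed() { return true; }
    }

    /**
     * Hypothetical stand-in for a ConsumedPartitionGroup. It does not override
     * equals/hashCode, so a HashMap keyed by it compares by object identity.
     */
    static final class PartitionGroup {
        final ArrayList<String> partitionIds;
        PartitionGroup(ArrayList<String> partitionIds) { this.partitionIds = partitionIds; }
    }

    /** Callers only see the base type, whether or not the bytes are compressed. */
    static SketchSerializedValueBase serialize(Serializable value, boolean compress) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        OutputStream sink = compress ? new DeflaterOutputStream(bytes) : bytes;
        try (ObjectOutputStream out = new ObjectOutputStream(sink)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] result = bytes.toByteArray();
        return compress ? new SketchCompressedSerializedValue(result) : new SketchSerializedValue(result);
    }

    /** Serializes the contents of each group at most once and reuses the result. */
    static final class DescriptorCache {
        private final Map<PartitionGroup, SketchSerializedValueBase> cache = new HashMap<>();
        int computations = 0;

        SketchSerializedValueBase getOrCompute(PartitionGroup group) {
            return cache.computeIfAbsent(group, g -> {
                computations++;
                return serialize(g.partitionIds, true);
            });
        }
    }

    /** One group shared by all consumers of a hypothetical all-to-all edge. */
    static PartitionGroup allToAllGroup(int producers) {
        ArrayList<String> ids = new ArrayList<>(producers);
        for (int i = 0; i < producers; i++) {
            ids.add("partition-" + i);
        }
        return new PartitionGroup(ids);
    }

    public static void main(String[] args) {
        PartitionGroup group = allToAllGroup(8000);
        DescriptorCache cache = new DescriptorCache();
        // 8000 consumers build their deployment descriptors from the same group.
        for (int consumer = 0; consumer < 8000; consumer++) {
            cache.getOrCompute(group);
        }
        System.out.println("serializations performed: " + cache.computations); // prints "serializations performed: 1"
    }
}
```

Returning the base type lets a caller handle plain, compressed (and, in the real change, offloaded) values uniformly, while the cache turns one serialization per consumer into one per group.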
## Verifying this change
This change added tests and can be verified as follows:
- *Added unit tests for CompressedSerializedValue and BlobWriter.*
- *Added unit tests for the cache of ShuffleDescriptors in
TaskDeploymentDescriptorFactoryTest.*
- *Extended DefaultExecutionGraphDeploymentTest to verify that
ShuffleDescriptors are cached correctly.*
- *Manually verified the change with an end-to-end test. The job has two
JobVertices, a source and a sink, connected by an all-to-all edge. It was run
with a parallelism of 8k and of 16k, in both streaming and batch mode.
Deployment became significantly faster: about 5x at 8k and about 6.7x at
16k.*
*The results below show the time between the first task transitioning to
DEPLOYING and the last task transitioning to RUNNING:*
Parallelism (source × sink) | Before | After
--- | --- | ---
8000 × 8000 | 32.611s | 6.480s
16000 × 16000 | 128.408s | 19.051s
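
A back-of-the-envelope estimate of why the gain grows with parallelism. It rests on two assumptions that the PR does not spell out. First, before this change each consumer's TaskDeploymentDescriptor serialized all N upstream ShuffleDescriptors independently. Second, all N consumers of the all-to-all edge share one ConsumedPartitionGroup. The speedups are computed from the table above.

```math
\begin{aligned}
\text{descriptor serializations without the cache} &= \underbrace{N}_{\text{consumers}} \cdot \underbrace{N}_{\text{descriptors each}} = N^2 \\
\text{descriptor serializations with the cache} &= N \quad \text{(one shared group, serialized once)} \\
N = 8000:&\quad 8000^2 = 64{,}000{,}000 \ \text{vs.}\ 8000 \\
\text{measured speedup} &= \tfrac{32.611}{6.480} \approx 5.0 \ (N = 8000), \quad \tfrac{128.408}{19.051} \approx 6.7 \ (N = 16000)
\end{aligned}
```

The measured speedup is far below the N-fold reduction in serialization work, presumably because the measured interval also covers RPCs and task startup, which the cache does not remove. That the speedup grows with N is consistent with a quadratic term having been removed.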
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no**
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't
know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs /
JavaDocs / not documented)
--
This is an automated message from the Apache Git Service.