This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 1c39825f8 [CELEBORN-1725] Optimize performance of handling `MapperEnd`
RPC in `LifecycleManager`
1c39825f8 is described below
commit 1c39825f826916732688af64a2bb1d778fd5607c
Author: Fu Chen <[email protected]>
AuthorDate: Wed Nov 27 10:43:36 2024 -0800
[CELEBORN-1725] Optimize performance of handling `MapperEnd` RPC in
`LifecycleManager`
### What changes were proposed in this pull request?
As the title says: speed up handling of the `MapperEnd` RPC in `LifecycleManager`.
### Why are the changes needed?
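Per the Databricks Scala style guide
(https://github.com/databricks/scala-style-guide?tab=readme-ov-file#traversal-and-zipwithindex),
this PR replaces the `exists` call on the mapper attempts array with a `while`
loop in this performance-sensitive code path. A `MapperEnd` request for a
mapper that another attempt has already finished now returns `(false, false)`
without scanning the array.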
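
For illustration, a minimal sketch of the two forms of the check, assuming the attempts array uses a negative value for a mapper that has not finished (as the diff suggests). The object and method names here are made up for the example and are not part of Celeborn.

```scala
object AllFinishedSketch {
  // Closure-based check, the form used before this PR.
  def allFinishedWithExists(attempts: Array[Int]): Boolean =
    !attempts.exists(_ < 0)

  // Index-based while loop, the form the style guide recommends
  // for performance-sensitive code.
  def allFinishedWithWhile(attempts: Array[Int]): Boolean = {
    var i = attempts.length - 1
    while (i >= 0) {
      if (attempts(i) < 0) {
        return false
      }
      i -= 1
    }
    true
  }

  def main(args: Array[String]): Unit = {
    val pending = Array(0, -1, 2) // mapper 1 has not finished yet
    val done = Array(0, 1, 2)     // every mapper has a finished attempt
    // Both forms agree on every input; only the traversal style differs.
    println(allFinishedWithExists(pending) == allFinishedWithWhile(pending))
    println(allFinishedWithExists(done) == allFinishedWithWhile(done))
  }
}
```

Both functions return the same result for any input, so the change is behavior-preserving for the check itself.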
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
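GA (GitHub Actions).
Conducted a local test of a Spark job where the shuffle write stage `Stage
0` included 210,000 tasks. The mean processing time for `MapperEnd` RPC
requests decreased from about 185ms to about 0.02ms (mean 1.86E8 vs. 20155 in
the histograms below, whose values appear to be in nanoseconds).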
The `LifecycleManagerEndpoint` metrics before this PR:
```
histogram for LifecycleManagerEndpoint RPC metrics: class
org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
count: 229298
min: 8732
mean: 1.8579149415564203E8
p50: 1.10197579E8
p75: 3.1664286125E8
p95: 5.927707685999998E8
p99: 7.620123359800001E8
max: 862981475
```
After this PR:
```
histogram for LifecycleManagerEndpoint RPC metrics: class
org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
count: 229298
min: 6281
mean: 20155.255836575874
p50: 19623.5
p75: 23865.25
p95: 32006.749999999996
p99: 45231.44
max: 74217
```
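
To read the two histograms side by side, the means can be converted to milliseconds. This is a small sketch that assumes the histogram values are nanoseconds, which is consistent with the roughly 185ms mean quoted above; `MapperEndLatency` and `nanosToMillis` are hypothetical names for this example only.

```scala
object MapperEndLatency {
  // Assumption: the histogram values are nanoseconds.
  def nanosToMillis(nanos: Double): Double = nanos / 1e6

  def main(args: Array[String]): Unit = {
    val meanBefore = 1.8579149415564203e8 // mean before this PR
    val meanAfter = 20155.255836575874    // mean after this PR
    println(f"mean before: ${nanosToMillis(meanBefore)}%.1f ms")
    println(f"mean after:  ${nanosToMillis(meanAfter)}%.3f ms")
    println(f"ratio:       ${meanBefore / meanAfter}%.0fx")
  }
}
```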
Count of slow `MapperEnd` requests before this PR:
```
$ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
124801
```
After this PR:
```
$ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
0
```
The flame graph before this PR:
<img width="1917" alt="Screenshot 2024-11-19 19 38 10"
src="https://github.com/user-attachments/assets/16294992-0e51-402e-8da0-035a2226c7dd">
Closes #2905 from cfmcgrady/improve-mapper-end.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit ed52e8d3b608592a66ed8c60b370fb826d4d0f5a)
Signed-off-by: Wang, Fei <[email protected]>
---
.../client/commit/ReducePartitionCommitHandler.scala | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index e86bc2317..2877e8562 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -245,10 +245,10 @@ class ReducePartitionCommitHandler(
if (attempts(mapId) < 0) {
attempts(mapId) = attemptId
// Mapper with this attemptId finished, also check all other mapper finished or not.
- (true, !attempts.exists(_ < 0))
+ (true, areAllMapperAttemptsFinished(attempts))
} else {
// Mapper with another attemptId finished, skip this request
- (false, !attempts.exists(_ < 0))
+ (false, false)
}
}
}
@@ -326,4 +326,15 @@ class ReducePartitionCommitHandler(
(timeout <= 0, stageEndTimeout - timeout)
}
+
+ private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+ var i = attempts.length - 1
+ while (i >= 0) {
+ if (attempts(i) < 0) {
+ return false
+ }
+ i -= 1
+ }
+ true
+ }
}
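
For illustration only, a self-contained sketch of how the changed branch behaves after this patch. `MapperEndSketch` and `recordMapperEnd` are hypothetical names, and the real handler's surrounding state and locking are omitted. The returned pair is read here as (attempt recorded, all mappers finished).

```scala
object MapperEndSketch {
  // Same loop as the helper added in the diff above.
  private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
    var i = attempts.length - 1
    while (i >= 0) {
      if (attempts(i) < 0) {
        return false
      }
      i -= 1
    }
    true
  }

  // Hypothetical wrapper around the changed branch.
  def recordMapperEnd(attempts: Array[Int], mapId: Int, attemptId: Int): (Boolean, Boolean) = {
    if (attempts(mapId) < 0) {
      attempts(mapId) = attemptId
      // Only the request that records a new attempt scans the array.
      (true, areAllMapperAttemptsFinished(attempts))
    } else {
      // Another attempt already finished this mapper: no scan at all.
      (false, false)
    }
  }

  def main(args: Array[String]): Unit = {
    val attempts = Array.fill(2)(-1)
    println(recordMapperEnd(attempts, 0, 0)) // (true,false): mapper 1 still pending
    println(recordMapperEnd(attempts, 0, 1)) // (false,false): duplicate for mapper 0
    println(recordMapperEnd(attempts, 1, 0)) // (true,true): last mapper recorded
  }
}
```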