This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 1c39825f8 [CELEBORN-1725] Optimize performance of handling `MapperEnd` RPC in `LifecycleManager`
1c39825f8 is described below

commit 1c39825f826916732688af64a2bb1d778fd5607c
Author: Fu Chen <[email protected]>
AuthorDate: Wed Nov 27 10:43:36 2024 -0800

    [CELEBORN-1725] Optimize performance of handling `MapperEnd` RPC in `LifecycleManager`
    
    ### What changes were proposed in this pull request?
    
    Replace the `attempts.exists(_ < 0)` scan in `ReducePartitionCommitHandler` with a hand-rolled `while` loop when handling `MapperEnd` RPCs, and skip the all-finished check for duplicate attempts.
    
    ### Why are the changes needed?
    
    Per https://github.com/databricks/scala-style-guide?tab=readme-ov-file#traversal-and-zipwithindex, this PR replaces the `exists` call with a `while` loop to optimize performance-sensitive code.
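
    The style-guide recommendation above can be sketched as follows. This is an illustrative comparison, not code from this patch; the object and method names are made up for the example:

    ```scala
    object ExistsVsWhile {
      // Collection-style check: concise, but goes through the collections
      // machinery and a closure on every call.
      def anyUnfinished(attempts: Array[Int]): Boolean = attempts.exists(_ < 0)

      // Hand-rolled loop over the primitive array: no closure, no iterator,
      // early exit on the first unfinished mapper.
      def areAllFinished(attempts: Array[Int]): Boolean = {
        var i = 0
        while (i < attempts.length) {
          if (attempts(i) < 0) return false
          i += 1
        }
        true
      }

      def main(args: Array[String]): Unit = {
        val done = Array(1, 2, 3)
        val pending = Array(1, -1, 3)
        assert(areAllFinished(done) && !anyUnfinished(done))
        assert(!areAllFinished(pending) && anyUnfinished(pending))
        println("ok")
      }
    }
    ```

    Both methods compute the same predicate; the loop form only pays off on hot paths such as an RPC handler invoked once per map task.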
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA
    
    Conducted a local test of a Spark job whose shuffle write stage `Stage 0` included 210,000 tasks. The mean processing time for `MapperEnd` RPC requests decreased from 185ms to 0.02ms.
    
    the `LifecycleManagerEndpoint` metrics before this PR:
    ```
    histogram for LifecycleManagerEndpoint RPC metrics: class org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
    count: 229298
    min: 8732
    mean: 1.8579149415564203E8
    p50: 1.10197579E8
    p75: 3.1664286125E8
    p95: 5.927707685999998E8
    p99: 7.620123359800001E8
    max: 862981475
    ```
    
    after this PR:
    ```
    histogram for LifecycleManagerEndpoint RPC metrics: class org.apache.celeborn.common.protocol.message.ControlMessages$MapperEnd
    count: 229298
    min: 6281
    mean: 20155.255836575874
    p50: 19623.5
    p75: 23865.25
    p95: 32006.749999999996
    p99: 45231.44
    max: 74217
    ```
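
    Assuming the histogram values are nanoseconds (which matches the quoted before-mean, since 1.8579e8 ns is roughly 186 ms), a quick conversion recovers the millisecond figures; the object name is illustrative:

    ```scala
    object NanosToMillis {
      // Convert a nanosecond reading to milliseconds.
      def toMillis(nanos: Double): Double = nanos / 1e6

      def main(args: Array[String]): Unit = {
        // Mean before this PR: 1.8579149415564203E8 ns
        println(f"before: ${toMillis(1.8579149415564203e8)}%.1f ms") // prints "before: 185.8 ms"
        // Mean after this PR: 20155.255836575874 ns
        println(f"after: ${toMillis(20155.255836575874)}%.3f ms") // prints "after: 0.020 ms"
      }
    }
    ```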
    
    count of slow `MapperEnd` requests before this PR:
    ```
    $ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
    124801
    ```
    
    after this PR:
    ```
    $ grep "slow rpc detected:" driver.log | grep MapperEnd | wc -l
    0
    ```
    
    the flame graph before this PR:
    
    <img width="1917" alt="Screenshot 2024-11-19 19 38 10" src="https://github.com/user-attachments/assets/16294992-0e51-402e-8da0-035a2226c7dd">
    
    Closes #2905 from cfmcgrady/improve-mapper-end.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit ed52e8d3b608592a66ed8c60b370fb826d4d0f5a)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../client/commit/ReducePartitionCommitHandler.scala      | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index e86bc2317..2877e8562 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -245,10 +245,10 @@ class ReducePartitionCommitHandler(
       if (attempts(mapId) < 0) {
         attempts(mapId) = attemptId
        // Mapper with this attemptId finished, also check all other mapper finished or not.
-        (true, !attempts.exists(_ < 0))
+        (true, areAllMapperAttemptsFinished(attempts))
       } else {
         // Mapper with another attemptId finished, skip this request
-        (false, !attempts.exists(_ < 0))
+        (false, false)
       }
     }
   }
@@ -326,4 +326,15 @@ class ReducePartitionCommitHandler(
 
     (timeout <= 0, stageEndTimeout - timeout)
   }
+
+  private def areAllMapperAttemptsFinished(attempts: Array[Int]): Boolean = {
+    var i = attempts.length - 1
+    while (i >= 0) {
+      if (attempts(i) < 0) {
+        return false
+      }
+      i -= 1
+    }
+    true
+  }
 }
