This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 5287f5d0c [CELEBORN-1850] Setup worker endpoint after initializing 
controller
5287f5d0c is described below

commit 5287f5d0c868f077e26a64e5cfff47ae68913772
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu Feb 6 16:08:26 2025 +0800

    [CELEBORN-1850] Setup worker endpoint after initializing controller
    
    ### What changes were proposed in this pull request?
    
    Set up the worker endpoint after initializing the controller.
    
    ### Why are the changes needed?
    
    In the current flow, the controller RPC endpoint is set up before calling 
`controller.init(this)`, leaving the required members uninitialized, which 
leads to many NullPointerExceptions in the worker during restarts.
    
    ```
    10:53:55.262 [celeborn-dispatcher-6] ERROR 
org.apache.celeborn.common.rpc.netty.Inbox - Ignoring error
    java.lang.NullPointerException: null
            at 
org.apache.celeborn.service.deploy.worker.Controller.org$apache$celeborn$service$deploy$worker$Controller$$handleDestroy(Controller.scala:659)
 ~[celeborn-worker_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.service.deploy.worker.Controller$$anonfun$receiveAndReply$1.applyOrElse(Controller.scala:143)
 ~[celeborn-worker_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.common.rpc.netty.Inbox.processInternal(Inbox.scala:119) 
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:218) 
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:314) 
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:218) 
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
            at 
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:238)
 ~[celeborn-common_2.12-0.5.3.jar:0.5.3]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_412]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_412]
            at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
    10:53:55.570 [main] INFO  org.apache.celeborn.service.deploy.worker.Worker 
- Register worker successfully.
    10:53:55.573 [main] INFO  org.apache.celeborn.service.deploy.worker.Worker 
- Worker started.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    NA
    
    ### How was this patch tested?
    Existing UTs
    
    Closes #3086 from s0nskar/null_controller.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
    
    (cherry picked from commit fdf1883f2520f4e7a18f7bd934dece7f2cb5b2b3)
---
 .../scala/org/apache/celeborn/service/deploy/worker/Worker.scala   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 251422610..813ec60fe 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -194,8 +194,7 @@ private[celeborn] class Worker(
       conf.workerCongestionControlCheckIntervalMs)
   }
 
-  var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
-  rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
+  val controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
 
   // Visible for testing
   private[worker] var internalRpcEndpoint: RpcEndpoint = _
@@ -543,9 +542,11 @@ private[celeborn] class Worker(
     pushDataHandler.init(this)
     replicateHandler.init(this)
     fetchHandler.init(this)
-    controller.init(this)
     workerStatusManager.init(this)
 
+    controller.init(this)
+    rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
+
     logInfo("Worker started.")
     rpcEnv.awaitTermination()
     if (conf.internalPortEnabled) {

Reply via email to