This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 5287f5d0c [CELEBORN-1850] Set up worker endpoint after initializing
controller
5287f5d0c is described below
commit 5287f5d0c868f077e26a64e5cfff47ae68913772
Author: Sanskar Modi <[email protected]>
AuthorDate: Thu Feb 6 16:08:26 2025 +0800
[CELEBORN-1850] Set up worker endpoint after initializing controller
### What changes were proposed in this pull request?
Set up the worker RPC endpoint only after initializing the controller.
### Why are the changes needed?
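In the current flow, the controller's RPC endpoint is set up before
`controller.init(this)` is called, so the dispatcher can deliver messages (e.g. a destroy request handled by
`receiveAndReply` → `handleDestroy`) while required members are still uninitialized. This leads to many
NullPointerExceptions in the worker during restarts, as in the log below.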
```
10:53:55.262 [celeborn-dispatcher-6] ERROR
org.apache.celeborn.common.rpc.netty.Inbox - Ignoring error
java.lang.NullPointerException: null
at
org.apache.celeborn.service.deploy.worker.Controller.org$apache$celeborn$service$deploy$worker$Controller$$handleDestroy(Controller.scala:659)
~[celeborn-worker_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.service.deploy.worker.Controller$$anonfun$receiveAndReply$1.applyOrElse(Controller.scala:143)
~[celeborn-worker_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.common.rpc.netty.Inbox.processInternal(Inbox.scala:119)
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.common.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:218)
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.common.rpc.netty.Inbox.safelyCall(Inbox.scala:314)
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.common.rpc.netty.Inbox.process(Inbox.scala:218)
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
at
org.apache.celeborn.common.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:238)
~[celeborn-common_2.12-0.5.3.jar:0.5.3]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_412]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_412]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
10:53:55.570 [main] INFO org.apache.celeborn.service.deploy.worker.Worker
- Register worker successfully.
10:53:55.573 [main] INFO org.apache.celeborn.service.deploy.worker.Worker
- Worker started.
```
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
Existing unit tests
Closes #3086 from s0nskar/null_controller.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit fdf1883f2520f4e7a18f7bd934dece7f2cb5b2b3)
---
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 251422610..813ec60fe 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -194,8 +194,7 @@ private[celeborn] class Worker(
conf.workerCongestionControlCheckIntervalMs)
}
- var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
- rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
+ val controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
// Visible for testing
private[worker] var internalRpcEndpoint: RpcEndpoint = _
@@ -543,9 +542,11 @@ private[celeborn] class Worker(
pushDataHandler.init(this)
replicateHandler.init(this)
fetchHandler.init(this)
- controller.init(this)
workerStatusManager.init(this)
+ controller.init(this)
+ rpcEnv.setupEndpoint(RpcNameConstants.WORKER_EP, controller)
+
logInfo("Worker started.")
rpcEnv.awaitTermination()
if (conf.internalPortEnabled) {