This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 23113898f [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
23113898f is described below
commit 23113898f6ca4028d69a48cc83dd7356b7013be7
Author: zhangzhao.08 <[email protected]>
AuthorDate: Thu Oct 24 15:51:29 2024 +0800
[CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
### What changes were proposed in this pull request?
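As described in the title: `TransportRequestHandler` now releases the body of a request that it does not process, and `PushDataHandler` assigns `registered` only after its other fields have been initialized.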
### Why are the changes needed?
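This PR fixes a memory leak on worker nodes before they have registered, and an NPE that occurs when PushDataHandler is accessed while it is still being initialized.
Previously, the body of a request that was not processed was never released. Also, `registered` was assigned before fields such as `workerInfo` and `diskReserveSize`, so a caller could observe `registered` as set while those fields had not yet been assigned.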


### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed GA (GitHub Actions CI).
Closes #2843 from zhaostu4/zhao/worker_npe.
Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/network/server/TransportRequestHandler.java | 4 ++++
.../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 3 +--
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index fde1f56e2..1b7b58d09 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -87,6 +87,10 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
} else {
processOtherMessages(request);
}
+ } else {
+ if (request.body() != null) {
+ request.body().release();
+ }
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 3b75dbe12..2652b14cf 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -79,7 +79,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
replicateThreadPool = worker.replicateThreadPool
unavailablePeers = worker.unavailablePeers
replicateClientFactory = worker.replicateClientFactory
- registered = Some(worker.registered)
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
diskReserveRatio = worker.conf.workerDiskReserveRatio
@@ -95,7 +94,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
-
+ registered = Some(worker.registered)
logInfo(
s"diskReserveSize ${Utils.bytesToString(diskReserveSize)},
diskReserveRatio ${diskReserveRatio.orNull}")
}