This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 1c53ab274 [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker
registration
1c53ab274 is described below
commit 1c53ab274c4a5cdab17db4de77b76602a02f93dc
Author: zhangzhao.08 <[email protected]>
AuthorDate: Thu Oct 24 15:51:29 2024 +0800
[CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
### What changes were proposed in this pull request?
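As the title says: release the body of requests that are not processed, and set PushDataHandler's `registered` field only after its other fields are initialized.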
### Why are the changes needed?
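This PR fixes a memory leak on worker nodes before they are registered (the body of a request that is not processed was never released), and an NPE that occurs when PushDataHandler is accessed while it is still being initialized (the `registered` field is now assigned last, after all other fields are set).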


### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Passed GA (GitHub Actions CI).
Closes #2843 from zhaostu4/zhao/worker_npe.
Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 23113898f6ca4028d69a48cc83dd7356b7013be7)
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/network/server/TransportRequestHandler.java | 4 ++++
.../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 3 +--
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index fde1f56e2..1b7b58d09 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -87,6 +87,10 @@ public class TransportRequestHandler extends
MessageHandler<RequestMessage> {
} else {
processOtherMessages(request);
}
+ } else {
+ if (request.body() != null) {
+ request.body().release();
+ }
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 159520c77..3670a607d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -79,7 +79,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
replicateThreadPool = worker.replicateThreadPool
unavailablePeers = worker.unavailablePeers
replicateClientFactory = worker.replicateClientFactory
- registered = Some(worker.registered)
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
diskReserveRatio = worker.conf.workerDiskReserveRatio
@@ -95,7 +94,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
-
+ registered = Some(worker.registered)
logInfo(
s"diskReserveSize ${Utils.bytesToString(diskReserveSize)},
diskReserveRatio ${diskReserveRatio.orNull}")
}