This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 23113898f [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
23113898f is described below

commit 23113898f6ca4028d69a48cc83dd7356b7013be7
Author: zhangzhao.08 <[email protected]>
AuthorDate: Thu Oct 24 15:51:29 2024 +0800

    [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
    
    ### What changes were proposed in this pull request?
    As described in the title.
    
    ### Why are the changes needed?
    
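    This PR addresses the memory leak on worker nodes before registration and the
    NullPointerException (NPE) that occurs when PushDataHandler is accessed during
    its initialization. To fix the leak, TransportRequestHandler now releases the
    request body when it does not dispatch the request. To fix the NPE,
    PushDataHandler assigns `registered` only after its other fields have been
    initialized.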
    
    
![image](https://github.com/user-attachments/assets/993f9e9c-fb84-4b71-a77f-6c043cda4864)
    
![image](https://github.com/user-attachments/assets/25545bbf-e838-44b2-88fe-3fe2dada0524)
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Passed the GitHub Actions (GA) CI checks.
    
    Closes #2843 from zhaostu4/zhao/worker_npe.
    
    Authored-by: zhangzhao.08 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/network/server/TransportRequestHandler.java       | 4 ++++
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala   | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index fde1f56e2..1b7b58d09 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -87,6 +87,10 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
       } else {
         processOtherMessages(request);
       }
+    } else {
+      if (request.body() != null) {
+        request.body().release();
+      }
     }
   }
 
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 3b75dbe12..2652b14cf 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -79,7 +79,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
     replicateThreadPool = worker.replicateThreadPool
     unavailablePeers = worker.unavailablePeers
     replicateClientFactory = worker.replicateClientFactory
-    registered = Some(worker.registered)
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.workerDiskReserveSize
     diskReserveRatio = worker.conf.workerDiskReserveRatio
@@ -95,7 +94,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
 
     testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
     testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
-
+    registered = Some(worker.registered)
     logInfo(
       s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, 
diskReserveRatio ${diskReserveRatio.orNull}")
   }

Reply via email to