This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 1c53ab274 [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
1c53ab274 is described below

commit 1c53ab274c4a5cdab17db4de77b76602a02f93dc
Author: zhangzhao.08 <[email protected]>
AuthorDate: Thu Oct 24 15:51:29 2024 +0800

    [CELEBORN-1667] Fix NPE & LEAK occurring prior to worker registration
    
    ### What changes were proposed in this pull request?
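    As the title says: when TransportRequestHandler does not dispatch a request for processing, release the request body. Also, move the assignment of `registered` in PushDataHandler to the end of its initialization, after all other fields have been set.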
    
    ### Why are the changes needed?
    
    This PR addresses the memory leak on worker nodes before registration and the NPE that occurs when PushDataHandler is accessed during its initialization.
    
    
![image](https://github.com/user-attachments/assets/993f9e9c-fb84-4b71-a77f-6c043cda4864)
    
![image](https://github.com/user-attachments/assets/25545bbf-e838-44b2-88fe-3fe2dada0524)
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Passed GitHub Actions (GA) CI.
    
    Closes #2843 from zhaostu4/zhao/worker_npe.
    
    Authored-by: zhangzhao.08 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 23113898f6ca4028d69a48cc83dd7356b7013be7)
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/common/network/server/TransportRequestHandler.java       | 4 ++++
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala   | 3 +--
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
index fde1f56e2..1b7b58d09 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportRequestHandler.java
@@ -87,6 +87,10 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
       } else {
         processOtherMessages(request);
       }
+    } else {
+      if (request.body() != null) {
+        request.body().release();
+      }
     }
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 159520c77..3670a607d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -79,7 +79,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
     replicateThreadPool = worker.replicateThreadPool
     unavailablePeers = worker.unavailablePeers
     replicateClientFactory = worker.replicateClientFactory
-    registered = Some(worker.registered)
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.workerDiskReserveSize
     diskReserveRatio = worker.conf.workerDiskReserveRatio
@@ -95,7 +94,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
 
     testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
     testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
-
+    registered = Some(worker.registered)
     logInfo(
       s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, 
diskReserveRatio ${diskReserveRatio.orNull}")
   }

Reply via email to