This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fe623888b [CELEBORN-1290] Fix NPE occurring prior to worker 
registration
fe623888b is described below

commit fe623888bf21dcecd662df3feafa3a19082e7ae3
Author: Fu Chen <[email protected]>
AuthorDate: Tue Feb 27 19:14:19 2024 +0800

    [CELEBORN-1290] Fix NPE occurring prior to worker registration
    
    ### What changes were proposed in this pull request?
    
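    Make `checkRegistered` in `PushDataHandler` and `FetchHandler` safe to call before `init(worker)` has run. The `registered` field now holds an `Option[AtomicBoolean]` that defaults to `None`, and `init` sets it to `Some(worker.registered)`. Until that happens, `checkRegistered` (implemented as `registered.exists(_.get)`) returns `false` instead of dereferencing `null`. `FetchHandler` already defaulted to `new AtomicBoolean(false)` and could not hit the NPE; it is changed only for consistency.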
    
    ### Why are the changes needed?
    
    This PR fixes an NPE that occurs when `PushDataHandler` reads its `registered` field (a reference to `Worker#registered`) before `init(worker)` has assigned it, i.e. while the field is still `null`.
    
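    The problem occurs because the `TransportChannelHandler` can start handling incoming requests before the worker is registered and the handler has been initialized. Handling a request calls `checkRegistered` through `TransportRequestHandler`, as the following stack trace shows: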
    
    ```
    24/02/01 15:07:32,090 WARN [push-server-6-6] TransportChannelHandler: 
Exception in connection from /xx.xx.xx.xx:xxx
    java.lang.NullPointerException
            at 
org.apache.celeborn.service.deploy.worker.PushDataHandler.checkRegistered(PushDataHandler.scala:714)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.checkRegistered(TransportRequestHandler.java:82)
            at 
org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:76)
            at 
org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:151)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
            at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
            at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
            at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
            at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Passed the existing GitHub Actions (GA) CI checks.
    
    Closes #2274 from cfmcgrady/check-registered.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala    | 6 +++---
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index ceabd9802..da9d83653 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -57,7 +57,7 @@ class FetchHandler(
     conf.readBuffersToTriggerReadMin)
   var storageManager: StorageManager = _
   var partitionsSorter: PartitionFilesSorter = _
-  var registered: AtomicBoolean = new AtomicBoolean(false)
+  var registered: Option[AtomicBoolean] = None
 
   def init(worker: Worker): Unit = {
     workerSource.addGauge(WorkerSource.ACTIVE_CHUNK_STREAM_COUNT) { () =>
@@ -74,7 +74,7 @@ class FetchHandler(
 
     this.storageManager = worker.storageManager
     this.partitionsSorter = worker.partitionsSorter
-    this.registered = worker.registered
+    this.registered = Some(worker.registered)
   }
 
   def getRawDiskFileInfo(
@@ -456,7 +456,7 @@ class FetchHandler(
     }
   }
 
-  override def checkRegistered: Boolean = registered.get
+  override def checkRegistered: Boolean = registered.exists(_.get)
 
   /** Invoked when the channel associated with the given client is active. */
   override def channelActive(client: TransportClient): Unit = {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 62d91e817..356b0d4cd 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -56,7 +56,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
   private var replicateThreadPool: ThreadPoolExecutor = _
   private var unavailablePeers: ConcurrentHashMap[WorkerInfo, Long] = _
   private var replicateClientFactory: TransportClientFactory = _
-  private var registered: AtomicBoolean = _
+  private var registered: Option[AtomicBoolean] = None
   private var workerInfo: WorkerInfo = _
   private var diskReserveSize: Long = _
   private var diskReserveRatio: Option[Double] = _
@@ -79,7 +79,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
     replicateThreadPool = worker.replicateThreadPool
     unavailablePeers = worker.unavailablePeers
     replicateClientFactory = worker.replicateClientFactory
-    registered = worker.registered
+    registered = Some(worker.registered)
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.workerDiskReserveSize
     diskReserveRatio = worker.conf.workerDiskReserveRatio
@@ -707,7 +707,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     (mapId, attemptId)
   }
 
-  override def checkRegistered(): Boolean = registered.get()
+  override def checkRegistered(): Boolean = registered.exists(_.get)
 
   class RpcResponseCallbackWithTimer(
       source: Source,

Reply via email to