waitinfuture commented on code in PR #2366:
URL: 
https://github.com/apache/incubator-celeborn/pull/2366#discussion_r1519222275


##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala:
##########
@@ -154,18 +165,21 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) 
extends Logging {
       endpointName: String,
       message: InboxMessage,
       callbackIfStopped: Exception => Unit): Unit = {
+    var data: EndpointData = null
     val error = synchronized {
-      val data = endpoints.get(endpointName)
+      data = endpoints.get(endpointName)
       if (stopped) {
         Some(new RpcEnvStoppedException())
       } else if (data == null) {
         Some(new CelebornException(s"Could not find $endpointName."))
       } else {
-        data.inbox.post(message)
-        receivers.offer(data)
         None
       }
     }
+    if (data != null) {

Review Comment:
   This seems to change the semantics. Previously, if `stopped` was true, the message 
was not posted; now it is always posted whenever `data` is not null.



##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -59,153 +61,156 @@ private[celeborn] case class RemoteProcessConnectionError(
     remoteAddress: RpcAddress)
   extends InboxMessage
 
-/**
- * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to 
it thread-safely.
- */
-private[celeborn] class Inbox(
+abstract private[celeborn] class InboxBase(
     val endpointRef: NettyRpcEndpointRef,
-    val endpoint: RpcEndpoint)
-  extends Logging {
+    val endpoint: RpcEndpoint) extends Logging {
 
   inbox => // Give this an alias so we can use it more clearly in closures.
 
+  /** True if the inbox (and its associated endpoint) is stopped. */
   @GuardedBy("this")
-  protected val messages = new java.util.LinkedList[InboxMessage]()
+  protected var stopped = false
 
-  /** True if the inbox (and its associated endpoint) is stopped. */
+  /** The number of threads processing messages for this inbox. */
   @GuardedBy("this")
-  private var stopped = false
+  protected var numActiveThreads = 0
 
   /** Allow multiple threads to process messages at the same time. */
   @GuardedBy("this")
-  private var enableConcurrent = false
+  protected var enableConcurrent = false
 
-  /** The number of threads processing messages for this inbox. */
-  @GuardedBy("this")
-  private var numActiveThreads = 0
+  def isEmpty: Boolean
 
-  // OnStart should be the first message to process
-  inbox.synchronized {
-    messages.add(OnStart)
+  def addMessage(message: InboxMessage): Unit
+
+  def stop(): Unit = inbox.synchronized {
+    // The following codes should be in `synchronized` so that we can make 
sure "OnStop" is the last
+    // message
+    if (!stopped) {
+      // We should disable concurrent here. Then when RpcEndpoint.onStop is 
called, it's the only
+      // thread that is processing messages. So `RpcEndpoint.onStop` can 
release its resources
+      // safely.
+      enableConcurrent = false
+      stopped = true
+      addMessage(OnStop)
+      // Note: The concurrent events in messages will be processed one by one.
+    }
   }
 
-  /**
-   * Process stored messages.
-   */
-  def process(dispatcher: Dispatcher): Unit = {
-    var message: InboxMessage = null
+  def post(message: InboxMessage): Unit = inbox.synchronized {
+    if (stopped) {
+      // We already put "OnStop" into "messages", so we should drop further 
messages
+      onDrop(message)
+    } else {
+      addMessage(message)
+      false
+    }
+  }
+
+  // exposed only for testing
+  def getNumActiveThreads: Int = {
     inbox.synchronized {
-      if (!enableConcurrent && numActiveThreads != 0) {
-        return
-      }
-      message = messages.poll()
-      if (message != null) {
-        numActiveThreads += 1
-      } else {
-        return
-      }
+      inbox.numActiveThreads
     }
-    while (true) {
-      safelyCall(endpoint, endpointRef.name) {
-        message match {
-          case RpcMessage(_sender, content, context) =>
-            try {
-              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](
-                content,
-                { msg =>
-                  throw new CelebornException(s"Unsupported message $message 
from ${_sender}")
-                })
-            } catch {
-              case e: Throwable =>
-                context.sendFailure(e)
-                // Throw the exception -- this exception will be caught by the 
safelyCall function.
-                // The endpoint's onError function will be called.
-                throw e
-            }
+  }
 
-          case OneWayMessage(_sender, content) =>
-            endpoint.receive.applyOrElse[Any, Unit](
-              content,
-              { msg =>
-                throw new CelebornException(s"Unsupported message $message 
from ${_sender}")
-              })
-
-          case OnStart =>
-            endpoint.onStart()
-            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
-              inbox.synchronized {
-                if (!stopped) {
-                  enableConcurrent = true
-                }
-              }
-            }
+  protected def processInternal(dispatcher: Dispatcher, message: 
InboxMessage): Unit = {
+    message match {
+      case RpcMessage(_sender, content, context) =>
+        try {
+          endpoint.receiveAndReply(context).applyOrElse[Any, Unit](
+            content,
+            { msg =>
+              throw new CelebornException(s"Unsupported message $message from 
${_sender}")
+            })
+        } catch {
+          case e: Throwable =>
+            context.sendFailure(e)
+            // Throw the exception -- this exception will be caught by the 
safelyCall function.
+            // The endpoint's onError function will be called.
+            throw e
+        }
 
-          case OnStop =>
-            val activeThreads = inbox.synchronized {
-              inbox.numActiveThreads
+      case OneWayMessage(_sender, content) =>
+        endpoint.receive.applyOrElse[Any, Unit](
+          content,
+          { msg =>
+            throw new CelebornException(s"Unsupported message $message from 
${_sender}")
+          })
+
+      case OnStart =>
+        endpoint.onStart()
+        if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
+          inbox.synchronized {
+            if (!stopped) {
+              enableConcurrent = true
             }
-            assert(
-              activeThreads == 1,
-              s"There should be only a single active thread but found 
$activeThreads threads.")
-            dispatcher.removeRpcEndpointRef(endpoint)
-            endpoint.onStop()
-            assert(isEmpty, "OnStop should be the last message")
+          }
+        }
 
-          case RemoteProcessConnected(remoteAddress) =>
-            endpoint.onConnected(remoteAddress)
+      case OnStop =>
+        val activeThreads = inbox.synchronized {
+          inbox.numActiveThreads
+        }
+        assert(
+          activeThreads == 1,
+          s"There should be only a single active thread but found 
$activeThreads threads.")
+        dispatcher.removeRpcEndpointRef(endpoint)
+        endpoint.onStop()
+        assert(isEmpty, "OnStop should be the last message")
 
-          case RemoteProcessDisconnected(remoteAddress) =>
-            endpoint.onDisconnected(remoteAddress)
+      case RemoteProcessConnected(remoteAddress) =>
+        endpoint.onConnected(remoteAddress)
 
-          case RemoteProcessConnectionError(cause, remoteAddress) =>
-            endpoint.onNetworkError(cause, remoteAddress)
-        }
-      }
+      case RemoteProcessDisconnected(remoteAddress) =>
+        endpoint.onDisconnected(remoteAddress)
 
-      inbox.synchronized {
-        // "enableConcurrent" will be set to false after `onStop` is called, 
so we should check it
-        // every time.
-        if (!enableConcurrent && numActiveThreads != 1) {
-          // If we are not the only one worker, exit
-          numActiveThreads -= 1
-          return
-        }
-        message = messages.poll()
-        if (message == null) {
-          numActiveThreads -= 1
-          return
-        }
-      }
+      case RemoteProcessConnectionError(cause, remoteAddress) =>
+        endpoint.onNetworkError(cause, remoteAddress)
+
+      case other =>
+        throw new IllegalStateException(s"unsupported message $other")
     }
   }
 
-  def post(message: InboxMessage): Unit = inbox.synchronized {
-    if (stopped) {
-      // We already put "OnStop" into "messages", so we should drop further 
messages
-      onDrop(message)
-    } else {
-      messages.add(message)
-      false
+  def process(dispatcher: Dispatcher): Unit = {
+
+    var nextMsg: Option[InboxMessage] = None
+    inbox.synchronized {
+      nextMsg = nextMessage(quitWithNoProcessingThread = true)
+      if (nextMsg.isDefined) {
+        numActiveThreads += 1
+      } else {
+        return
+      }
     }
-  }
 
-  def stop(): Unit = inbox.synchronized {
-    // The following codes should be in `synchronized` so that we can make 
sure "OnStop" is the last
-    // message
-    if (!stopped) {
-      // We should disable concurrent here. Then when RpcEndpoint.onStop is 
called, it's the only
-      // thread that is processing messages. So `RpcEndpoint.onStop` can 
release its resources
-      // safely.
-      enableConcurrent = false
-      stopped = true
-      messages.add(OnStop)
-      // Note: The concurrent events in messages will be processed one by one.
+    if (nextMsg.nonEmpty) {

Review Comment:
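   `nextMsg` is guaranteed to be non-empty here: the synchronized block above returns early when `nextMessage` yields `None`, so this `nonEmpty` check is redundant.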



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to