CodingCat commented on code in PR #2366:
URL: https://github.com/apache/incubator-celeborn/pull/2366#discussion_r1528761197


##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -59,109 +62,171 @@ private[celeborn] case class RemoteProcessConnectionError(
     remoteAddress: RpcAddress)
   extends InboxMessage
 
-/**
- * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.
- */
 private[celeborn] class Inbox(
     val endpointRef: NettyRpcEndpointRef,
-    val endpoint: RpcEndpoint)
-  extends Logging {
+    val endpoint: RpcEndpoint,
+    val conf: CelebornConf) extends Logging {
 
   inbox => // Give this an alias so we can use it more clearly in closures.
 
+  /** True if the inbox (and its associated endpoint) is stopped. */
   @GuardedBy("this")
-  protected val messages = new java.util.LinkedList[InboxMessage]()
+  protected var stopped = false
 
-  /** True if the inbox (and its associated endpoint) is stopped. */
+  /** The number of threads processing messages for this inbox. */
   @GuardedBy("this")
-  private var stopped = false
+  protected var numActiveThreads = 0
 
   /** Allow multiple threads to process messages at the same time. */
   @GuardedBy("this")
-  private var enableConcurrent = false
+  protected var enableConcurrent = false
 
-  /** The number of threads processing messages for this inbox. */
   @GuardedBy("this")
-  private var numActiveThreads = 0
+  protected val messages = {
+    val capacity = conf.get(CelebornConf.RPC_IN_MEMORY_BOUNDED_INBOX_CAPACITY)
+    if (capacity == 0) {
+      new LinkedBlockingQueue[InboxMessage]
+    } else {
+      new LinkedBlockingQueue[InboxMessage](capacity)
+    }
+  }
+
+  private val messageCount = new AtomicLong(0)
 
   // OnStart should be the first message to process
   inbox.synchronized {
     messages.add(OnStart)
   }
 
-  /**
-   * Process stored messages.
-   */
-  def process(dispatcher: Dispatcher): Unit = {
-    var message: InboxMessage = null
+  def stop(): Unit = inbox.synchronized {
+    // The following codes should be in `synchronized` so that we can make sure "OnStop" is the last
+    // message
+    if (!stopped) {
+      // We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only
+      // thread that is processing messages. So `RpcEndpoint.onStop` can release its resources
+      // safely.
+      enableConcurrent = false
+      stopped = true
+      addMessage(OnStop)
+      // Note: The concurrent events in messages will be processed one by one.
+    }
+  }
+
+  def post(message: InboxMessage): Unit = {

Review Comment:
   I just updated the code. This method is called within
`dispatcher.synchronized`, so I think removing the `synchronized` block
should be fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to