CodingCat commented on code in PR #2366:
URL: https://github.com/apache/incubator-celeborn/pull/2366#discussion_r1518279313


##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -251,11 +256,96 @@ private[celeborn] class Inbox(
         dealWithFatalError(fatal)
     }
   }
+}
 
-  // exposed only for testing
-  def getNumActiveThreads: Int = {
+/**
+ * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.
+ */
+private[celeborn] class InMemoryInbox(
+    endpointRef: NettyRpcEndpointRef,
+    endpoint: RpcEndpoint)
+  extends InboxBase(endpointRef, endpoint) {
+
+  inbox => // Give this an alias so we can use it more clearly in closures.
+
+  @GuardedBy("this")
+  protected val messages = new java.util.LinkedList[InboxMessage]
+
+  private val messageCount = new AtomicLong(0)
+
+  // OnStart should be the first message to process
+  inbox.synchronized {
+    messages.add(OnStart)
+  }
+
+  override def nextMessage(quitWithNoProcessingThread: Boolean): Option[InboxMessage] = {

Review Comment:
   Hi, I missed some changes when upstreaming.
   
   That synchronized block is unnecessary.
   
   The main reasons for having this method are:
   
   (1) the code can be reused within the process method
   (2) since each implementation uses a different data structure to hold InboxMessage, each subclass needs to provide its own implementation
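
   To make the two points concrete, here is a rough sketch of the shape I have in mind. All names in it (`SketchInboxBase`, `SketchInMemoryInbox`, `Msg`, `Start`, `Payload`) are made up for illustration and are not the classes in this PR. Taking the lock in the caller is also only an assumption of the sketch, not a statement about how the PR should do it.

```scala
import java.util.{LinkedList => JLinkedList}

// Hypothetical, simplified stand-ins; not the actual Celeborn classes.
sealed trait Msg
case object Start extends Msg
final case class Payload(body: String) extends Msg

abstract class SketchInboxBase {
  // Point (2): each subclass owns its storage, so it supplies the fetch logic.
  protected def nextMessage(): Option[Msg]

  // Point (1): the processing loop is written once, against nextMessage().
  // Assumption of this sketch: the lock is taken here, in the caller.
  def process(handle: Msg => Unit): Int = {
    var handled = 0
    var next = this.synchronized { nextMessage() }
    while (next.isDefined) {
      handle(next.get)
      handled += 1
      next = this.synchronized { nextMessage() }
    }
    handled
  }
}

final class SketchInMemoryInbox extends SketchInboxBase {
  // In-memory variant: a plain linked list guarded by the inbox lock.
  private val messages = new JLinkedList[Msg]

  // The start message is queued first so it is processed before anything else.
  this.synchronized { messages.add(Start) }

  def post(m: Msg): Unit = this.synchronized { messages.add(m) }

  // poll() returns null on an empty list, which Option(...) maps to None.
  override protected def nextMessage(): Option[Msg] = Option(messages.poll())
}
```

   Another implementation would only need to override `nextMessage()` on top of its own storage; `process` stays untouched.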



