CodingCat commented on code in PR #2366:
URL:
https://github.com/apache/incubator-celeborn/pull/2366#discussion_r1518280793
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -251,11 +256,96 @@ private[celeborn] class Inbox(
dealWithFatalError(fatal)
}
}
+}
- // exposed only for testing
- def getNumActiveThreads: Int = {
+/**
+ * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to
it thread-safely.
+ */
+private[celeborn] class InMemoryInbox(
+ endpointRef: NettyRpcEndpointRef,
+ endpoint: RpcEndpoint)
+ extends InboxBase(endpointRef, endpoint) {
+
+ inbox => // Give this an alias so we can use it more clearly in closures.
+
+ @GuardedBy("this")
+ protected val messages = new java.util.LinkedList[InboxMessage]
+
+ private val messageCount = new AtomicLong(0)
+
+ // OnStart should be the first message to process
+ inbox.synchronized {
+ messages.add(OnStart)
+ }
+
+ override def nextMessage(quitWithNoProcessingThread: Boolean):
Option[InboxMessage] = {
+ var message: Option[InboxMessage] = None
+ if (quitWithNoProcessingThread && !enableConcurrent && numActiveThreads !=
0) {
+ return None
+ }
+ val startTime = System.nanoTime()
+ message = Option(messages.poll())
+ if (message.isDefined) {
+ logDebug(s"message count: ${messageCount.incrementAndGet()}," +
+ s" read cost :${System.nanoTime() - startTime}, message queue length:
${messages.size()}")
+ }
+ message
+ }
+
+ override def addMessage(message: InboxMessage): Unit = {
+ messages.add(message)
+ }
+
+ override def isEmpty: Boolean = inbox.synchronized {
+ messages.isEmpty
+ }
+}
+
+private[celeborn] class InMemoryBoundedInbox(
+ endpointRef: NettyRpcEndpointRef,
+ endpoint: RpcEndpoint,
+ capacity: Int) extends InboxBase(endpointRef, endpoint) {
+
+ inbox =>
+
+ @GuardedBy("this")
+ protected val messages = new LinkedBlockingQueue[InboxMessage](capacity)
+
+ private val messageCount = new AtomicLong(0)
+
+ inbox.synchronized {
+ messages.add(OnStart)
+ }
+
+ override def isEmpty: Boolean = {
+ messages.isEmpty
+ }
+
+ override def addMessage(message: InboxMessage): Unit = {
+ messages.put(message)
+ }
+
+ override def nextMessage(quitWithNoProcessingThread: Boolean):
Option[InboxMessage] = {
Review Comment:
Sorry, I've added the missing changes and explained them above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]