waitinfuture commented on code in PR #2366:
URL: https://github.com/apache/celeborn/pull/2366#discussion_r1543178923
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -85,84 +96,132 @@ private[celeborn] class Inbox(
private var numActiveThreads = 0
// OnStart should be the first message to process
- inbox.synchronized {
+ try {
+ inboxLock.lockInterruptibly()
messages.add(OnStart)
+ } finally {
+ inboxLock.unlock()
+ }
+
+ def addMessage(message: InboxMessage): Unit = {
+ messages.add(message)
+ messageCount.incrementAndGet()
+ signalNotFull()
+ logDebug(s"queue length of ${messageCount.get()} ")
+ }
+
+ private def processInternal(dispatcher: Dispatcher, message: InboxMessage):
Unit = {
+ message match {
+ case RpcMessage(_sender, content, context) =>
+ try {
+ endpoint.receiveAndReply(context).applyOrElse[Any, Unit](
+ content,
+ { msg =>
+ throw new CelebornException(s"Unsupported message $message from
${_sender}")
+ })
+ } catch {
+ case e: Throwable =>
+ context.sendFailure(e)
+ // Throw the exception -- this exception will be caught by the
safelyCall function.
+ // The endpoint's onError function will be called.
+ throw e
+ }
+
+ case OneWayMessage(_sender, content) =>
+ endpoint.receive.applyOrElse[Any, Unit](
+ content,
+ { msg =>
+ throw new CelebornException(s"Unsupported message $message from
${_sender}")
+ })
+
+ case OnStart =>
+ endpoint.onStart()
+ if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
+ try {
+ inboxLock.lockInterruptibly()
+ if (!stopped) {
+ enableConcurrent = true
+ }
+ } finally {
+ inboxLock.unlock()
+ }
+ }
+
+ case OnStop =>
+ val activeThreads =
+ try {
+ inboxLock.lockInterruptibly()
+ inbox.numActiveThreads
+ } finally {
+ inboxLock.unlock()
+ }
+ assert(
+ activeThreads == 1,
+ s"There should be only a single active thread but found
$activeThreads threads.")
+ dispatcher.removeRpcEndpointRef(endpoint)
+ endpoint.onStop()
+ assert(isEmpty, "OnStop should be the last message")
+
+ case RemoteProcessConnected(remoteAddress) =>
+ endpoint.onConnected(remoteAddress)
+
+ case RemoteProcessDisconnected(remoteAddress) =>
+ endpoint.onDisconnected(remoteAddress)
+
+ case RemoteProcessConnectionError(cause, remoteAddress) =>
+ endpoint.onNetworkError(cause, remoteAddress)
+
+ case other =>
+ throw new IllegalStateException(s"unsupported message $other")
+ }
+ }
+
+ private[netty] def waitOnFull(): Unit = {
+ if (capacity > 0 && !stopped) {
+ try {
+ inboxLock.lockInterruptibly()
+ while (messageCount.get() == capacity) {
+ isFull.await()
Review Comment:
IIUC, the while loop holds `inboxLock` and waits on `isFull`; however,
`process` first tries to acquire the lock and then calls `signalNotFull`, so it
seems there is a deadlock here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]