waitinfuture commented on code in PR #2366:
URL:
https://github.com/apache/incubator-celeborn/pull/2366#discussion_r1522644752
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -59,109 +62,171 @@ private[celeborn] case class RemoteProcessConnectionError(
remoteAddress: RpcAddress)
extends InboxMessage
-/**
- * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to
it thread-safely.
- */
private[celeborn] class Inbox(
val endpointRef: NettyRpcEndpointRef,
- val endpoint: RpcEndpoint)
- extends Logging {
+ val endpoint: RpcEndpoint,
+ val conf: CelebornConf) extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
+ /** True if the inbox (and its associated endpoint) is stopped. */
@GuardedBy("this")
- protected val messages = new java.util.LinkedList[InboxMessage]()
+ protected var stopped = false
- /** True if the inbox (and its associated endpoint) is stopped. */
+ /** The number of threads processing messages for this inbox. */
@GuardedBy("this")
- private var stopped = false
+ protected var numActiveThreads = 0
/** Allow multiple threads to process messages at the same time. */
@GuardedBy("this")
- private var enableConcurrent = false
+ protected var enableConcurrent = false
- /** The number of threads processing messages for this inbox. */
@GuardedBy("this")
- private var numActiveThreads = 0
+ protected val messages = {
+ val capacity = conf.get(CelebornConf.RPC_IN_MEMORY_BOUNDED_INBOX_CAPACITY)
+ if (capacity == 0) {
+ new LinkedBlockingQueue[InboxMessage]
+ } else {
+ new LinkedBlockingQueue[InboxMessage](capacity)
+ }
+ }
+
+ private val messageCount = new AtomicLong(0)
// OnStart should be the first message to process
inbox.synchronized {
messages.add(OnStart)
}
- /**
- * Process stored messages.
- */
- def process(dispatcher: Dispatcher): Unit = {
- var message: InboxMessage = null
+ def stop(): Unit = inbox.synchronized {
+ // The following codes should be in `synchronized` so that we can make
sure "OnStop" is the last
+ // message
+ if (!stopped) {
+ // We should disable concurrent here. Then when RpcEndpoint.onStop is
called, it's the only
+ // thread that is processing messages. So `RpcEndpoint.onStop` can
release its resources
+ // safely.
+ enableConcurrent = false
+ stopped = true
+ addMessage(OnStop)
+ // Note: The concurrent events in messages will be processed one by one.
+ }
+ }
+
+ def post(message: InboxMessage): Unit = {
Review Comment:
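Consider this scenario:
1. At time 1, thread1 calls `post`, reads `stopped` as false (without holding
the inbox lock), and proceeds to call `addMessage`.
2. At time 2, thread2 calls `stop`, which sets `stopped` to true and enqueues `OnStop`.
3. At time 3, thread1 still invokes `addMessage`, so its message is queued after `OnStop`,
even though `OnStop` is supposed to be the last message.
This could not happen before this PR, because `post` was synchronized on the inbox.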
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala:
##########
@@ -154,18 +155,21 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv)
extends Logging {
endpointName: String,
message: InboxMessage,
callbackIfStopped: Exception => Unit): Unit = {
+ var data: EndpointData = null
val error = synchronized {
- val data = endpoints.get(endpointName)
+ data = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new CelebornException(s"Could not find $endpointName."))
} else {
- data.inbox.post(message)
- receivers.offer(data)
None
}
}
+ if (error.isEmpty) {
Review Comment:
Consider this case:
1. At time 1, thread1 calls `postMessage` and sees that `stopped` is false; it then
reaches line 169 and tries to post the message.
2. At time 2, thread2 calls `stop` and stops the dispatcher.
3. At time 3, thread1 still posts the message to the inbox and offers `data` to `receivers`.
Before this PR this could not happen, because posting the message and offering to
`receivers` were always done inside the `synchronized` block.
##########
common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala:
##########
@@ -59,109 +62,171 @@ private[celeborn] case class RemoteProcessConnectionError(
remoteAddress: RpcAddress)
extends InboxMessage
-/**
- * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to
it thread-safely.
- */
private[celeborn] class Inbox(
val endpointRef: NettyRpcEndpointRef,
- val endpoint: RpcEndpoint)
- extends Logging {
+ val endpoint: RpcEndpoint,
+ val conf: CelebornConf) extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
+ /** True if the inbox (and its associated endpoint) is stopped. */
@GuardedBy("this")
- protected val messages = new java.util.LinkedList[InboxMessage]()
+ protected var stopped = false
- /** True if the inbox (and its associated endpoint) is stopped. */
+ /** The number of threads processing messages for this inbox. */
@GuardedBy("this")
- private var stopped = false
+ protected var numActiveThreads = 0
/** Allow multiple threads to process messages at the same time. */
@GuardedBy("this")
- private var enableConcurrent = false
+ protected var enableConcurrent = false
- /** The number of threads processing messages for this inbox. */
@GuardedBy("this")
- private var numActiveThreads = 0
+ protected val messages = {
+ val capacity = conf.get(CelebornConf.RPC_IN_MEMORY_BOUNDED_INBOX_CAPACITY)
+ if (capacity == 0) {
+ new LinkedBlockingQueue[InboxMessage]
+ } else {
+ new LinkedBlockingQueue[InboxMessage](capacity)
+ }
+ }
+
+ private val messageCount = new AtomicLong(0)
// OnStart should be the first message to process
inbox.synchronized {
messages.add(OnStart)
}
- /**
- * Process stored messages.
- */
- def process(dispatcher: Dispatcher): Unit = {
- var message: InboxMessage = null
+ def stop(): Unit = inbox.synchronized {
+ // The following codes should be in `synchronized` so that we can make
sure "OnStop" is the last
+ // message
+ if (!stopped) {
+ // We should disable concurrent here. Then when RpcEndpoint.onStop is
called, it's the only
+ // thread that is processing messages. So `RpcEndpoint.onStop` can
release its resources
+ // safely.
+ enableConcurrent = false
+ stopped = true
+ addMessage(OnStop)
+ // Note: The concurrent events in messages will be processed one by one.
+ }
+ }
+
+ def post(message: InboxMessage): Unit = {
+ if (stopped) {
+ // We already put "OnStop" into "messages", so we should drop further
messages
+ onDrop(message)
+ } else {
+ addMessage(message)
+ }
+ }
+
+ // exposed only for testing
+ def getNumActiveThreads: Int = {
inbox.synchronized {
- if (!enableConcurrent && numActiveThreads != 0) {
- return
- }
- message = messages.poll()
- if (message != null) {
- numActiveThreads += 1
- } else {
- return
- }
+ inbox.numActiveThreads
}
- while (true) {
- safelyCall(endpoint, endpointRef.name) {
- message match {
- case RpcMessage(_sender, content, context) =>
- try {
- endpoint.receiveAndReply(context).applyOrElse[Any, Unit](
- content,
- { msg =>
- throw new CelebornException(s"Unsupported message $message
from ${_sender}")
- })
- } catch {
- case e: Throwable =>
- context.sendFailure(e)
- // Throw the exception -- this exception will be caught by the
safelyCall function.
- // The endpoint's onError function will be called.
- throw e
- }
+ }
- case OneWayMessage(_sender, content) =>
- endpoint.receive.applyOrElse[Any, Unit](
- content,
- { msg =>
- throw new CelebornException(s"Unsupported message $message
from ${_sender}")
- })
-
- case OnStart =>
- endpoint.onStart()
- if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
- inbox.synchronized {
- if (!stopped) {
- enableConcurrent = true
- }
- }
- }
+ def nextMessage(quitWithNoProcessingThread: Boolean): Option[InboxMessage] =
{
Review Comment:
I don't think it's necessary to extract this method, and the parameter name
`quitWithNoProcessingThread` is a little confusing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]