artemlivshits commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156501035

##########
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##########
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   The intent here is that requestManagers cannot be updated after the sender starts the thread, but I wonder if we actually honor this invariant in our bootstrap sequence. At the very least, we need to set a flag when the thread starts and throw an exception if an attempt is made to update requestManagers after that point. Alternatively, we could add full synchronization around requestManagers access (or use ConcurrentLinkedQueue), which would allow modifying requestManagers after the sender thread has started.

##########
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##########
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   I don't think this can be updated and read concurrently -- all updates happen in the startup thread before the object is visible to request threads. We can add a comment to clarify.
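
For illustration, the two options raised in the first review comment (a started flag that rejects late registration, or a concurrent collection) could look roughly like the sketch below. This is not code from the PR: `RequestManager`, `NamedManager`, `GuardedSender`, and `ConcurrentSender` are hypothetical stand-ins, and the real sender would also own the thread and the network client.

```scala
import java.util.ArrayList
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the PR's InterBrokerRequestManager.
trait RequestManager { def name: String }
case class NamedManager(name: String) extends RequestManager

// Option 1: registration is allowed only before start().
class GuardedSender {
  private val requestManagers = new ArrayList[RequestManager]()
  private var started = false

  def addRequestManager(manager: RequestManager): Unit = synchronized {
    if (started)
      throw new IllegalStateException(
        "Cannot add a request manager after the sender thread has started")
    requestManagers.add(manager)
  }

  // Assumption: a real sender would start its thread here, after the flag
  // is set. Thread.start() happens-before any action in the started thread,
  // so that thread would see every manager registered up to this point.
  def start(): Unit = synchronized { started = true }

  def managerCount: Int = synchronized { requestManagers.size }
}

// Option 2: a concurrent collection, so managers can be added at any time.
class ConcurrentSender {
  private val requestManagers = new ConcurrentLinkedQueue[RequestManager]()

  def addRequestManager(manager: RequestManager): Unit =
    requestManagers.add(manager)

  def managerCount: Int = requestManagers.size
}
```

With the first option, a bootstrap-ordering mistake surfaces immediately as an `IllegalStateException` instead of a silent data race. The second option relaxes the invariant instead of enforcing it.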