This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df78204e05b KAFKA-15854: Move Java classes from `kafka.server` to the 
`server` module (#14796)
df78204e05b is described below

commit df78204e05bb1c416d977a0a36f4c263251ae4ef
Author: Ismael Juma <[email protected]>
AuthorDate: Sun Nov 19 22:09:19 2023 -0800

    KAFKA-15854: Move Java classes from `kafka.server` to the `server` module 
(#14796)
    
    We only move Java classes that have minimal or no dependencies on Scala 
classes in this PR.
    
    Details:
    * Configured `server` module in build files.
    * Changed `ControllerRequestCompletionHandler` to be an interface since it 
contains no method implementations.
    * Cleaned up various import control files.
    * Minor build clean-ups for `server-common`.
    * Disabled `testAssignmentAggregation` when executed with Java 8; this is 
an existing issue (see #14794).
    
    For broader context on this change, please check:
    * KAFKA-15852: Move server code from `core` to `server` module
    
    Reviewers: Divij Vaidya <[email protected]>
---
 build.gradle                                       | 76 ++++++++++++++++++----
 checkstyle/import-control-metadata.xml             |  5 --
 checkstyle/import-control-server-common.xml        |  4 --
 checkstyle/import-control-server.xml               | 61 +++++++++++++++++
 checkstyle/import-control-storage.xml              |  4 --
 .../kafka/server/builders/KafkaApisBuilder.java    |  2 +-
 .../transaction/ProducerIdManager.scala            |  2 +-
 .../scala/kafka/server/AlterPartitionManager.scala |  3 +-
 .../kafka/server/AutoTopicCreationManager.scala    |  5 +-
 .../kafka/server/BrokerLifecycleManager.scala      |  1 +
 .../src/main/scala/kafka/server/BrokerServer.scala |  7 +-
 .../main/scala/kafka/server/ConfigHandler.scala    |  1 +
 .../server/ControllerConfigurationValidator.scala  |  3 +-
 .../server/ControllerRegistrationManager.scala     |  1 +
 .../main/scala/kafka/server/ControllerServer.scala |  3 +-
 .../scala/kafka/server/ForwardingManager.scala     |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  1 +
 core/src/main/scala/kafka/server/KafkaBroker.scala |  1 +
 core/src/main/scala/kafka/server/KafkaServer.scala |  5 +-
 .../server/NodeToControllerChannelManager.scala    | 47 ++-----------
 .../kafka/zk/ZkMigrationIntegrationTest.scala      |  3 +-
 .../server/NodeToControllerRequestThreadTest.scala |  1 +
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  1 +
 .../transaction/ProducerIdManagerTest.scala        |  2 +-
 .../kafka/server/AlterPartitionManagerTest.scala   |  1 +
 .../server/AutoTopicCreationManagerTest.scala      |  4 +-
 .../server/BrokerRegistrationRequestTest.scala     |  3 +-
 .../ControllerConfigurationValidatorTest.scala     |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../MockNodeToControllerChannelManager.scala       |  7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  1 +
 .../apache}/kafka/server/AssignmentsManager.java   |  4 +-
 .../apache}/kafka/server/ClientMetricsManager.java |  2 +-
 .../server/ControllerRequestCompletionHandler.java | 30 +++++++++
 .../server/NodeToControllerChannelManager.java     | 37 +++++++++++
 .../server}/metrics/ClientMetricsConfigs.java      |  2 +-
 .../java/org/apache/kafka/server/package-info.java | 20 ++++++
 .../kafka/server/AssignmentsManagerTest.java       |  7 +-
 .../server}/metrics/ClientMetricsTestUtils.java    |  2 +-
 settings.gradle                                    |  1 +
 40 files changed, 268 insertions(+), 102 deletions(-)

diff --git a/build.gradle b/build.gradle
index 3b7878a3e89..608a57d56ad 100644
--- a/build.gradle
+++ b/build.gradle
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: 
connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + 
":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+    implementation project(':clients')
+    implementation project(':server-common')
+
+    implementation libs.slf4jApi
+
+    compileOnly libs.log4j
+
+    testImplementation project(':clients').sourceSets.test.output
+
+    testImplementation libs.mockitoCore
+    testImplementation libs.junitJupiter
+    testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {
+    def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
+    inputs.property "commitId", commitId
+    inputs.property "version", version
+    outputs.file receiptFile
+
+    doLast {
+      def data = [
+        commitId: commitId,
+        version: version,
+      ]
+
+      receiptFile.parentFile.mkdirs()
+      def content = data.entrySet().collect { "$it.key=$it.value" 
}.sort().join("\n")
+      receiptFile.setText(content, "ISO-8859-1")
+    }
+  }
+
+  jar {
+    dependsOn createVersionFile
+    from("$buildDir") {
+      include "kafka/$buildVersionFileName"
+    }
+  }
+
+  clean.doFirst {
+    delete "$buildDir/kafka/"
+  }
+
+  checkstyle {
+    configProperties = checkstyleConfigProperties("import-control-server.xml")
+  }
+
+  javadoc {
+    enabled = false
+  }
+}
+
 project(':core') {
   apply plugin: 'scala'
 
@@ -873,7 +929,7 @@ project(':core') {
     implementation project(':tools:tools-api')
     implementation project(':raft')
     implementation project(':storage')
-
+    implementation project(':server')
 
     implementation libs.argparse4j
     implementation libs.commonsValidator
@@ -912,6 +968,7 @@ project(':core') {
     testImplementation project(':raft').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.test.output
+    testImplementation project(':server').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
@@ -1643,19 +1700,6 @@ project(':server-common') {
     }
   }
 
-  sourceSets {
-    main {
-      java {
-        srcDirs = ["src/main/java"]
-      }
-    }
-    test {
-      java {
-        srcDirs = ["src/test/java"]
-      }
-    }
-  }
-
   jar {
     dependsOn createVersionFile
     from("$buildDir") {
@@ -1670,6 +1714,10 @@ project(':server-common') {
   checkstyle {
     configProperties = 
checkstyleConfigProperties("import-control-server-common.xml")
   }
+
+  javadoc {
+    enabled = false
+  }
 }
 
 project(':storage:storage-api') {
diff --git a/checkstyle/import-control-metadata.xml 
b/checkstyle/import-control-metadata.xml
index d4643c19979..897932492b1 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -27,16 +27,11 @@
     <allow pkg="javax.management" />
     <allow pkg="org.slf4j" />
     <allow pkg="org.junit" />
-    <allow pkg="org.opentest4j" />
-    <allow pkg="org.hamcrest" />
     <allow pkg="org.mockito" />
-    <allow pkg="org.easymock" />
-    <allow pkg="org.powermock" />
     <allow pkg="java.security" />
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
     <allow pkg="javax.crypto" />
-    <allow pkg="org.ietf.jgss" />
     <allow pkg="net.jqwik.api" />
 
     <!-- no one depends on the server -->
diff --git a/checkstyle/import-control-server-common.xml 
b/checkstyle/import-control-server-common.xml
index 79873703d84..622e7c5e052 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -27,15 +27,11 @@
     <allow pkg="javax.management" />
     <allow pkg="org.slf4j" />
     <allow pkg="org.junit" />
-    <allow pkg="org.opentest4j" />
     <allow pkg="org.hamcrest" />
     <allow pkg="org.mockito" />
-    <allow pkg="org.easymock" />
-    <allow pkg="org.powermock" />
     <allow pkg="java.security" />
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
-    <allow pkg="org.ietf.jgss" />
     <allow pkg="net.jqwik.api" />
 
     <!-- no one depends on the server -->
diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
new file mode 100644
index 00000000000..499f8327ab6
--- /dev/null
+++ b/checkstyle/import-control-server.xml
@@ -0,0 +1,61 @@
+<!DOCTYPE import-control PUBLIC
+        "-//Puppy Crawl//DTD Import Control 1.1//EN"
+        "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka.server">
+
+  <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+  <!-- common library dependencies -->
+  <allow pkg="java" />
+  <allow pkg="javax.management" />
+  <allow pkg="org.slf4j" />
+  <allow pkg="org.junit" />
+  <allow pkg="org.mockito" />
+  <allow pkg="java.security" />
+  <allow pkg="javax.net.ssl" />
+  <allow pkg="javax.security" />
+  <allow pkg="net.jqwik.api" />
+
+  <!-- no one depends on the server -->
+  <disallow pkg="kafka" />
+
+  <!-- anyone can use public classes -->
+  <allow pkg="org.apache.kafka.common" exact-match="true" />
+  <allow pkg="org.apache.kafka.common.config" />
+  <allow pkg="org.apache.kafka.common.security" />
+  <allow pkg="org.apache.kafka.common.serialization" />
+  <allow pkg="org.apache.kafka.common.utils" />
+  <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+  <allow pkg="org.apache.kafka.common.memory" />
+
+  <!-- protocol, records and request/response utilities -->
+  <allow pkg="org.apache.kafka.clients" />
+  <allow pkg="org.apache.kafka.common.message" />
+  <allow pkg="org.apache.kafka.common.protocol" />
+  <allow pkg="org.apache.kafka.common.requests" />
+
+  <!-- utilities and reusable classes from server-common -->
+  <allow pkg="org.apache.kafka.queue" />
+  <allow pkg="org.apache.kafka.server.common" />
+
+  <!-- persistent collection factories/non-library-specific wrappers -->
+  <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+</import-control>
diff --git a/checkstyle/import-control-storage.xml 
b/checkstyle/import-control-storage.xml
index 42d170ec315..d8c5e287d31 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -27,15 +27,11 @@
     <allow pkg="javax.management" />
     <allow pkg="org.slf4j" />
     <allow pkg="org.junit" />
-    <allow pkg="org.opentest4j" />
     <allow pkg="org.hamcrest" />
     <allow pkg="org.mockito" />
-    <allow pkg="org.easymock" />
-    <allow pkg="org.powermock" />
     <allow pkg="java.security" />
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
-    <allow pkg="org.ietf.jgss" />
     <allow pkg="net.jqwik.api" />
 
     <!-- no one depends on the server -->
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 09ae8e6aa15..9182f9c4bcd 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -22,7 +22,6 @@ import kafka.network.RequestChannel;
 import kafka.server.ApiVersionManager;
 import kafka.server.AutoTopicCreationManager;
 import kafka.server.BrokerTopicStats;
-import kafka.server.ClientMetricsManager;
 import kafka.server.DelegationTokenManager;
 import kafka.server.FetchManager;
 import kafka.server.KafkaApis;
@@ -35,6 +34,7 @@ import kafka.server.metadata.ConfigRepository;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.server.ClientMetricsManager;
 import org.apache.kafka.server.authorizer.Authorizer;
 
 import java.util.Collections;
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala 
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 9e7b88d2ed7..06e358d7b6c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -17,7 +17,6 @@
 package kafka.coordinator.transaction
 
 import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit, 
NoRetry, RetryBackoffMs}
-import kafka.server.{NodeToControllerChannelManager, 
ControllerRequestCompletionHandler}
 import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.clients.ClientResponse
@@ -26,6 +25,7 @@ import 
org.apache.kafka.common.message.AllocateProducerIdsRequestData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, 
AllocateProducerIdsResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.ProducerIdsBlock
 
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala 
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 1c5021b69bf..661f8afe5f2 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.requests.{AlterPartitionRequest, 
AlterPartitionResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.util.Scheduler
 
@@ -84,7 +85,7 @@ object AlterPartitionManager {
     threadNamePrefix: String,
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
-    val channelManager = NodeToControllerChannelManager(
+    val channelManager = new NodeToControllerChannelManagerImpl(
       controllerNodeProvider,
       time = time,
       metrics = metrics,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala 
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 25e934d888c..a9b06dcbc51 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Properties}
-
 import kafka.controller.KafkaController
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.Logging
@@ -34,8 +33,10 @@ import 
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, 
RequestContext, RequestHeader}
 import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
 import scala.collection.{Map, Seq, Set, mutable}
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait AutoTopicCreationManager {
@@ -193,7 +194,7 @@ class DefaultAutoTopicCreationManager(
 
     val request = metadataRequestContext.map { context =>
       val requestVersion =
-        channelManager.controllerApiVersions() match {
+        channelManager.controllerApiVersions.asScala match {
           case None =>
             // We will rely on the Metadata request to be retried in the case
             // that the latest version is not usable by the controller.
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala 
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index bfe5ca0fde7..0b74ad0a3e4 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.queue.EventQueue.DeadlineFunction
 import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
 import java.util.{Comparator, OptionalLong}
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 045429c8436..56725fa67d0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorCon
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, TopicIdPartition}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -222,7 +223,7 @@ class BrokerServer(
       val controllerNodes = 
RaftConfig.voterConnectionsToNodes(voterConnections).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, 
config, controllerNodes)
 
-      clientToControllerChannelManager = NodeToControllerChannelManager(
+      clientToControllerChannelManager = new 
NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
         metrics,
@@ -279,7 +280,7 @@ class BrokerServer(
         time
       )
 
-      val assignmentsChannelManager = NodeToControllerChannelManager(
+      val assignmentsChannelManager = new NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
         metrics,
@@ -357,7 +358,7 @@ class BrokerServer(
           k -> VersionRange.of(v.min, v.max)
       }.asJava
 
-      val brokerLifecycleChannelManager = NodeToControllerChannelManager(
+      val brokerLifecycleChannelManager = new 
NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
         metrics,
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index a770e17f0c0..5a2c11cb91c 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.internals.QuotaConfigs
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
 import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.storage.internals.log.{LogConfig, 
ThrottledReplicaListValidator}
 import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
 
diff --git 
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala 
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 73a609ee7eb..28ddac0c5e6 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -17,8 +17,6 @@
 
 package kafka.server
 
-import kafka.metrics.ClientMetricsConfigs
-
 import java.util
 import java.util.Properties
 import org.apache.kafka.common.config.ConfigResource
@@ -26,6 +24,7 @@ import 
org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRIC
 import org.apache.kafka.controller.ConfigurationValidator
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 
 import scala.collection.mutable
diff --git 
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala 
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index 6cf42690384..20edae23ca8 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.image.publisher.MetadataPublisher
 import org.apache.kafka.queue.EventQueue.DeadlineFunction
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7b28c3372e1..1f08cdffd05 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, 
LegacyPropagator}
 import org.apache.kafka.metadata.publisher.FeaturesPublisher
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.{ClientMetricsManager, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -432,7 +433,7 @@ class ControllerServer(
        * Start the KIP-919 controller registration manager.
        */
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, 
config, controllerNodes.asScala)
-      registrationChannelManager = NodeToControllerChannelManager(
+      registrationChannelManager = new NodeToControllerChannelManagerImpl(
         controllerNodeProvider,
         time,
         metrics,
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala 
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index b0b13dfecda..7d7b6eba02d 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -18,13 +18,13 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-
 import kafka.network.RequestChannel
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
 import scala.compat.java8.OptionConverters._
 
@@ -165,7 +165,7 @@ class ForwardingManagerImpl(
   }
 
   override def controllerApiVersions: Option[NodeApiVersions] =
-    channelManager.controllerApiVersions()
+    channelManager.controllerApiVersions.asScala
 
   private def parseResponse(
     buffer: ByteBuffer,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3f5a435d147..3a819a3d184 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -67,6 +67,7 @@ import 
org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, 
IBP_2_3_IV0}
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala 
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index ea1d6cf8ed0..e281087f12f 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -32,6 +32,7 @@ import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.Scheduler
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6f59820a6f6..66e2cf42713 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -52,6 +52,7 @@ import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble}
 import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, 
VersionRange}
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -338,7 +339,7 @@ class KafkaServer(
         tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
         credentialProvider = new 
CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-        clientToControllerChannelManager = NodeToControllerChannelManager(
+        clientToControllerChannelManager = new 
NodeToControllerChannelManagerImpl(
           controllerNodeProvider = controllerNodeProvider,
           time = time,
           metrics = metrics,
@@ -428,7 +429,7 @@ class KafkaServer(
           )
           val controllerNodes = 
RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
           val quorumControllerNodeProvider = 
RaftControllerNodeProvider(raftManager, config, controllerNodes)
-          val brokerToQuorumChannelManager = NodeToControllerChannelManager(
+          val brokerToQuorumChannelManager = new 
NodeToControllerChannelManagerImpl(
             controllerNodeProvider = quorumControllerNodeProvider,
             time = time,
             metrics = metrics,
diff --git 
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala 
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index ee69a74aede..19d19c87bb1 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -31,10 +31,12 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.util.{InterBrokerSendThread, 
RequestAndCompletionHandler}
 
 import java.util
+import java.util.Optional
 import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -130,38 +132,6 @@ class RaftControllerNodeProvider(
       listenerName, securityProtocol, saslMechanism, isZkController = false)
 }
 
-object NodeToControllerChannelManager {
-  def apply(
-    controllerNodeProvider: ControllerNodeProvider,
-    time: Time,
-    metrics: Metrics,
-    config: KafkaConfig,
-    channelName: String,
-    threadNamePrefix: String,
-    retryTimeoutMs: Long
-  ): NodeToControllerChannelManager = {
-    new NodeToControllerChannelManagerImpl(
-      controllerNodeProvider,
-      time,
-      metrics,
-      config,
-      channelName,
-      threadNamePrefix,
-      retryTimeoutMs
-    )
-  }
-}
-
-trait NodeToControllerChannelManager {
-  def start(): Unit
-  def shutdown(): Unit
-  def controllerApiVersions(): Option[NodeApiVersions]
-  def sendRequest(
-    request: AbstractRequest.Builder[_ <: AbstractRequest],
-    callback: ControllerRequestCompletionHandler
-  ): Unit
-}
-
 /**
  * This class manages the connection between a broker and the controller. It 
runs a single
  * [[NodeToControllerRequestThread]] which uses the broker's metadata cache as 
its own metadata to find
@@ -270,22 +240,13 @@ class NodeToControllerChannelManagerImpl(
     ))
   }
 
-  def controllerApiVersions(): Option[NodeApiVersions] = {
+  def controllerApiVersions(): Optional[NodeApiVersions] = {
     requestThread.activeControllerAddress().flatMap { activeController =>
       Option(apiVersions.get(activeController.idString))
-    }
+    }.asJava
   }
 }
 
-abstract class ControllerRequestCompletionHandler extends 
RequestCompletionHandler {
-
-  /**
-   * Fire when the request transmission time passes the caller defined 
deadline on the channel queue.
-   * It covers the total waiting time including retries which might be the 
result of individual request timeout.
-   */
-  def onTimeout(): Unit
-}
-
 case class NodeToControllerQueueItem(
   createdTimeMs: Long,
   request: AbstractRequest.Builder[_ <: AbstractRequest],
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 65e6337f07d..753f69d20be 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
-import kafka.server.{ConfigType, ControllerRequestCompletionHandler, 
KafkaConfig}
+import kafka.server.{ConfigType, KafkaConfig}
 import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.test.annotation.{AutoStart, ClusterConfigProperty, 
ClusterTemplate, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
@@ -45,6 +45,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
 import org.apache.kafka.metadata.authorizer.StandardAcl
 import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, 
ProducerIdsBlock}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertNotNull, assertTrue, fail}
 import org.junit.jupiter.api.{Assumptions, Timeout}
diff --git 
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala 
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 01432bf6f47..5b98fe4d9c8 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.{AbstractRequest, 
EnvelopeRequest, Envel
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index afa9b8b6e0b..a3454f4d6fe 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.replica.ClientMetadata
 import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 5dd665deae9..8b5d44a2a36 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,7 +17,6 @@
 package kafka.coordinator.transaction
 
 import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
-import kafka.server.NodeToControllerChannelManager
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.common.KafkaException
@@ -26,6 +25,7 @@ import 
org.apache.kafka.common.message.AllocateProducerIdsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AllocateProducerIdsResponse
 import org.apache.kafka.common.utils.{MockTime, Time}
+import org.apache.kafka.server.NodeToControllerChannelManager
 import org.apache.kafka.server.common.ProducerIdsBlock
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
diff --git 
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index ccf3909adcc..64b920cbf38 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.requests.{AbstractRequest, 
AlterPartitionRequest, AlterPartitionResponse}
 import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV2, 
IBP_3_2_IV0, IBP_3_5_IV1}
 import org.apache.kafka.server.util.{MockScheduler, MockTime}
diff --git 
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 89b81035dc3..a3c6deb488d 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -21,7 +21,6 @@ import java.net.InetAddress
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.{Collections, Optional, Properties}
-
 import kafka.controller.KafkaController
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.utils.TestUtils
@@ -38,6 +37,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
 import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
@@ -327,7 +327,7 @@ class AutoTopicCreationManagerTest {
       .setMinVersion(0)
       .setMaxVersion(0)
     Mockito.when(brokerToController.controllerApiVersions())
-      .thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+      .thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
 
     Mockito.when(controller.isActive).thenReturn(false)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index ae14fa7171b..6c6199f71da 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.extension.ExtendWith
@@ -47,7 +48,7 @@ import java.util.concurrent.{CompletableFuture, TimeUnit, 
TimeoutException}
 class BrokerRegistrationRequestTest {
 
   def brokerToControllerChannelManager(clusterInstance: ClusterInstance): NodeToControllerChannelManager = {
-    NodeToControllerChannelManager(
+    new NodeToControllerChannelManagerImpl(
       new ControllerNodeProvider() {
         def node: Option[Node] = Some(new Node(
           clusterInstance.anyControllerSocketServer().config.nodeId,
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 4ce400e1ed5..c242194ef76 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -17,12 +17,12 @@
 
 package kafka.server
 
-import kafka.metrics.ClientMetricsConfigs
 import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
BROKER_LOGGER, CLIENT_METRICS, TOPIC}
 import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG, 
SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
 import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException, InvalidTopicException}
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
 
@@ -153,4 +153,4 @@ class ControllerConfigurationValidatorTest {
       assertThrows(classOf[InvalidConfigurationException], () => 
validator.validate(
         new ConfigResource(CLIENT_METRICS, "subscription-1"), config)). 
getMessage())
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 41e67e61f5e..db200da8e33 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,7 +28,6 @@ import kafka.cluster.{Broker, Partition}
 import kafka.controller.{ControllerContext, KafkaController}
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
 import kafka.log.UnifiedLog
-import kafka.metrics.ClientMetricsTestUtils
 import kafka.network.{RequestChannel, RequestMetrics}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, 
MockConfigRepository, ZkMetadataCache}
@@ -98,6 +97,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteRes
 import org.apache.kafka.coordinator.group.GroupCoordinator
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, 
IBP_2_2_IV1}
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, 
FetchPartitionData, LogConfig}
 
diff --git 
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
 
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 6c10ba89db7..a3b8643fb73 100644
--- 
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++ 
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -19,8 +19,11 @@ package kafka.server
 import org.apache.kafka.clients.{ClientResponse, MockClient, NodeApiVersions}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.server.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 import org.apache.kafka.server.util.MockTime
 
+import java.util.Optional
+
 class MockNodeToControllerChannelManager(
   val client: MockClient,
   time: MockTime,
@@ -48,8 +51,8 @@ class MockNodeToControllerChannelManager(
     ))
   }
 
-  override def controllerApiVersions(): Option[NodeApiVersions] = {
-    Some(controllerApiVersions)
+  override def controllerApiVersions(): Optional[NodeApiVersions] = {
+    Optional.of(controllerApiVersions)
   }
 
   private[server] def handleResponse(request: NodeToControllerQueueItem)(response: ClientResponse): Unit = {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2e9ce677add..effb0ae67d6 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.controller.QuorumController
 import org.apache.kafka.metadata.properties.MetaProperties
+import org.apache.kafka.server.ControllerRequestCompletionHandler
 import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, 
Authorizer => JAuthorizer}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/main/java/kafka/server/AssignmentsManager.java 
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
similarity index 99%
rename from core/src/main/java/kafka/server/AssignmentsManager.java
rename to server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index ade476bcab2..4d5e5206627 100644
--- a/core/src/main/java/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.server;
+package org.apache.kafka.server;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.common.Uuid;
@@ -249,7 +249,7 @@ public class AssignmentsManager {
     /**
      * Callback for a {@link AssignReplicasToDirsRequest}.
      */
-    private class AssignReplicasToDirsRequestCompletionHandler extends ControllerRequestCompletionHandler {
+    private class AssignReplicasToDirsRequestCompletionHandler implements ControllerRequestCompletionHandler {
         @Override
         public void onTimeout() {
             log.warn("Request to controller timed out");
diff --git a/core/src/main/java/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
similarity index 98%
rename from core/src/main/java/kafka/server/ClientMetricsManager.java
rename to server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index aecb33f7cb2..cd5a18be776 100644
--- a/core/src/main/java/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.server;
+package org.apache.kafka.server;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
 
b/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
new file mode 100644
index 00000000000..cad8bc723d9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.RequestCompletionHandler;
+
+public interface ControllerRequestCompletionHandler extends RequestCompletionHandler {
+
+    /**
+     * Fire when the request transmission time passes the caller defined deadline on the channel queue.
+     * It covers the total waiting time including retries which might be the result of individual request timeout.
+     */
+    void onTimeout();
+
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
new file mode 100644
index 00000000000..cb855fdad11
--- /dev/null
+++ 
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.requests.AbstractRequest;
+
+import java.util.Optional;
+
+public interface NodeToControllerChannelManager {
+
+    void start();
+
+    void shutdown();
+
+    Optional<NodeApiVersions> controllerApiVersions();
+
+    void sendRequest(
+        AbstractRequest.Builder<? extends AbstractRequest> request,
+        ControllerRequestCompletionHandler callback
+    );
+}
diff --git a/core/src/main/java/kafka/metrics/ClientMetricsConfigs.java 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
similarity index 99%
rename from core/src/main/java/kafka/metrics/ClientMetricsConfigs.java
rename to 
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 838a20c3aae..737039fcbbf 100644
--- a/core/src/main/java/kafka/metrics/ClientMetricsConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.metrics;
+package org.apache.kafka.server.metrics;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
diff --git a/server/src/main/java/org/apache/kafka/server/package-info.java 
b/server/src/main/java/org/apache/kafka/server/package-info.java
new file mode 100644
index 00000000000..a7c6727a6f8
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Server-specific functionality for brokers and controllers.
+ */
+package org.apache.kafka.server;
diff --git a/core/src/test/java/kafka/server/AssignmentsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
similarity index 98%
rename from core/src/test/java/kafka/server/AssignmentsManagerTest.java
rename to 
server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 54de654960c..01f9b40878e 100644
--- a/core/src/test/java/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.server;
+package org.apache.kafka.server;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.common.Uuid;
@@ -31,6 +31,8 @@ import org.apache.kafka.server.common.TopicIdPartition;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
@@ -129,6 +131,7 @@ public class AssignmentsManagerTest {
     }
 
     @Test
+    @DisabledOnJre(JRE.JAVA_8)
     public void testAssignmentAggregation() throws InterruptedException {
         CountDownLatch readyToAssert = new CountDownLatch(1);
         doAnswer(invocation -> {
@@ -231,4 +234,4 @@ public class AssignmentsManagerTest {
                     }}
         ), captor.getAllValues().get(4).build().data());
     }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
similarity index 97%
rename from core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java
rename to 
server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 3697ba7ba55..f36608d9500 100644
--- a/core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java
+++ 
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.metrics;
+package org.apache.kafka.server.metrics;
 
 import java.util.Arrays;
 import java.util.Collections;
diff --git a/settings.gradle b/settings.gradle
index e11551ed94a..d3a1cba454b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -65,6 +65,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'server',
     'server-common',
     'shell',
     'storage',

Reply via email to