This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new df78204e05b KAFKA-15854: Move Java classes from `kafka.server` to the
`server` module (#14796)
df78204e05b is described below
commit df78204e05bb1c416d977a0a36f4c263251ae4ef
Author: Ismael Juma <[email protected]>
AuthorDate: Sun Nov 19 22:09:19 2023 -0800
KAFKA-15854: Move Java classes from `kafka.server` to the `server` module
(#14796)
We only move Java classes that have minimal or no dependencies on Scala
classes in this PR.
Details:
* Configured `server` module in build files.
* Changed `ControllerRequestCompletionHandler` to be an interface since it
has no implementations.
* Cleaned up various import control files.
* Minor build clean-ups for `server-common`.
* Disabled `testAssignmentAggregation` when executed with Java 8, this is
an existing issue (see #14794).
For broader context on this change, please check:
* KAFKA-15852: Move server code from `core` to `server` module
Reviewers: Divij Vaidya <[email protected]>
---
build.gradle | 76 ++++++++++++++++++----
checkstyle/import-control-metadata.xml | 5 --
checkstyle/import-control-server-common.xml | 4 --
checkstyle/import-control-server.xml | 61 +++++++++++++++++
checkstyle/import-control-storage.xml | 4 --
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../transaction/ProducerIdManager.scala | 2 +-
.../scala/kafka/server/AlterPartitionManager.scala | 3 +-
.../kafka/server/AutoTopicCreationManager.scala | 5 +-
.../kafka/server/BrokerLifecycleManager.scala | 1 +
.../src/main/scala/kafka/server/BrokerServer.scala | 7 +-
.../main/scala/kafka/server/ConfigHandler.scala | 1 +
.../server/ControllerConfigurationValidator.scala | 3 +-
.../server/ControllerRegistrationManager.scala | 1 +
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
.../scala/kafka/server/ForwardingManager.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
core/src/main/scala/kafka/server/KafkaBroker.scala | 1 +
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
.../server/NodeToControllerChannelManager.scala | 47 ++-----------
.../kafka/zk/ZkMigrationIntegrationTest.scala | 3 +-
.../server/NodeToControllerRequestThreadTest.scala | 1 +
.../scala/unit/kafka/cluster/PartitionTest.scala | 1 +
.../transaction/ProducerIdManagerTest.scala | 2 +-
.../kafka/server/AlterPartitionManagerTest.scala | 1 +
.../server/AutoTopicCreationManagerTest.scala | 4 +-
.../server/BrokerRegistrationRequestTest.scala | 3 +-
.../ControllerConfigurationValidatorTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../MockNodeToControllerChannelManager.scala | 7 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 1 +
.../apache}/kafka/server/AssignmentsManager.java | 4 +-
.../apache}/kafka/server/ClientMetricsManager.java | 2 +-
.../server/ControllerRequestCompletionHandler.java | 30 +++++++++
.../server/NodeToControllerChannelManager.java | 37 +++++++++++
.../server}/metrics/ClientMetricsConfigs.java | 2 +-
.../java/org/apache/kafka/server/package-info.java | 20 ++++++
.../kafka/server/AssignmentsManagerTest.java | 7 +-
.../server}/metrics/ClientMetricsTestUtils.java | 2 +-
settings.gradle | 1 +
40 files changed, 268 insertions(+), 102 deletions(-)
diff --git a/build.gradle b/build.gradle
index 3b7878a3e89..608a57d56ad 100644
--- a/build.gradle
+++ b/build.gradle
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn:
connectPkgs.collect { it + ":jar" })
tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it +
":test" }) {}
+project(':server') {
+ archivesBaseName = "kafka-server"
+
+ dependencies {
+ implementation project(':clients')
+ implementation project(':server-common')
+
+ implementation libs.slf4jApi
+
+ compileOnly libs.log4j
+
+ testImplementation project(':clients').sourceSets.test.output
+
+ testImplementation libs.mockitoCore
+ testImplementation libs.junitJupiter
+ testImplementation libs.slf4jlog4j
+ }
+
+ task createVersionFile() {
+ def receiptFile = file("$buildDir/kafka/$buildVersionFileName")
+ inputs.property "commitId", commitId
+ inputs.property "version", version
+ outputs.file receiptFile
+
+ doLast {
+ def data = [
+ commitId: commitId,
+ version: version,
+ ]
+
+ receiptFile.parentFile.mkdirs()
+ def content = data.entrySet().collect { "$it.key=$it.value"
}.sort().join("\n")
+ receiptFile.setText(content, "ISO-8859-1")
+ }
+ }
+
+ jar {
+ dependsOn createVersionFile
+ from("$buildDir") {
+ include "kafka/$buildVersionFileName"
+ }
+ }
+
+ clean.doFirst {
+ delete "$buildDir/kafka/"
+ }
+
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-server.xml")
+ }
+
+ javadoc {
+ enabled = false
+ }
+}
+
project(':core') {
apply plugin: 'scala'
@@ -873,7 +929,7 @@ project(':core') {
implementation project(':tools:tools-api')
implementation project(':raft')
implementation project(':storage')
-
+ implementation project(':server')
implementation libs.argparse4j
implementation libs.commonsValidator
@@ -912,6 +968,7 @@ project(':core') {
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
+ testImplementation project(':server').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation(libs.apacheda) {
@@ -1643,19 +1700,6 @@ project(':server-common') {
}
}
- sourceSets {
- main {
- java {
- srcDirs = ["src/main/java"]
- }
- }
- test {
- java {
- srcDirs = ["src/test/java"]
- }
- }
- }
-
jar {
dependsOn createVersionFile
from("$buildDir") {
@@ -1670,6 +1714,10 @@ project(':server-common') {
checkstyle {
configProperties =
checkstyleConfigProperties("import-control-server-common.xml")
}
+
+ javadoc {
+ enabled = false
+ }
}
project(':storage:storage-api') {
diff --git a/checkstyle/import-control-metadata.xml
b/checkstyle/import-control-metadata.xml
index d4643c19979..897932492b1 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -27,16 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
- <allow pkg="org.opentest4j" />
- <allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
- <allow pkg="org.easymock" />
- <allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="javax.crypto" />
- <allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index 79873703d84..622e7c5e052 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -27,15 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
- <allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
- <allow pkg="org.easymock" />
- <allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
- <allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
new file mode 100644
index 00000000000..499f8327ab6
--- /dev/null
+++ b/checkstyle/import-control-server.xml
@@ -0,0 +1,61 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka.server">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE
-->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+ <allow pkg="org.mockito" />
+ <allow pkg="java.security" />
+ <allow pkg="javax.net.ssl" />
+ <allow pkg="javax.security" />
+ <allow pkg="net.jqwik.api" />
+
+ <!-- no one depends on the server -->
+ <disallow pkg="kafka" />
+
+ <!-- anyone can use public classes -->
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.security" />
+ <allow pkg="org.apache.kafka.common.serialization" />
+ <allow pkg="org.apache.kafka.common.utils" />
+ <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.memory" />
+
+ <!-- protocol, records and request/response utilities -->
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.requests" />
+
+ <!-- utilities and reusable classes from server-common -->
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.server.common" />
+
+ <!-- persistent collection factories/non-library-specific wrappers -->
+ <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+</import-control>
diff --git a/checkstyle/import-control-storage.xml
b/checkstyle/import-control-storage.xml
index 42d170ec315..d8c5e287d31 100644
--- a/checkstyle/import-control-storage.xml
+++ b/checkstyle/import-control-storage.xml
@@ -27,15 +27,11 @@
<allow pkg="javax.management" />
<allow pkg="org.slf4j" />
<allow pkg="org.junit" />
- <allow pkg="org.opentest4j" />
<allow pkg="org.hamcrest" />
<allow pkg="org.mockito" />
- <allow pkg="org.easymock" />
- <allow pkg="org.powermock" />
<allow pkg="java.security" />
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
- <allow pkg="org.ietf.jgss" />
<allow pkg="net.jqwik.api" />
<!-- no one depends on the server -->
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 09ae8e6aa15..9182f9c4bcd 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -22,7 +22,6 @@ import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerTopicStats;
-import kafka.server.ClientMetricsManager;
import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager;
import kafka.server.KafkaApis;
@@ -35,6 +34,7 @@ import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer;
import java.util.Collections;
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 9e7b88d2ed7..06e358d7b6c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.{IterationLimit,
NoRetry, RetryBackoffMs}
-import kafka.server.{NodeToControllerChannelManager,
ControllerRequestCompletionHandler}
import kafka.utils.Logging
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.clients.ClientResponse
@@ -26,6 +25,7 @@ import
org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AllocateProducerIdsRequest,
AllocateProducerIdsResponse}
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.ProducerIdsBlock
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 1c5021b69bf..661f8afe5f2 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AlterPartitionRequest,
AlterPartitionResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.util.Scheduler
@@ -84,7 +85,7 @@ object AlterPartitionManager {
threadNamePrefix: String,
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
- val channelManager = NodeToControllerChannelManager(
+ val channelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time = time,
metrics = metrics,
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index 25e934d888c..a9b06dcbc51 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
-
import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
@@ -34,8 +33,10 @@ import
org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopi
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest,
RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import scala.collection.{Map, Seq, Set, mutable}
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait AutoTopicCreationManager {
@@ -193,7 +194,7 @@ class DefaultAutoTopicCreationManager(
val request = metadataRequestContext.map { context =>
val requestVersion =
- channelManager.controllerApiVersions() match {
+ channelManager.controllerApiVersions.asScala match {
case None =>
// We will rely on the Metadata request to be retried in the case
// that the latest version is not usable by the controller.
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index bfe5ca0fde7..0b74ad0a3e4 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.metadata.{BrokerState, VersionRange}
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.common.utils.{ExponentialBackoff, LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import java.util.{Comparator, OptionalLong}
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 045429c8436..56725fa67d0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,6 +42,7 @@ import org.apache.kafka.coordinator.group.{GroupCoordinator,
GroupCoordinatorCon
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager,
NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, TopicIdPartition}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -222,7 +223,7 @@ class BrokerServer(
val controllerNodes =
RaftConfig.voterConnectionsToNodes(voterConnections).asScala
val controllerNodeProvider = RaftControllerNodeProvider(raftManager,
config, controllerNodes)
- clientToControllerChannelManager = NodeToControllerChannelManager(
+ clientToControllerChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
@@ -279,7 +280,7 @@ class BrokerServer(
time
)
- val assignmentsChannelManager = NodeToControllerChannelManager(
+ val assignmentsChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
@@ -357,7 +358,7 @@ class BrokerServer(
k -> VersionRange.of(v.min, v.max)
}.asJava
- val brokerLifecycleChannelManager = NodeToControllerChannelManager(
+ val brokerLifecycleChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index a770e17f0c0..5a2c11cb91c 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
+import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.storage.internals.log.{LogConfig,
ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
diff --git
a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
index 73a609ee7eb..28ddac0c5e6 100644
--- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
+++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala
@@ -17,8 +17,6 @@
package kafka.server
-import kafka.metrics.ClientMetricsConfigs
-
import java.util
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
@@ -26,6 +24,7 @@ import
org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRIC
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.mutable
diff --git
a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
index 6cf42690384..20edae23ca8 100644
--- a/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
+++ b/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.queue.EventQueue.DeadlineFunction
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7b28c3372e1..1f08cdffd05 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver,
LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.{ClientMetricsManager,
NodeToControllerChannelManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -432,7 +433,7 @@ class ControllerServer(
* Start the KIP-919 controller registration manager.
*/
val controllerNodeProvider = RaftControllerNodeProvider(raftManager,
config, controllerNodes.asScala)
- registrationChannelManager = NodeToControllerChannelManager(
+ registrationChannelManager = new NodeToControllerChannelManagerImpl(
controllerNodeProvider,
time,
metrics,
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index b0b13dfecda..7d7b6eba02d 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -18,13 +18,13 @@
package kafka.server
import java.nio.ByteBuffer
-
import kafka.network.RequestChannel
import kafka.utils.Logging
import org.apache.kafka.clients.{ClientResponse, NodeApiVersions}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import scala.compat.java8.OptionConverters._
@@ -165,7 +165,7 @@ class ForwardingManagerImpl(
}
override def controllerApiVersions: Option[NodeApiVersions] =
- channelManager.controllerApiVersions()
+ channelManager.controllerApiVersions.asScala
private def parseResponse(
buffer: ByteBuffer,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3f5a435d147..3a819a3d184 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -67,6 +67,7 @@ import
org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0,
IBP_2_3_IV0}
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala
b/core/src/main/scala/kafka/server/KafkaBroker.scala
index ea1d6cf8ed0..e281087f12f 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -32,6 +32,7 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.metadata.BrokerState
+import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.util.Scheduler
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 6f59820a6f6..66e2cf42713 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -52,6 +52,7 @@ import
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationF
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble}
import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde,
VersionRange}
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -338,7 +339,7 @@ class KafkaServer(
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new
CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
- clientToControllerChannelManager = NodeToControllerChannelManager(
+ clientToControllerChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider = controllerNodeProvider,
time = time,
metrics = metrics,
@@ -428,7 +429,7 @@ class KafkaServer(
)
val controllerNodes =
RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
val quorumControllerNodeProvider =
RaftControllerNodeProvider(raftManager, config, controllerNodes)
- val brokerToQuorumChannelManager = NodeToControllerChannelManager(
+ val brokerToQuorumChannelManager = new
NodeToControllerChannelManagerImpl(
controllerNodeProvider = quorumControllerNodeProvider,
time = time,
metrics = metrics,
diff --git
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index ee69a74aede..19d19c87bb1 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -31,10 +31,12 @@ import org.apache.kafka.common.requests.AbstractRequest
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.util.{InterBrokerSendThread,
RequestAndCompletionHandler}
import java.util
+import java.util.Optional
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -130,38 +132,6 @@ class RaftControllerNodeProvider(
listenerName, securityProtocol, saslMechanism, isZkController = false)
}
-object NodeToControllerChannelManager {
- def apply(
- controllerNodeProvider: ControllerNodeProvider,
- time: Time,
- metrics: Metrics,
- config: KafkaConfig,
- channelName: String,
- threadNamePrefix: String,
- retryTimeoutMs: Long
- ): NodeToControllerChannelManager = {
- new NodeToControllerChannelManagerImpl(
- controllerNodeProvider,
- time,
- metrics,
- config,
- channelName,
- threadNamePrefix,
- retryTimeoutMs
- )
- }
-}
-
-trait NodeToControllerChannelManager {
- def start(): Unit
- def shutdown(): Unit
- def controllerApiVersions(): Option[NodeApiVersions]
- def sendRequest(
- request: AbstractRequest.Builder[_ <: AbstractRequest],
- callback: ControllerRequestCompletionHandler
- ): Unit
-}
-
/**
* This class manages the connection between a broker and the controller. It
runs a single
* [[NodeToControllerRequestThread]] which uses the broker's metadata cache as
its own metadata to find
@@ -270,22 +240,13 @@ class NodeToControllerChannelManagerImpl(
))
}
- def controllerApiVersions(): Option[NodeApiVersions] = {
+ def controllerApiVersions(): Optional[NodeApiVersions] = {
requestThread.activeControllerAddress().flatMap { activeController =>
Option(apiVersions.get(activeController.idString))
- }
+ }.asJava
}
}
-abstract class ControllerRequestCompletionHandler extends
RequestCompletionHandler {
-
- /**
- * Fire when the request transmission time passes the caller defined
deadline on the channel queue.
- * It covers the total waiting time including retries which might be the
result of individual request timeout.
- */
- def onTimeout(): Unit
-}
-
case class NodeToControllerQueueItem(
createdTimeMs: Long,
request: AbstractRequest.Builder[_ <: AbstractRequest],
diff --git
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 65e6337f07d..753f69d20be 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,7 +17,7 @@
package kafka.zk
import kafka.security.authorizer.AclEntry.{WildcardHost,
WildcardPrincipalString}
-import kafka.server.{ConfigType, ControllerRequestCompletionHandler,
KafkaConfig}
+import kafka.server.{ConfigType, KafkaConfig}
import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
import kafka.test.annotation.{AutoStart, ClusterConfigProperty,
ClusterTemplate, ClusterTest, Type}
import kafka.test.junit.ClusterTestExtensions
@@ -45,6 +45,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion,
ProducerIdsBlock}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
diff --git
a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
index 01432bf6f47..5b98fe4d9c8 100644
--- a/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests.{AbstractRequest,
EnvelopeRequest, Envel
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index afa9b8b6e0b..a3454f4d6fe 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 5dd665deae9..8b5d44a2a36 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -17,7 +17,6 @@
package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
-import kafka.server.NodeToControllerChannelManager
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
import org.apache.kafka.common.KafkaException
@@ -26,6 +25,7 @@ import
org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
import org.apache.kafka.common.utils.{MockTime, Time}
+import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
diff --git
a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index ccf3909adcc..64b920cbf38 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AbstractRequest,
AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.metadata.LeaderRecoveryState
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV2,
IBP_3_2_IV0, IBP_3_5_IV1}
import org.apache.kafka.server.util.{MockScheduler, MockTime}
diff --git
a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index 89b81035dc3..a3c6deb488d 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -21,7 +21,6 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Collections, Optional, Properties}
-
import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.TestUtils
@@ -38,6 +37,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.coordinator.group.GroupCoordinator
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
@@ -327,7 +327,7 @@ class AutoTopicCreationManagerTest {
.setMinVersion(0)
.setMaxVersion(0)
Mockito.when(brokerToController.controllerApiVersions())
-
.thenReturn(Some(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
+
.thenReturn(Optional.of(NodeApiVersions.create(Collections.singleton(createTopicApiVersion))))
Mockito.when(controller.isActive).thenReturn(false)
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index ae14fa7171b..6c6199f71da 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.extension.ExtendWith
@@ -47,7 +48,7 @@ import java.util.concurrent.{CompletableFuture, TimeUnit,
TimeoutException}
class BrokerRegistrationRequestTest {
def brokerToControllerChannelManager(clusterInstance: ClusterInstance):
NodeToControllerChannelManager = {
- NodeToControllerChannelManager(
+ new NodeToControllerChannelManagerImpl(
new ControllerNodeProvider() {
def node: Option[Node] = Some(new Node(
clusterInstance.anyControllerSocketServer().config.nodeId,
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
index 4ce400e1ed5..c242194ef76 100644
---
a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala
@@ -17,12 +17,12 @@
package kafka.server
-import kafka.metrics.ClientMetricsConfigs
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.common.config.TopicConfig.{SEGMENT_BYTES_CONFIG,
SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException, InvalidTopicException}
+import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
@@ -153,4 +153,4 @@ class ControllerConfigurationValidatorTest {
assertThrows(classOf[InvalidConfigurationException], () =>
validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config)).
getMessage())
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 41e67e61f5e..db200da8e33 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -28,7 +28,6 @@ import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
import kafka.log.UnifiedLog
-import kafka.metrics.ClientMetricsTestUtils
import kafka.network.{RequestChannel, RequestMetrics}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache,
MockConfigRepository, ZkMetadataCache}
@@ -98,6 +97,7 @@ import
org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteRes
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.common.{Features, MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0,
IBP_2_2_IV1}
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams,
FetchPartitionData, LogConfig}
diff --git
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
index 6c10ba89db7..a3b8643fb73 100644
---
a/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
+++
b/core/src/test/scala/unit/kafka/server/MockNodeToControllerChannelManager.scala
@@ -19,8 +19,11 @@ package kafka.server
import org.apache.kafka.clients.{ClientResponse, MockClient, NodeApiVersions}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.server.{ControllerRequestCompletionHandler,
NodeToControllerChannelManager}
import org.apache.kafka.server.util.MockTime
+import java.util.Optional
+
class MockNodeToControllerChannelManager(
val client: MockClient,
time: MockTime,
@@ -48,8 +51,8 @@ class MockNodeToControllerChannelManager(
))
}
- override def controllerApiVersions(): Option[NodeApiVersions] = {
- Some(controllerApiVersions)
+ override def controllerApiVersions(): Optional[NodeApiVersions] = {
+ Optional.of(controllerApiVersions)
}
private[server] def handleResponse(request:
NodeToControllerQueueItem)(response: ClientResponse): Unit = {
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 2e9ce677add..effb0ae67d6 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -71,6 +71,7 @@ import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.controller.QuorumController
import org.apache.kafka.metadata.properties.MetaProperties
+import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext,
Authorizer => JAuthorizer}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/main/java/kafka/server/AssignmentsManager.java
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
similarity index 99%
rename from core/src/main/java/kafka/server/AssignmentsManager.java
rename to server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index ade476bcab2..4d5e5206627 100644
--- a/core/src/main/java/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
@@ -249,7 +249,7 @@ public class AssignmentsManager {
/**
* Callback for a {@link AssignReplicasToDirsRequest}.
*/
- private class AssignReplicasToDirsRequestCompletionHandler extends
ControllerRequestCompletionHandler {
+ private class AssignReplicasToDirsRequestCompletionHandler implements
ControllerRequestCompletionHandler {
@Override
public void onTimeout() {
log.warn("Request to controller timed out");
diff --git a/core/src/main/java/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
similarity index 98%
rename from core/src/main/java/kafka/server/ClientMetricsManager.java
rename to server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index aecb33f7cb2..cd5a18be776 100644
--- a/core/src/main/java/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
b/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
new file mode 100644
index 00000000000..cad8bc723d9
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/ControllerRequestCompletionHandler.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.RequestCompletionHandler;
+
+public interface ControllerRequestCompletionHandler extends
RequestCompletionHandler {
+
+ /**
+ * Fire when the request transmission time passes the caller defined
deadline on the channel queue.
+ * It covers the total waiting time including retries which might be the
result of individual request timeout.
+ */
+ void onTimeout();
+
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
new file mode 100644
index 00000000000..cb855fdad11
--- /dev/null
+++
b/server/src/main/java/org/apache/kafka/server/NodeToControllerChannelManager.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.requests.AbstractRequest;
+
+import java.util.Optional;
+
+public interface NodeToControllerChannelManager {
+
+ void start();
+
+ void shutdown();
+
+ Optional<NodeApiVersions> controllerApiVersions();
+
+ void sendRequest(
+ AbstractRequest.Builder<? extends AbstractRequest> request,
+ ControllerRequestCompletionHandler callback
+ );
+}
diff --git a/core/src/main/java/kafka/metrics/ClientMetricsConfigs.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
similarity index 99%
rename from core/src/main/java/kafka/metrics/ClientMetricsConfigs.java
rename to
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 838a20c3aae..737039fcbbf 100644
--- a/core/src/main/java/kafka/metrics/ClientMetricsConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.metrics;
+package org.apache.kafka.server.metrics;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
diff --git a/server/src/main/java/org/apache/kafka/server/package-info.java
b/server/src/main/java/org/apache/kafka/server/package-info.java
new file mode 100644
index 00000000000..a7c6727a6f8
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Server-specific functionality for brokers and controllers.
+ */
+package org.apache.kafka.server;
diff --git a/core/src/test/java/kafka/server/AssignmentsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
similarity index 98%
rename from core/src/test/java/kafka/server/AssignmentsManagerTest.java
rename to
server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index 54de654960c..01f9b40878e 100644
--- a/core/src/test/java/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
@@ -31,6 +31,8 @@ import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnJre;
+import org.junit.jupiter.api.condition.JRE;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
@@ -129,6 +131,7 @@ public class AssignmentsManagerTest {
}
@Test
+ @DisabledOnJre(JRE.JAVA_8)
public void testAssignmentAggregation() throws InterruptedException {
CountDownLatch readyToAssert = new CountDownLatch(1);
doAnswer(invocation -> {
@@ -231,4 +234,4 @@ public class AssignmentsManagerTest {
}}
), captor.getAllValues().get(4).build().data());
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
similarity index 97%
rename from core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java
rename to
server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 3697ba7ba55..f36608d9500 100644
--- a/core/src/test/java/kafka/metrics/ClientMetricsTestUtils.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.metrics;
+package org.apache.kafka.server.metrics;
import java.util.Arrays;
import java.util.Collections;
diff --git a/settings.gradle b/settings.gradle
index e11551ed94a..d3a1cba454b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -65,6 +65,7 @@ include 'clients',
'log4j-appender',
'metadata',
'raft',
+ 'server',
'server-common',
'shell',
'storage',