This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ebb3202e01d KAFKA-16964 Integration tests for adding and removing 
voters (#17582)
ebb3202e01d is described below

commit ebb3202e01d9481d7e711cd25e4e564863b91e1e
Author: kevin-wu24 <[email protected]>
AuthorDate: Mon Nov 4 13:09:37 2024 -0600

    KAFKA-16964 Integration tests for adding and removing voters (#17582)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/network/SocketServer.scala    |  45 +++---
 .../src/main/scala/kafka/server/BrokerServer.scala |   8 +-
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   3 +-
 .../src/main/scala/kafka/server/SharedServer.scala |   5 +-
 .../kafka/server/metadata/KRaftMetadataCache.scala |   6 +-
 .../ReconfigurableQuorumIntegrationTest.java       | 161 +++++++++++++++++++
 .../kafka/server/KRaftClusterTest.scala            |   3 +-
 .../kafka/server/QuorumTestHarness.scala           |   8 +-
 .../unit/kafka/network/SocketServerTest.scala      |   2 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   4 +-
 .../metadata/bootstrap/BootstrapMetadata.java      |  42 +++--
 .../apache/kafka/metadata/storage/Formatter.java   |  24 ++-
 .../metadata/bootstrap/BootstrapMetadataTest.java  |  54 ++++++-
 .../apache/kafka/server/ServerSocketFactory.java   |  62 ++++++++
 .../kafka/common/test/KafkaClusterTestKit.java     | 109 +++++++++----
 .../common/test/PreboundSocketFactoryManager.java  | 170 +++++++++++++++++++++
 .../org/apache/kafka/common/test/TestKitNode.java  |   5 +
 .../org/apache/kafka/common/test/TestKitNodes.java |  19 ++-
 .../group/ConsumerGroupCommandTestUtils.java       |   4 +
 20 files changed, 642 insertions(+), 95 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 7dc1cecdfd9..f706a8dff95 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ import org.apache.kafka.common.utils.{KafkaThread, 
LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, 
Reconfigurable}
 import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
 import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.ServerSocketFactory
 import org.apache.kafka.server.config.QuotaConfig
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.ConnectionDisconnectListener
@@ -76,13 +77,15 @@ import scala.util.control.ControlThrowable
  *      Acceptor has 1 Processor thread that has its own selector and read 
requests from the socket.
  *      1 Handler thread that handles requests and produces responses back to 
the processor thread for writing.
  */
-class SocketServer(val config: KafkaConfig,
-                   val metrics: Metrics,
-                   val time: Time,
-                   val credentialProvider: CredentialProvider,
-                   val apiVersionManager: ApiVersionManager,
-                   val connectionDisconnectListeners: 
Seq[ConnectionDisconnectListener] = Seq.empty)
-  extends Logging with BrokerReconfigurable {
+class SocketServer(
+  val config: KafkaConfig,
+  val metrics: Metrics,
+  val time: Time,
+  val credentialProvider: CredentialProvider,
+  val apiVersionManager: ApiVersionManager,
+  val socketFactory: ServerSocketFactory = ServerSocketFactory.INSTANCE,
+  val connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = 
Seq.empty
+) extends Logging with BrokerReconfigurable {
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -721,23 +724,17 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
    * Create a server socket to listen for connections on.
    */
   private def openServerSocket(host: String, port: Int, listenBacklogSize: 
Int): ServerSocketChannel = {
-    val socketAddress =
-      if (Utils.isBlank(host))
-        new InetSocketAddress(port)
-      else
-        new InetSocketAddress(host, port)
-    val serverChannel = ServerSocketChannel.open()
-    try {
-      serverChannel.configureBlocking(false)
-      if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
-        serverChannel.socket().setReceiveBufferSize(recvBufferSize)
-      serverChannel.socket.bind(socketAddress, listenBacklogSize)
-      info(s"Awaiting socket connections on 
${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
-    } catch {
-      case e: SocketException =>
-        Utils.closeQuietly(serverChannel, "server socket")
-        throw new KafkaException(s"Socket server failed to bind to 
${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
-    }
+    val socketAddress = if (Utils.isBlank(host)) {
+      new InetSocketAddress(port)
+    } else {
+      new InetSocketAddress(host, port)
+    }
+    val serverChannel = socketServer.socketFactory.openServerSocket(
+      endPoint.listenerName.value(),
+      socketAddress,
+      listenBacklogSize,
+      recvBufferSize)
+    info(s"Awaiting socket connections on 
${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
     serverChannel
   }
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index a40d3bbc243..4a215cc5078 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -265,7 +265,13 @@ class BrokerServer(
       // Create and start the socket server acceptor threads so that the bound 
port is known.
       // Delay starting processors until the end of the initialization 
sequence to ensure
       // that credentials have been loaded before processing authentications.
-      socketServer = new SocketServer(config, metrics, time, 
credentialProvider, apiVersionManager, connectionDisconnectListeners)
+      socketServer = new SocketServer(config,
+        metrics,
+        time,
+        credentialProvider,
+        apiVersionManager,
+        sharedServer.socketFactory,
+        connectionDisconnectListeners)
 
       clientQuotaMetadataManager = new 
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 8905b2c4f7f..90deff7ed86 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -161,7 +161,8 @@ class ControllerServer(
         metrics,
         time,
         credentialProvider,
-        apiVersionManager)
+        apiVersionManager,
+        sharedServer.socketFactory)
 
       val listenerInfo = ListenerInfo
         
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 7f56600f8b0..4a676a2765e 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -30,7 +30,7 @@ import 
org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
 import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
 REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble}
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.storage.internals.log.LogConfig
@@ -73,6 +73,7 @@ class KafkaRaftServer(
     
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
     QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
     new StandardFaultHandlerFactory(),
+    ServerSocketFactory.INSTANCE,
   )
 
   private val broker: Option[BrokerServer] = if 
(config.processRoles.contains(ProcessRole.BrokerRole)) {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index 658097d3707..a465c385077 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.metadata.ListenerInfo
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
 import org.apache.kafka.raft.Endpoints
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, 
ProcessTerminatingFaultHandler}
 import org.apache.kafka.server.metrics.{BrokerServerMetrics, 
KafkaYammerMetrics}
@@ -98,7 +98,8 @@ class SharedServer(
   private val _metrics: Metrics,
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, 
InetSocketAddress]],
   val bootstrapServers: JCollection[InetSocketAddress],
-  val faultHandlerFactory: FaultHandlerFactory
+  val faultHandlerFactory: FaultHandlerFactory,
+  val socketFactory: ServerSocketFactory
 ) extends Logging {
   private val logContext: LogContext = new LogContext(s"[SharedServer 
id=${sharedServerConfig.nodeId}] ")
   this.logIdent = logContext.logPrefix
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index e6a874c45d1..5fad48f8a71 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -550,8 +550,10 @@ class KRaftMetadataCache(
   override def features(): FinalizedFeatures = {
     val image = _currentImage
     val finalizedFeatures = new java.util.HashMap[String, 
java.lang.Short](image.features().finalizedVersions())
-    finalizedFeatures.put(KRaftVersion.FEATURE_NAME, 
kraftVersionSupplier.get().featureLevel())
-
+    val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
+    if (kraftVersionLevel > 0) {
+      finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
+    }
     new FinalizedFeatures(image.features().metadataVersion(),
       finalizedFeatures,
       image.highestOffsetAndEpoch().offset,
diff --git 
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java 
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 00000000000..6d130015058
--- /dev/null
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.QuorumInfo;
+import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.test.KafkaClusterTestKit;
+import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReconfigurableQuorumIntegrationTest {
+    static void checkKRaftVersions(Admin admin, short finalized) throws 
Exception {
+        FeatureMetadata featureMetadata = 
admin.describeFeatures().featureMetadata().get();
+        if (finalized > 0) {
+            
assertTrue(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+            assertEquals(finalized, featureMetadata.finalizedFeatures().
+                    get(KRaftVersion.FEATURE_NAME).minVersionLevel());
+            assertEquals(finalized, featureMetadata.finalizedFeatures().
+                    get(KRaftVersion.FEATURE_NAME).maxVersionLevel());
+        } else {
+            
assertFalse(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+        }
+        assertEquals((short) 0, featureMetadata.supportedFeatures().
+                get(KRaftVersion.FEATURE_NAME).minVersion());
+        assertEquals((short) 1, featureMetadata.supportedFeatures().
+                get(KRaftVersion.FEATURE_NAME).maxVersion());
+    }
+
+    @Test
+    public void testCreateAndDestroyNonReconfigurableCluster() throws 
Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    checkKRaftVersions(admin, (short) 0);
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testCreateAndDestroyReconfigurableCluster() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(1).
+                setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    checkKRaftVersions(admin, (short) 1);
+                });
+            }
+        }
+    }
+
+    static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
+        QuorumInfo quorumInfo = 
admin.describeMetadataQuorum().quorumInfo().get();
+        Map<Integer, Uuid> result = new TreeMap<>();
+        quorumInfo.voters().forEach(v -> {
+            result.put(v.replicaId(), v.replicaDirectoryId());
+        });
+        return result;
+    }
+
+    @Test
+    public void testRemoveController() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+                    build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(Arrays.asList(3000, 3001, 
3002)), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                admin.removeRaftVoter(3000, cluster.nodes().
+                    
controllerNodes().get(3000).metadataDirectoryId()).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddSameController() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(4).
+                setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+                build()).build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(Arrays.asList(3000, 3001, 3002, 
3003)), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002, 3003}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(3000, dirId).all().get();
+                admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Collections.singleton(new RaftVoterEndpoint("CONTROLLER", 
"example.com", 8080))
+                ).all().get();
+            }
+        }
+    }
+}
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index cbc502801cc..21ae6c379bd 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1389,8 +1389,7 @@ class KRaftClusterTest {
         setName("num.io.threads").
         setValue("9"), 0.toShort))
     val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setBootstrapMetadata(BootstrapMetadata.fromRecords(bootstrapRecords, 
"testRecords")).
+      new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, 
"testRecords")).
         setNumBrokerNodes(1).
         setNumControllerNodes(1).build()).
       build()
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 154589a7853..985b6be0452 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.queue.KafkaEventQueue
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
 import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -137,7 +137,8 @@ class KRaftQuorumImplementation(
       new Metrics(),
       controllerQuorumVotersFuture,
       controllerQuorumVotersFuture.get().values(),
-      faultHandlerFactory
+      faultHandlerFactory,
+      ServerSocketFactory.INSTANCE,
     )
     var broker: BrokerServer = null
     try {
@@ -387,7 +388,8 @@ abstract class QuorumTestHarness extends Logging {
       new Metrics(),
       controllerQuorumVotersFuture,
       Collections.emptyList(),
-      faultHandlerFactory
+      faultHandlerFactory,
+      ServerSocketFactory.INSTANCE,
     )
     var controllerServer: ControllerServer = null
     try {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index deb77a79de9..ca4156eacd2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -2248,7 +2248,7 @@ class SocketServerTest {
     time: Time = Time.SYSTEM,
     connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = 
Seq.empty
   ) extends SocketServer(
-    config, new Metrics, time, credentialProvider, apiVersionManager, 
connectionDisconnectListeners
+    config, new Metrics, time, credentialProvider, apiVersionManager, 
connectionDisconnectListeners = connectionDisconnectListeners
   ) {
 
     override def createDataPlaneAcceptor(endPoint: EndPoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel) : 
DataPlaneAcceptor = {
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 6da1d6c2f11..eea2d7cc46b 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -71,8 +71,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
 
   // Use the latest production MV for this test
   @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
-      new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
-      new ClusterConfigProperty(key = "unstable.feature.versions.enable", 
value = "false"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "unstable.feature.versions.enable", value 
= "false"),
   ))
   def testApiVersionsRequestValidationV0(): Unit = {
     val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 1de6a1f0ee5..2dc6d9a6eaf 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -134,19 +134,43 @@ public class BootstrapMetadata {
         return source;
     }
 
-    public BootstrapMetadata copyWithOnlyVersion() {
-        ApiMessageAndVersion versionRecord = null;
+    public short featureLevel(String featureName) {
+        short result = 0;
         for (ApiMessageAndVersion record : records) {
-            if (recordToMetadataVersion(record.message()).isPresent()) {
-                versionRecord = record;
+            if (record.message() instanceof FeatureLevelRecord) {
+                FeatureLevelRecord message = (FeatureLevelRecord) 
record.message();
+                if (message.name().equals(featureName)) {
+                    result = message.featureLevel();
+                }
             }
         }
-        if (versionRecord == null) {
-            throw new RuntimeException("No FeatureLevelRecord for " + 
MetadataVersion.FEATURE_NAME +
-                    " was found in " + source);
+        return result;
+    }
+
+    public BootstrapMetadata copyWithFeatureRecord(String featureName, short 
level) {
+        List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+        int i = 0;
+        while (i < records.size()) {
+            if (records.get(i).message() instanceof FeatureLevelRecord) {
+                FeatureLevelRecord record = (FeatureLevelRecord) 
records.get(i).message();
+                if (record.name().equals(featureName)) {
+                    FeatureLevelRecord newRecord = record.duplicate();
+                    newRecord.setFeatureLevel(level);
+                    newRecords.add(new ApiMessageAndVersion(newRecord, (short) 
0));
+                    break;
+                } else {
+                    newRecords.add(records.get(i));
+                }
+            }
+            i++;
+        }
+        if (i == records.size()) {
+            FeatureLevelRecord newRecord = new FeatureLevelRecord().
+                setName(featureName).
+                setFeatureLevel(level);
+            newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0));
         }
-        return new BootstrapMetadata(Collections.singletonList(versionRecord),
-                metadataVersion, source);
+        return BootstrapMetadata.fromRecords(newRecords, source);
     }
 
     @Override
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 6963707d850..53013307149 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -124,7 +124,7 @@ public class Formatter {
     /**
      * The metadata log directory.
      */
-    private String metadataLogDirectory = null;
+    private Optional<String> metadataLogDirectory = Optional.empty();
 
     /**
      * The initial KIP-853 voters.
@@ -166,6 +166,10 @@ public class Formatter {
         return this;
     }
 
+    public Collection<String> directories() {
+        return directories;
+    }
+
     public Formatter setReleaseVersion(MetadataVersion releaseVersion) {
         this.releaseVersion = releaseVersion;
         return this;
@@ -197,6 +201,11 @@ public class Formatter {
     }
 
     public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
+        this.metadataLogDirectory = Optional.of(metadataLogDirectory);
+        return this;
+    }
+
+    public Formatter setMetadataLogDirectory(Optional<String> 
metadataLogDirectory) {
         this.metadataLogDirectory = metadataLogDirectory;
         return this;
     }
@@ -231,13 +240,12 @@ public class Formatter {
         if (controllerListenerName == null) {
             throw new FormatterException("You must specify the name of the 
initial controller listener.");
         }
-        if (metadataLogDirectory == null) {
-            throw new FormatterException("You must specify the metadata log 
directory.");
-        }
-        if (!directories.contains(metadataLogDirectory)) {
-            throw new FormatterException("The specified metadata log 
directory, " + metadataLogDirectory +
+        metadataLogDirectory.ifPresent(d -> {
+            if (!directories.contains(d)) {
+                throw new FormatterException("The specified metadata log 
directory, " + d +
                     " was not one of the given directories: " + directories);
-        }
+            }
+        });
         releaseVersion = calculateEffectiveReleaseVersion();
         featureLevels = calculateEffectiveFeatureLevels();
         this.bootstrapMetadata = calculateBootstrapMetadata();
@@ -400,7 +408,7 @@ public class Formatter {
             Map<String, DirectoryType> directoryTypes = new HashMap<>();
             for (String emptyLogDir : ensemble.emptyLogDirs()) {
                 DirectoryType directoryType = 
DirectoryType.calculate(emptyLogDir,
-                    metadataLogDirectory,
+                    metadataLogDirectory.orElse(""),
                     nodeId,
                     initialControllers);
                 directoryTypes.put(emptyLogDir, directoryType);
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 51189412f41..fd41fefabf0 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -71,10 +72,57 @@ public class BootstrapMetadataTest {
                 () -> BootstrapMetadata.fromRecords(emptyList(), 
"quux")).getMessage());
     }
 
+    private static final ApiMessageAndVersion MV_10 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(FEATURE_NAME).
+            setFeatureLevel((short) 10), (short) 0);
+
+    private static final ApiMessageAndVersion MV_11 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName(FEATURE_NAME).
+            setFeatureLevel((short) 11), (short) 0);
+
+    private static final ApiMessageAndVersion FOO_1 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("foo").
+            setFeatureLevel((short) 1), (short) 0);
+
+    private static final ApiMessageAndVersion FOO_2 =
+        new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("foo").
+            setFeatureLevel((short) 2), (short) 0);
+
+    @Test
+    public void testCopyWithNewFeatureRecord() {
+        assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10, 
FOO_1), "src"),
+            BootstrapMetadata.fromRecords(Arrays.asList(MV_10), "src").
+                copyWithFeatureRecord("foo", (short) 1));
+    }
+
+    @Test
+    public void testFeatureLevelForMetadataVersion() {
+        assertEquals((short) 11, BootstrapMetadata.
+            fromRecords(Arrays.asList(MV_10, MV_11), "src").
+                featureLevel(FEATURE_NAME));
+    }
+
+    @Test
+    public void testCopyWithModifiedFeatureRecord() {
+        assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10, 
FOO_2), "src"),
+            BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src").
+                copyWithFeatureRecord("foo", (short) 2));
+    }
+
+    @Test
+    public void testFeatureLevelForFeatureThatIsNotSet() {
+        assertEquals((short) 0, BootstrapMetadata.
+            fromRecords(Arrays.asList(MV_10), "src").featureLevel("foo"));
+    }
+
     @Test
-    public void testCopyWithOnlyVersion() {
-        assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), 
IBP_3_3_IV2, "baz"),
-                BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, 
"baz").copyWithOnlyVersion());
+    public void testFeatureLevelForFeature() {
+        assertEquals((short) 2, BootstrapMetadata.
+            fromRecords(Arrays.asList(MV_10, FOO_2), 
"src").featureLevel("foo"));
     }
 
     static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION 
= Collections.singletonList(
diff --git 
a/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java 
b/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java
new file mode 100644
index 00000000000..7739507ade8
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ServerSocketChannel;
+
+public interface ServerSocketFactory {
+    ServerSocketFactory INSTANCE = new KafkaServerSocketFactory();
+    ServerSocketChannel openServerSocket(
+        String listenerName,
+        InetSocketAddress socketAddress,
+        int listenBacklogSize,
+        int recvBufferSize
+    ) throws IOException;
+
+    class KafkaServerSocketFactory implements ServerSocketFactory {
+
+        @Override
+        public ServerSocketChannel openServerSocket(
+                String listenerName,
+                InetSocketAddress socketAddress,
+                int listenBacklogSize,
+                int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = ServerSocketChannel.open();
+            try {
+                socketChannel.configureBlocking(false);
+                if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
+                    socketChannel.socket().setReceiveBufferSize(recvBufferSize);
+                }
+                socketChannel.socket().bind(socketAddress, listenBacklogSize);
+            } catch (SocketException e) {
+                Utils.closeQuietly(socketChannel, "server socket");
+                throw new KafkaException(String.format("Socket server failed to bind to %s:%d: %s.",
+                    socketAddress.getHostString(), socketAddress.getPort(), e.getMessage()), e);
+            }
+            return socketChannel;
+        }
+    }
+}
diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 4ef6ce41550..db857e5bcc8 100644
--- 
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -34,12 +34,13 @@ import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.Controller;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
-import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.storage.Formatter;
 import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.config.KRaftConfigs;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.fault.FaultHandler;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -147,11 +149,13 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
         private final TestKitNodes nodes;
         private final Map<String, Object> configProps = new HashMap<>();
         private final SimpleFaultHandlerFactory faultHandlerFactory = new 
SimpleFaultHandlerFactory();
+        private final PreboundSocketFactoryManager socketFactoryManager = new 
PreboundSocketFactoryManager();
         private final String brokerListenerName;
         private final String controllerListenerName;
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
 
+
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
             this.brokerListenerName = nodes.brokerListenerName().value();
@@ -165,7 +169,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
-        private KafkaConfig createNodeConfig(TestKitNode node) {
+        private KafkaConfig createNodeConfig(TestKitNode node) throws 
IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = 
nodes.controllerNodes().get(node.id());
 
@@ -201,13 +205,18 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, 
brokerListenerName);
             props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
controllerListenerName);
 
-            // Note: we can't accurately set controller.quorum.voters yet, 
since we don't
-            // yet know what ports each controller will pick.  Set it to a 
dummy string
-            // for now as a placeholder.
-            String uninitializedQuorumVotersString = 
nodes.controllerNodes().keySet().stream().
-                    map(n -> String.format("%d@localhost:0", n)).
-                    collect(Collectors.joining(","));
-            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
uninitializedQuorumVotersString);
+            StringBuilder quorumVoterStringBuilder = new StringBuilder();
+            String prefix = "";
+            for (int nodeId : nodes.controllerNodes().keySet()) {
+                quorumVoterStringBuilder.append(prefix).
+                    append(nodeId).
+                    append("@").
+                    append("localhost").
+                    append(":").
+                    
append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+                prefix = ",";
+            }
+            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
 
             // reduce log cleaner offset map memory usage
             props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 
"2097152");
@@ -235,6 +244,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
 
             try {
                 baseDirectory = new File(nodes.baseDirectory());
+                for (TestKitNode node : nodes.controllerNodes().values()) {
+                    socketFactoryManager.getOrCreatePortForListener(node.id(), 
"CONTROLLER");
+                }
                 for (TestKitNode node : nodes.controllerNodes().values()) {
                     setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), Collections.emptyList());
                     SharedServer sharedServer = new SharedServer(
@@ -244,7 +256,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         new Metrics(),
                         connectFutureManager.future,
                         Collections.emptyList(),
-                        faultHandlerFactory
+                        faultHandlerFactory,
+                        
socketFactoryManager.getOrCreateSocketFactory(node.id())
                     );
                     ControllerServer controller = null;
                     try {
@@ -268,18 +281,20 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
                     jointServers.put(node.id(), sharedServer);
                 }
                 for (TestKitNode node : nodes.brokerNodes().values()) {
-                    SharedServer sharedServer = jointServers.computeIfAbsent(
-                        node.id(),
-                        id -> new SharedServer(
+                    SharedServer sharedServer = jointServers.get(node.id());
+                    if (sharedServer == null) {
+                        sharedServer = new SharedServer(
                             createNodeConfig(node),
                             node.initialMetaPropertiesEnsemble(),
                             Time.SYSTEM,
                             new Metrics(),
                             connectFutureManager.future,
                             Collections.emptyList(),
-                            faultHandlerFactory
-                        )
-                    );
+                            faultHandlerFactory,
+                            
socketFactoryManager.getOrCreateSocketFactory(node.id())
+                        );
+                        jointServers.put(node.id(), sharedServer);
+                    }
                     BrokerServer broker = null;
                     try {
                         broker = new BrokerServer(sharedServer);
@@ -301,6 +316,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 if (baseDirectory != null) {
                     Utils.delete(baseDirectory);
                 }
+                socketFactoryManager.close();
                 throw e;
             }
             return new KafkaClusterTestKit(
@@ -309,7 +325,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     brokers,
                     connectFutureManager,
                     baseDirectory,
-                    faultHandlerFactory);
+                    faultHandlerFactory,
+                    socketFactoryManager);
         }
 
         private String listeners(int node) {
@@ -352,6 +369,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager;
     private final File baseDirectory;
     private final SimpleFaultHandlerFactory faultHandlerFactory;
+    private final PreboundSocketFactoryManager socketFactoryManager;
 
     private KafkaClusterTestKit(
         TestKitNodes nodes,
@@ -359,7 +377,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         Map<Integer, BrokerServer> brokers,
         ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager,
         File baseDirectory,
-        SimpleFaultHandlerFactory faultHandlerFactory
+        SimpleFaultHandlerFactory faultHandlerFactory,
+        PreboundSocketFactoryManager socketFactoryManager
     ) {
         /*
           Number of threads = Total number of brokers + Total number of 
controllers + Total number of Raft Managers
@@ -374,6 +393,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.controllerQuorumVotersFutureManager = 
controllerQuorumVotersFutureManager;
         this.baseDirectory = baseDirectory;
         this.faultHandlerFactory = faultHandlerFactory;
+        this.socketFactoryManager = socketFactoryManager;
     }
 
     public void format() throws Exception {
@@ -407,22 +427,44 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
         boolean writeMetadataDirectory
     ) {
         try {
-            MetaPropertiesEnsemble.Copier copier =
-                new 
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
-            for (Entry<String, MetaProperties> entry : 
ensemble.logDirProps().entrySet()) {
-                String logDir = entry.getKey();
-                if (writeMetadataDirectory || 
(!ensemble.metadataLogDir().equals(Optional.of(logDir)))) {
-                    log.trace("Adding {} to the list of directories to 
format.", logDir);
-                    copier.setLogDirProps(logDir, entry.getValue());
+            Formatter formatter = new Formatter();
+            formatter.setNodeId(ensemble.nodeId().getAsInt());
+            formatter.setClusterId(ensemble.clusterId().get());
+            if (writeMetadataDirectory) {
+                formatter.setDirectories(ensemble.logDirProps().keySet());
+            } else {
+                
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
+                    filter(d -> !ensemble.metadataLogDir().get().equals(d)).
+                    collect(Collectors.toSet()));
+            }
+            if (formatter.directories().isEmpty()) {
+                return;
+            }
+            
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+            formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
+                
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
+            formatter.setUnstableFeatureVersionsEnabled(true);
+            formatter.setIgnoreFormatted(false);
+            formatter.setControllerListenerName("CONTROLLER");
+            if (writeMetadataDirectory) {
+                
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+            } else {
+                formatter.setMetadataLogDirectory(Optional.empty());
+            }
+            if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
+                StringBuilder dynamicVotersBuilder = new StringBuilder();
+                String prefix = "";
+                for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
+                    int port = socketFactoryManager.
+                        getOrCreatePortForListener(controllerNode.id(), 
"CONTROLLER");
+                    dynamicVotersBuilder.append(prefix);
+                    prefix = ",";
+                    
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+                        controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
                 }
+                
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
             }
-            copier.setPreWriteHandler((logDir, isNew, metaProperties) -> {
-                log.info("Formatting {}.", logDir);
-                Files.createDirectories(Paths.get(logDir));
-                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(logDir, Optional.empty());
-                bootstrapDirectory.writeBinaryFile(nodes.bootstrapMetadata());
-            });
-            copier.writeLogDirChanges();
+            formatter.run();
         } catch (Exception e) {
             throw new RuntimeException("Failed to format node " + 
ensemble.nodeId(), e);
         }
@@ -640,6 +682,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             throw e;
         } finally {
             ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5, 
TimeUnit.MINUTES);
+            socketFactoryManager.close();
         }
         waitForAllThreads();
         faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
 
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
new file mode 100644
index 00000000000..5358b211c77
--- /dev/null
+++ 
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.ServerSocketFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class PreboundSocketFactoryManager implements AutoCloseable {
+
+    private class PreboundSocketFactory implements ServerSocketFactory {
+        private final int nodeId;
+
+        private PreboundSocketFactory(int nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        @Override
+        public ServerSocketChannel openServerSocket(
+                String listenerName,
+                InetSocketAddress socketAddress,
+                int listenBacklogSize,
+                int recvBufferSize
+        ) throws IOException {
+            ServerSocketChannel socketChannel = 
getSocketForListenerAndMarkAsUsed(
+                nodeId,
+                listenerName);
+            if (socketChannel != null) {
+                return socketChannel;
+            }
+            return ServerSocketFactory.INSTANCE.openServerSocket(
+                listenerName,
+                socketAddress,
+                listenBacklogSize,
+                recvBufferSize);
+        }
+    }
+
+    /**
+     * True if this manager is closed.
+     */
+    private boolean closed = false;
+
+    /**
+     * Maps node IDs to socket factory objects.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, PreboundSocketFactory> factories = new 
HashMap<>();
+
+    /**
+     * Maps node IDs to maps of listener names to ports.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Map<String, ServerSocketChannel>> sockets = new 
HashMap<>();
+
+    /**
+     * Maps node IDs to set of the listeners that were used.
+     * Protected by the object lock.
+     */
+    private final Map<Integer, Set<String>> usedSockets = new HashMap<>();
+
+    /**
+     * Get a socket from this manager, mark it as used, and return it.
+     *
+     * @param nodeId        The ID of the node.
+     * @param listener      The listener for the socket.
+     *
+     * @return              null if the socket was not found; the socket, 
otherwise.
+     */
+    public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
+        int nodeId,
+        String listener
+    ) {
+        Map<String, ServerSocketChannel> socketsForNode = sockets.get(nodeId);
+        if (socketsForNode == null) {
+            return null;
+        }
+        ServerSocketChannel socket = socketsForNode.get(listener);
+        if (socket == null) {
+            return null;
+        }
+        usedSockets.computeIfAbsent(nodeId, __ -> new 
HashSet<>()).add(listener);
+        return socket;
+    }
+
+    /**
+     * Get or create a socket factory object associated with a given node ID.
+     *
+     * @param nodeId        The ID of the node.
+     *
+     * @return              The socket factory.
+     */
+    public synchronized ServerSocketFactory getOrCreateSocketFactory(int 
nodeId) {
+        return factories.computeIfAbsent(nodeId, __ -> new 
PreboundSocketFactory(nodeId));
+    }
+
+    /**
+     * Get a specific port number. The port will be created if it does not 
already exist.
+     *
+     * @param nodeId        The ID of the node.
+     * @param listener      The listener for the socket.
+     *
+     * @return              The port number.
+     */
+    public synchronized int getOrCreatePortForListener(
+        int nodeId,
+        String listener
+    ) throws IOException {
+        Map<String, ServerSocketChannel> socketsForNode =
+            sockets.computeIfAbsent(nodeId, __ -> new HashMap<>());
+        ServerSocketChannel socketChannel = socketsForNode.get(listener);
+        if (socketChannel == null) {
+            if (closed) {
+                throw new RuntimeException("Cannot open new socket: manager is 
closed.");
+            }
+            socketChannel = ServerSocketFactory.INSTANCE.openServerSocket(
+                listener,
+                new InetSocketAddress(0),
+                -1,
+                -1);
+            socketsForNode.put(listener, socketChannel);
+        }
+        InetSocketAddress socketAddress = (InetSocketAddress) 
socketChannel.getLocalAddress();
+        return socketAddress.getPort();
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        // Close all sockets that haven't been used by a SocketServer. (We 
don't want to close the
+        // ones that have been used by a SocketServer because that is the 
responsibility of that
+        // SocketServer.)
+        for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry : 
sockets.entrySet()) {
+            Set<String> usedListeners = usedSockets.getOrDefault(
+                socketsEntry.getKey(), Collections.emptySet());
+            for (Entry<String, ServerSocketChannel> entry : 
socketsEntry.getValue().entrySet()) {
+                if (!usedListeners.contains(entry.getKey())) {
+                    Utils.closeQuietly(entry.getValue(), 
"serverSocketChannel");
+                }
+            }
+        }
+    }
+}
diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java 
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
index 149477eaa78..a7ce81662f7 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.common.test;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 
 import java.util.Map;
@@ -35,6 +36,10 @@ public interface TestKitNode {
         return initialMetaPropertiesEnsemble().logDirProps().keySet();
     }
 
+    default Uuid metadataDirectoryId() {
+        return 
initialMetaPropertiesEnsemble().logDirProps().get(metadataDirectory()).directoryId().get();
+    }
+
     MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
 
     Map<String, String> propertyOverrides();
diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java 
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 099ec05c14b..42a621c9bbb 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -57,8 +57,15 @@ public class TestKitNodes {
         private int numBrokerNodes;
         private int numDisksPerBroker = 1;
         private Map<Integer, Map<String, String>> perServerProperties = 
Collections.emptyMap();
-        private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
-            fromVersion(MetadataVersion.latestTesting(), "testkit");
+        private BootstrapMetadata bootstrapMetadata;
+
+        public Builder() {
+            
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+        }
+
+        public Builder(BootstrapMetadata bootstrapMetadata) {
+            this.bootstrapMetadata = bootstrapMetadata;
+        }
         // The brokerListenerName and brokerSecurityProtocol configurations 
must
         // be kept in sync with the default values in ClusterTest.
         private ListenerName brokerListenerName = 
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
@@ -70,7 +77,8 @@ public class TestKitNodes {
         }
 
         public Builder setBootstrapMetadataVersion(MetadataVersion 
metadataVersion) {
-            this.bootstrapMetadata = 
BootstrapMetadata.fromVersion(metadataVersion, "testkit");
+            this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
+                    MetadataVersion.FEATURE_NAME, 
metadataVersion.featureLevel());
             return this;
         }
 
@@ -79,6 +87,11 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setFeature(String featureName, short level) {
+            this.bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
+            return this;
+        }
+
         public Builder setCombined(boolean combined) {
             this.combined = combined;
             return this;
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index ef287cdf360..6abe2d2e8cf 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.test.api.ClusterConfig;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.Features;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -83,6 +84,9 @@ class ConsumerGroupCommandTestUtils {
                 .setTypes(Collections.singleton(CO_KRAFT))
                 .setServerProperties(serverProperties)
                 .setTags(Collections.singletonList("kraftGroupCoordinator"))
+                .setFeatures(Utils.mkMap(
+                    Utils.mkEntry(Features.TRANSACTION_VERSION, (short) 2),
+                    Utils.mkEntry(Features.GROUP_VERSION, (short) 1)))
                 .build());
     }
 

Reply via email to