This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ebb3202e01d KAFKA-16964 Integration tests for adding and removing
voters (#17582)
ebb3202e01d is described below
commit ebb3202e01d9481d7e711cd25e4e564863b91e1e
Author: kevin-wu24 <[email protected]>
AuthorDate: Mon Nov 4 13:09:37 2024 -0600
KAFKA-16964 Integration tests for adding and removing voters (#17582)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/network/SocketServer.scala | 45 +++---
.../src/main/scala/kafka/server/BrokerServer.scala | 8 +-
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
.../main/scala/kafka/server/KafkaRaftServer.scala | 3 +-
.../src/main/scala/kafka/server/SharedServer.scala | 5 +-
.../kafka/server/metadata/KRaftMetadataCache.scala | 6 +-
.../ReconfigurableQuorumIntegrationTest.java | 161 +++++++++++++++++++
.../kafka/server/KRaftClusterTest.scala | 3 +-
.../kafka/server/QuorumTestHarness.scala | 8 +-
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../unit/kafka/server/ApiVersionsRequestTest.scala | 4 +-
.../metadata/bootstrap/BootstrapMetadata.java | 42 +++--
.../apache/kafka/metadata/storage/Formatter.java | 24 ++-
.../metadata/bootstrap/BootstrapMetadataTest.java | 54 ++++++-
.../apache/kafka/server/ServerSocketFactory.java | 62 ++++++++
.../kafka/common/test/KafkaClusterTestKit.java | 109 +++++++++----
.../common/test/PreboundSocketFactoryManager.java | 170 +++++++++++++++++++++
.../org/apache/kafka/common/test/TestKitNode.java | 5 +
.../org/apache/kafka/common/test/TestKitNodes.java | 19 ++-
.../group/ConsumerGroupCommandTestUtils.java | 4 +
20 files changed, 642 insertions(+), 95 deletions(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 7dc1cecdfd9..f706a8dff95 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ import org.apache.kafka.common.utils.{KafkaThread,
LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName,
Reconfigurable}
import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.ServerSocketFactory
import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.ConnectionDisconnectListener
@@ -76,13 +77,15 @@ import scala.util.control.ControlThrowable
* Acceptor has 1 Processor thread that has its own selector and read
requests from the socket.
* 1 Handler thread that handles requests and produces responses back to
the processor thread for writing.
*/
-class SocketServer(val config: KafkaConfig,
- val metrics: Metrics,
- val time: Time,
- val credentialProvider: CredentialProvider,
- val apiVersionManager: ApiVersionManager,
- val connectionDisconnectListeners:
Seq[ConnectionDisconnectListener] = Seq.empty)
- extends Logging with BrokerReconfigurable {
+class SocketServer(
+ val config: KafkaConfig,
+ val metrics: Metrics,
+ val time: Time,
+ val credentialProvider: CredentialProvider,
+ val apiVersionManager: ApiVersionManager,
+ val socketFactory: ServerSocketFactory = ServerSocketFactory.INSTANCE,
+ val connectionDisconnectListeners: Seq[ConnectionDisconnectListener] =
Seq.empty
+) extends Logging with BrokerReconfigurable {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@@ -721,23 +724,17 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
* Create a server socket to listen for connections on.
*/
private def openServerSocket(host: String, port: Int, listenBacklogSize:
Int): ServerSocketChannel = {
- val socketAddress =
- if (Utils.isBlank(host))
- new InetSocketAddress(port)
- else
- new InetSocketAddress(host, port)
- val serverChannel = ServerSocketChannel.open()
- try {
- serverChannel.configureBlocking(false)
- if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
- serverChannel.socket().setReceiveBufferSize(recvBufferSize)
- serverChannel.socket.bind(socketAddress, listenBacklogSize)
- info(s"Awaiting socket connections on
${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
- } catch {
- case e: SocketException =>
- Utils.closeQuietly(serverChannel, "server socket")
- throw new KafkaException(s"Socket server failed to bind to
${socketAddress.getHostString}:$port: ${e.getMessage}.", e)
- }
+ val socketAddress = if (Utils.isBlank(host)) {
+ new InetSocketAddress(port)
+ } else {
+ new InetSocketAddress(host, port)
+ }
+ val serverChannel = socketServer.socketFactory.openServerSocket(
+ endPoint.listenerName.value(),
+ socketAddress,
+ listenBacklogSize,
+ recvBufferSize)
+ info(s"Awaiting socket connections on
${socketAddress.getHostString}:${serverChannel.socket.getLocalPort}.")
serverChannel
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index a40d3bbc243..4a215cc5078 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -265,7 +265,13 @@ class BrokerServer(
// Create and start the socket server acceptor threads so that the bound
port is known.
// Delay starting processors until the end of the initialization
sequence to ensure
// that credentials have been loaded before processing authentications.
- socketServer = new SocketServer(config, metrics, time,
credentialProvider, apiVersionManager, connectionDisconnectListeners)
+ socketServer = new SocketServer(config,
+ metrics,
+ time,
+ credentialProvider,
+ apiVersionManager,
+ sharedServer.socketFactory,
+ connectionDisconnectListeners)
clientQuotaMetadataManager = new
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 8905b2c4f7f..90deff7ed86 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -161,7 +161,8 @@ class ControllerServer(
metrics,
time,
credentialProvider,
- apiVersionManager)
+ apiVersionManager,
+ sharedServer.socketFactory)
val listenerInfo = ListenerInfo
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 7f56600f8b0..4a676a2765e 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -30,7 +30,7 @@ import
org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
import
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.storage.internals.log.LogConfig
@@ -73,6 +73,7 @@ class KafkaRaftServer(
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig.voters)),
QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers),
new StandardFaultHandlerFactory(),
+ ServerSocketFactory.INSTANCE,
)
private val broker: Option[BrokerServer] = if
(config.processRoles.contains(ProcessRole.BrokerRole)) {
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala
b/core/src/main/scala/kafka/server/SharedServer.scala
index 658097d3707..a465c385077 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.metadata.ListenerInfo
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler,
ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics,
KafkaYammerMetrics}
@@ -98,7 +98,8 @@ class SharedServer(
private val _metrics: Metrics,
val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer,
InetSocketAddress]],
val bootstrapServers: JCollection[InetSocketAddress],
- val faultHandlerFactory: FaultHandlerFactory
+ val faultHandlerFactory: FaultHandlerFactory,
+ val socketFactory: ServerSocketFactory
) extends Logging {
private val logContext: LogContext = new LogContext(s"[SharedServer
id=${sharedServerConfig.nodeId}] ")
this.logIdent = logContext.logPrefix
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index e6a874c45d1..5fad48f8a71 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -550,8 +550,10 @@ class KRaftMetadataCache(
override def features(): FinalizedFeatures = {
val image = _currentImage
val finalizedFeatures = new java.util.HashMap[String,
java.lang.Short](image.features().finalizedVersions())
- finalizedFeatures.put(KRaftVersion.FEATURE_NAME,
kraftVersionSupplier.get().featureLevel())
-
+ val kraftVersionLevel = kraftVersionSupplier.get().featureLevel()
+ if (kraftVersionLevel > 0) {
+ finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
+ }
new FinalizedFeatures(image.features().metadataVersion(),
finalizedFeatures,
image.highestOffsetAndEpoch().offset,
diff --git
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
new file mode 100644
index 00000000000..6d130015058
--- /dev/null
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.FeatureMetadata;
+import org.apache.kafka.clients.admin.QuorumInfo;
+import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.test.KafkaClusterTestKit;
+import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ReconfigurableQuorumIntegrationTest {
+ static void checkKRaftVersions(Admin admin, short finalized) throws
Exception {
+ FeatureMetadata featureMetadata =
admin.describeFeatures().featureMetadata().get();
+ if (finalized > 0) {
+
assertTrue(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+ assertEquals(finalized, featureMetadata.finalizedFeatures().
+ get(KRaftVersion.FEATURE_NAME).minVersionLevel());
+ assertEquals(finalized, featureMetadata.finalizedFeatures().
+ get(KRaftVersion.FEATURE_NAME).maxVersionLevel());
+ } else {
+
assertFalse(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME));
+ }
+ assertEquals((short) 0, featureMetadata.supportedFeatures().
+ get(KRaftVersion.FEATURE_NAME).minVersion());
+ assertEquals((short) 1, featureMetadata.supportedFeatures().
+ get(KRaftVersion.FEATURE_NAME).maxVersion());
+ }
+
+ @Test
+ public void testCreateAndDestroyNonReconfigurableCluster() throws
Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).
+ build()).build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ checkKRaftVersions(admin, (short) 0);
+ });
+ }
+ }
+ }
+
+ @Test
+ public void testCreateAndDestroyReconfigurableCluster() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(1).
+ setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+ build()).build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+ checkKRaftVersions(admin, (short) 1);
+ });
+ }
+ }
+ }
+
+ static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
+ QuorumInfo quorumInfo =
admin.describeMetadataQuorum().quorumInfo().get();
+ Map<Integer, Uuid> result = new TreeMap<>();
+ quorumInfo.voters().forEach(v -> {
+ result.put(v.replicaId(), v.replicaDirectoryId());
+ });
+ return result;
+ }
+
+ @Test
+ public void testRemoveController() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+ build()).build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(new HashSet<>(Arrays.asList(3000, 3001,
3002)), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+ admin.removeRaftVoter(3000, cluster.nodes().
+
controllerNodes().get(3000).metadataDirectoryId()).all().get();
+ }
+ }
+ }
+
+ @Test
+ public void testRemoveAndAddSameController() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(4).
+ setFeature(KRaftVersion.FEATURE_NAME, (short) 1).
+ build()).build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(new HashSet<>(Arrays.asList(3000, 3001, 3002,
3003)), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002, 3003}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ admin.removeRaftVoter(3000, dirId).all().get();
+ admin.addRaftVoter(
+ 3000,
+ dirId,
+ Collections.singleton(new RaftVoterEndpoint("CONTROLLER",
"example.com", 8080))
+ ).all().get();
+ }
+ }
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index cbc502801cc..21ae6c379bd 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1389,8 +1389,7 @@ class KRaftClusterTest {
setName("num.io.threads").
setValue("9"), 0.toShort))
val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setBootstrapMetadata(BootstrapMetadata.fromRecords(bootstrapRecords,
"testRecords")).
+ new TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords,
"testRecords")).
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).
build()
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 154589a7853..985b6be0452 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
@@ -137,7 +137,8 @@ class KRaftQuorumImplementation(
new Metrics(),
controllerQuorumVotersFuture,
controllerQuorumVotersFuture.get().values(),
- faultHandlerFactory
+ faultHandlerFactory,
+ ServerSocketFactory.INSTANCE,
)
var broker: BrokerServer = null
try {
@@ -387,7 +388,8 @@ abstract class QuorumTestHarness extends Logging {
new Metrics(),
controllerQuorumVotersFuture,
Collections.emptyList(),
- faultHandlerFactory
+ faultHandlerFactory,
+ ServerSocketFactory.INSTANCE,
)
var controllerServer: ControllerServer = null
try {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index deb77a79de9..ca4156eacd2 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -2248,7 +2248,7 @@ class SocketServerTest {
time: Time = Time.SYSTEM,
connectionDisconnectListeners: Seq[ConnectionDisconnectListener] =
Seq.empty
) extends SocketServer(
- config, new Metrics, time, credentialProvider, apiVersionManager,
connectionDisconnectListeners
+ config, new Metrics, time, credentialProvider, apiVersionManager,
connectionDisconnectListeners = connectionDisconnectListeners
) {
override def createDataPlaneAcceptor(endPoint: EndPoint,
isPrivilegedListener: Boolean, requestChannel: RequestChannel) :
DataPlaneAcceptor = {
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 6da1d6c2f11..eea2d7cc46b 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -71,8 +71,8 @@ class ApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVersio
// Use the latest production MV for this test
@ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion =
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
- new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
- new ClusterConfigProperty(key = "unstable.feature.versions.enable",
value = "false"),
+ new ClusterConfigProperty(key = "unstable.api.versions.enable", value =
"false"),
+ new ClusterConfigProperty(key = "unstable.feature.versions.enable", value
= "false"),
))
def testApiVersionsRequestValidationV0(): Unit = {
val apiVersionsRequest = new
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 1de6a1f0ee5..2dc6d9a6eaf 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -134,19 +134,43 @@ public class BootstrapMetadata {
return source;
}
- public BootstrapMetadata copyWithOnlyVersion() {
- ApiMessageAndVersion versionRecord = null;
+ public short featureLevel(String featureName) {
+ short result = 0;
for (ApiMessageAndVersion record : records) {
- if (recordToMetadataVersion(record.message()).isPresent()) {
- versionRecord = record;
+ if (record.message() instanceof FeatureLevelRecord) {
+ FeatureLevelRecord message = (FeatureLevelRecord)
record.message();
+ if (message.name().equals(featureName)) {
+ result = message.featureLevel();
+ }
}
}
- if (versionRecord == null) {
- throw new RuntimeException("No FeatureLevelRecord for " +
MetadataVersion.FEATURE_NAME +
- " was found in " + source);
+ return result;
+ }
+
+ public BootstrapMetadata copyWithFeatureRecord(String featureName, short
level) {
+ List<ApiMessageAndVersion> newRecords = new ArrayList<>();
+ int i = 0;
+ while (i < records.size()) {
+ if (records.get(i).message() instanceof FeatureLevelRecord) {
+ FeatureLevelRecord record = (FeatureLevelRecord)
records.get(i).message();
+ if (record.name().equals(featureName)) {
+ FeatureLevelRecord newRecord = record.duplicate();
+ newRecord.setFeatureLevel(level);
+ newRecords.add(new ApiMessageAndVersion(newRecord, (short)
0));
+ break;
+ } else {
+ newRecords.add(records.get(i));
+ }
+ }
+ i++;
+ }
+ if (i == records.size()) {
+ FeatureLevelRecord newRecord = new FeatureLevelRecord().
+ setName(featureName).
+ setFeatureLevel(level);
+ newRecords.add(new ApiMessageAndVersion(newRecord, (short) 0));
}
- return new BootstrapMetadata(Collections.singletonList(versionRecord),
- metadataVersion, source);
+ return BootstrapMetadata.fromRecords(newRecords, source);
}
@Override
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 6963707d850..53013307149 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -124,7 +124,7 @@ public class Formatter {
/**
* The metadata log directory.
*/
- private String metadataLogDirectory = null;
+ private Optional<String> metadataLogDirectory = Optional.empty();
/**
* The initial KIP-853 voters.
@@ -166,6 +166,10 @@ public class Formatter {
return this;
}
+ public Collection<String> directories() {
+ return directories;
+ }
+
public Formatter setReleaseVersion(MetadataVersion releaseVersion) {
this.releaseVersion = releaseVersion;
return this;
@@ -197,6 +201,11 @@ public class Formatter {
}
public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
+ this.metadataLogDirectory = Optional.of(metadataLogDirectory);
+ return this;
+ }
+
+ public Formatter setMetadataLogDirectory(Optional<String>
metadataLogDirectory) {
this.metadataLogDirectory = metadataLogDirectory;
return this;
}
@@ -231,13 +240,12 @@ public class Formatter {
if (controllerListenerName == null) {
throw new FormatterException("You must specify the name of the
initial controller listener.");
}
- if (metadataLogDirectory == null) {
- throw new FormatterException("You must specify the metadata log
directory.");
- }
- if (!directories.contains(metadataLogDirectory)) {
- throw new FormatterException("The specified metadata log
directory, " + metadataLogDirectory +
+ metadataLogDirectory.ifPresent(d -> {
+ if (!directories.contains(d)) {
+ throw new FormatterException("The specified metadata log
directory, " + d +
" was not one of the given directories: " + directories);
- }
+ }
+ });
releaseVersion = calculateEffectiveReleaseVersion();
featureLevels = calculateEffectiveFeatureLevels();
this.bootstrapMetadata = calculateBootstrapMetadata();
@@ -400,7 +408,7 @@ public class Formatter {
Map<String, DirectoryType> directoryTypes = new HashMap<>();
for (String emptyLogDir : ensemble.emptyLogDirs()) {
DirectoryType directoryType =
DirectoryType.calculate(emptyLogDir,
- metadataLogDirectory,
+ metadataLogDirectory.orElse(""),
nodeId,
initialControllers);
directoryTypes.put(emptyLogDir, directoryType);
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 51189412f41..fd41fefabf0 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -71,10 +72,57 @@ public class BootstrapMetadataTest {
() -> BootstrapMetadata.fromRecords(emptyList(),
"quux")).getMessage());
}
+ private static final ApiMessageAndVersion MV_10 =
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(FEATURE_NAME).
+ setFeatureLevel((short) 10), (short) 0);
+
+ private static final ApiMessageAndVersion MV_11 =
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(FEATURE_NAME).
+ setFeatureLevel((short) 11), (short) 0);
+
+ private static final ApiMessageAndVersion FOO_1 =
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName("foo").
+ setFeatureLevel((short) 1), (short) 0);
+
+ private static final ApiMessageAndVersion FOO_2 =
+ new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName("foo").
+ setFeatureLevel((short) 2), (short) 0);
+
+ @Test
+ public void testCopyWithNewFeatureRecord() {
+ assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10,
FOO_1), "src"),
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10), "src").
+ copyWithFeatureRecord("foo", (short) 1));
+ }
+
+ @Test
+ public void testFeatureLevelForMetadataVersion() {
+ assertEquals((short) 11, BootstrapMetadata.
+ fromRecords(Arrays.asList(MV_10, MV_11), "src").
+ featureLevel(FEATURE_NAME));
+ }
+
+ @Test
+ public void testCopyWithModifiedFeatureRecord() {
+ assertEquals(BootstrapMetadata.fromRecords(Arrays.asList(MV_10,
FOO_2), "src"),
+ BootstrapMetadata.fromRecords(Arrays.asList(MV_10, FOO_1), "src").
+ copyWithFeatureRecord("foo", (short) 2));
+ }
+
+ @Test
+ public void testFeatureLevelForFeatureThatIsNotSet() {
+ assertEquals((short) 0, BootstrapMetadata.
+ fromRecords(Arrays.asList(MV_10), "src").featureLevel("foo"));
+ }
+
@Test
- public void testCopyWithOnlyVersion() {
- assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3),
IBP_3_3_IV2, "baz"),
- BootstrapMetadata.fromRecords(SAMPLE_RECORDS1,
"baz").copyWithOnlyVersion());
+ public void testFeatureLevelForFeature() {
+ assertEquals((short) 2, BootstrapMetadata.
+ fromRecords(Arrays.asList(MV_10, FOO_2),
"src").featureLevel("foo"));
}
static final List<ApiMessageAndVersion> RECORDS_WITH_OLD_METADATA_VERSION
= Collections.singletonList(
diff --git
a/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java
b/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java
new file mode 100644
index 00000000000..7739507ade8
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ServerSocketFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.ServerSocketChannel;
+
+public interface ServerSocketFactory {
+ ServerSocketFactory INSTANCE = new KafkaServerSocketFactory();
+ ServerSocketChannel openServerSocket(
+ String listenerName,
+ InetSocketAddress socketAddress,
+ int listenBacklogSize,
+ int recvBufferSize
+ ) throws IOException;
+
+ class KafkaServerSocketFactory implements ServerSocketFactory {
+
+ @Override
+ public ServerSocketChannel openServerSocket(
+ String listenerName,
+ InetSocketAddress socketAddress,
+ int listenBacklogSize,
+ int recvBufferSize
+ ) throws IOException {
+ ServerSocketChannel socketChannel = ServerSocketChannel.open();
+ try {
+ socketChannel.configureBlocking(false);
+ if (recvBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) {
+
socketChannel.socket().setReceiveBufferSize(recvBufferSize);
+ }
+ socketChannel.socket().bind(socketAddress, listenBacklogSize);
+ } catch (SocketException e) {
+ Utils.closeQuietly(socketChannel, "server socket");
+ throw new KafkaException(String.format("Socket server failed
to bind to %s:%d: %s.",
+ socketAddress.getHostString(), socketAddress.getPort(),
e.getMessage()), e);
+ }
+ return socketChannel;
+ }
+ }
+}
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 4ef6ce41550..db857e5bcc8 100644
---
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -34,12 +34,13 @@ import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.Controller;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
-import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
+import org.apache.kafka.metadata.storage.Formatter;
import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.fault.FaultHandler;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -147,11 +149,13 @@ public class KafkaClusterTestKit implements AutoCloseable
{
private final TestKitNodes nodes;
private final Map<String, Object> configProps = new HashMap<>();
private final SimpleFaultHandlerFactory faultHandlerFactory = new
SimpleFaultHandlerFactory();
+ private final PreboundSocketFactoryManager socketFactoryManager = new
PreboundSocketFactoryManager();
private final String brokerListenerName;
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
+
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
this.brokerListenerName = nodes.brokerListenerName().value();
@@ -165,7 +169,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
return this;
}
- private KafkaConfig createNodeConfig(TestKitNode node) {
+ private KafkaConfig createNodeConfig(TestKitNode node) throws
IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode =
nodes.controllerNodes().get(node.id());
@@ -201,13 +205,18 @@ public class KafkaClusterTestKit implements AutoCloseable
{
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
brokerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
controllerListenerName);
- // Note: we can't accurately set controller.quorum.voters yet,
since we don't
- // yet know what ports each controller will pick. Set it to a
dummy string
- // for now as a placeholder.
- String uninitializedQuorumVotersString =
nodes.controllerNodes().keySet().stream().
- map(n -> String.format("%[email protected]:0", n)).
- collect(Collectors.joining(","));
- props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
uninitializedQuorumVotersString);
+ StringBuilder quorumVoterStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ quorumVoterStringBuilder.append(prefix).
+ append(nodeId).
+ append("@").
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
// reduce log cleaner offset map memory usage
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
"2097152");
@@ -235,6 +244,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
try {
baseDirectory = new File(nodes.baseDirectory());
+ for (TestKitNode node : nodes.controllerNodes().values()) {
+ socketFactoryManager.getOrCreatePortForListener(node.id(),
"CONTROLLER");
+ }
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory,
node.metadataDirectory(), Collections.emptyList());
SharedServer sharedServer = new SharedServer(
@@ -244,7 +256,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
new Metrics(),
connectFutureManager.future,
Collections.emptyList(),
- faultHandlerFactory
+ faultHandlerFactory,
+
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
ControllerServer controller = null;
try {
@@ -268,18 +281,20 @@ public class KafkaClusterTestKit implements AutoCloseable
{
jointServers.put(node.id(), sharedServer);
}
for (TestKitNode node : nodes.brokerNodes().values()) {
- SharedServer sharedServer = jointServers.computeIfAbsent(
- node.id(),
- id -> new SharedServer(
+ SharedServer sharedServer = jointServers.get(node.id());
+ if (sharedServer == null) {
+ sharedServer = new SharedServer(
createNodeConfig(node),
node.initialMetaPropertiesEnsemble(),
Time.SYSTEM,
new Metrics(),
connectFutureManager.future,
Collections.emptyList(),
- faultHandlerFactory
- )
- );
+ faultHandlerFactory,
+
socketFactoryManager.getOrCreateSocketFactory(node.id())
+ );
+ jointServers.put(node.id(), sharedServer);
+ }
BrokerServer broker = null;
try {
broker = new BrokerServer(sharedServer);
@@ -301,6 +316,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (baseDirectory != null) {
Utils.delete(baseDirectory);
}
+ socketFactoryManager.close();
throw e;
}
return new KafkaClusterTestKit(
@@ -309,7 +325,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
brokers,
connectFutureManager,
baseDirectory,
- faultHandlerFactory);
+ faultHandlerFactory,
+ socketFactoryManager);
}
private String listeners(int node) {
@@ -352,6 +369,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final ControllerQuorumVotersFutureManager
controllerQuorumVotersFutureManager;
private final File baseDirectory;
private final SimpleFaultHandlerFactory faultHandlerFactory;
+ private final PreboundSocketFactoryManager socketFactoryManager;
private KafkaClusterTestKit(
TestKitNodes nodes,
@@ -359,7 +377,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
Map<Integer, BrokerServer> brokers,
ControllerQuorumVotersFutureManager
controllerQuorumVotersFutureManager,
File baseDirectory,
- SimpleFaultHandlerFactory faultHandlerFactory
+ SimpleFaultHandlerFactory faultHandlerFactory,
+ PreboundSocketFactoryManager socketFactoryManager
) {
/*
Number of threads = Total number of brokers + Total number of
controllers + Total number of Raft Managers
@@ -374,6 +393,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.controllerQuorumVotersFutureManager =
controllerQuorumVotersFutureManager;
this.baseDirectory = baseDirectory;
this.faultHandlerFactory = faultHandlerFactory;
+ this.socketFactoryManager = socketFactoryManager;
}
public void format() throws Exception {
@@ -407,22 +427,44 @@ public class KafkaClusterTestKit implements AutoCloseable
{
boolean writeMetadataDirectory
) {
try {
- MetaPropertiesEnsemble.Copier copier =
- new
MetaPropertiesEnsemble.Copier(MetaPropertiesEnsemble.EMPTY);
- for (Entry<String, MetaProperties> entry :
ensemble.logDirProps().entrySet()) {
- String logDir = entry.getKey();
- if (writeMetadataDirectory ||
(!ensemble.metadataLogDir().equals(Optional.of(logDir)))) {
- log.trace("Adding {} to the list of directories to
format.", logDir);
- copier.setLogDirProps(logDir, entry.getValue());
+ Formatter formatter = new Formatter();
+ formatter.setNodeId(ensemble.nodeId().getAsInt());
+ formatter.setClusterId(ensemble.clusterId().get());
+ if (writeMetadataDirectory) {
+ formatter.setDirectories(ensemble.logDirProps().keySet());
+ } else {
+
formatter.setDirectories(ensemble.logDirProps().keySet().stream().
+ filter(d -> !ensemble.metadataLogDir().get().equals(d)).
+ collect(Collectors.toSet()));
+ }
+ if (formatter.directories().isEmpty()) {
+ return;
+ }
+
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
+ formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
+
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
+ formatter.setUnstableFeatureVersionsEnabled(true);
+ formatter.setIgnoreFormatted(false);
+ formatter.setControllerListenerName("CONTROLLER");
+ if (writeMetadataDirectory) {
+
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+ } else {
+ formatter.setMetadataLogDirectory(Optional.empty());
+ }
+ if
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
+ StringBuilder dynamicVotersBuilder = new StringBuilder();
+ String prefix = "";
+ for (TestKitNode controllerNode :
nodes.controllerNodes().values()) {
+ int port = socketFactoryManager.
+ getOrCreatePortForListener(controllerNode.id(),
"CONTROLLER");
+ dynamicVotersBuilder.append(prefix);
+ prefix = ",";
+
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
+ controllerNode.id(), port,
controllerNode.metadataDirectoryId()));
}
+
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
- copier.setPreWriteHandler((logDir, isNew, metaProperties) -> {
- log.info("Formatting {}.", logDir);
- Files.createDirectories(Paths.get(logDir));
- BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(logDir, Optional.empty());
- bootstrapDirectory.writeBinaryFile(nodes.bootstrapMetadata());
- });
- copier.writeLogDirChanges();
+ formatter.run();
} catch (Exception e) {
throw new RuntimeException("Failed to format node " +
ensemble.nodeId(), e);
}
@@ -640,6 +682,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
throw e;
} finally {
ThreadUtils.shutdownExecutorServiceQuietly(executorService, 5,
TimeUnit.MINUTES);
+ socketFactoryManager.close();
}
waitForAllThreads();
faultHandlerFactory.fatalFaultHandler().maybeRethrowFirstException();
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
new file mode 100644
index 00000000000..5358b211c77
--- /dev/null
+++
b/test-common/src/main/java/org/apache/kafka/common/test/PreboundSocketFactoryManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.ServerSocketFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class PreboundSocketFactoryManager implements AutoCloseable {
+
+ private class PreboundSocketFactory implements ServerSocketFactory {
+ private final int nodeId;
+
+ private PreboundSocketFactory(int nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public ServerSocketChannel openServerSocket(
+ String listenerName,
+ InetSocketAddress socketAddress,
+ int listenBacklogSize,
+ int recvBufferSize
+ ) throws IOException {
+ ServerSocketChannel socketChannel =
getSocketForListenerAndMarkAsUsed(
+ nodeId,
+ listenerName);
+ if (socketChannel != null) {
+ return socketChannel;
+ }
+ return ServerSocketFactory.INSTANCE.openServerSocket(
+ listenerName,
+ socketAddress,
+ listenBacklogSize,
+ recvBufferSize);
+ }
+ }
+
+ /**
+ * True if this manager is closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Maps node IDs to socket factory objects.
+ * Protected by the object lock.
+ */
+ private final Map<Integer, PreboundSocketFactory> factories = new
HashMap<>();
+
+ /**
+ * Maps node IDs to maps of listener names to ports.
+ * Protected by the object lock.
+ */
+ private final Map<Integer, Map<String, ServerSocketChannel>> sockets = new
HashMap<>();
+
+ /**
+ * Maps node IDs to set of the listeners that were used.
+ * Protected by the object lock.
+ */
+ private final Map<Integer, Set<String>> usedSockets = new HashMap<>();
+
+ /**
+ * Get a socket from this manager, mark it as used, and return it.
+ *
+ * @param nodeId The ID of the node.
+ * @param listener The listener for the socket.
+ *
+ * @return null if the socket was not found; the socket,
otherwise.
+ */
+ public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
+ int nodeId,
+ String listener
+ ) {
+ Map<String, ServerSocketChannel> socketsForNode = sockets.get(nodeId);
+ if (socketsForNode == null) {
+ return null;
+ }
+ ServerSocketChannel socket = socketsForNode.get(listener);
+ if (socket == null) {
+ return null;
+ }
+ usedSockets.computeIfAbsent(nodeId, __ -> new
HashSet<>()).add(listener);
+ return socket;
+ }
+
+ /**
+ * Get or create a socket factory object associated with a given node ID.
+ *
+ * @param nodeId The ID of the node.
+ *
+ * @return The socket factory.
+ */
+ public synchronized ServerSocketFactory getOrCreateSocketFactory(int
nodeId) {
+ return factories.computeIfAbsent(nodeId, __ -> new
PreboundSocketFactory(nodeId));
+ }
+
+ /**
+ * Get a specific port number. The port will be created if it does not
already exist.
+ *
+ * @param nodeId The ID of the node.
+ * @param listener The listener for the socket.
+ *
+ * @return The port number.
+ */
+ public synchronized int getOrCreatePortForListener(
+ int nodeId,
+ String listener
+ ) throws IOException {
+ Map<String, ServerSocketChannel> socketsForNode =
+ sockets.computeIfAbsent(nodeId, __ -> new HashMap<>());
+ ServerSocketChannel socketChannel = socketsForNode.get(listener);
+ if (socketChannel == null) {
+ if (closed) {
+ throw new RuntimeException("Cannot open new socket: manager is
closed.");
+ }
+ socketChannel = ServerSocketFactory.INSTANCE.openServerSocket(
+ listener,
+ new InetSocketAddress(0),
+ -1,
+ -1);
+ socketsForNode.put(listener, socketChannel);
+ }
+ InetSocketAddress socketAddress = (InetSocketAddress)
socketChannel.getLocalAddress();
+ return socketAddress.getPort();
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ // Close all sockets that haven't been used by a SocketServer. (We
don't want to close the
+ // ones that have been used by a SocketServer because that is the
responsibility of that
+ // SocketServer.)
+ for (Entry<Integer, Map<String, ServerSocketChannel>> socketsEntry :
sockets.entrySet()) {
+ Set<String> usedListeners = usedSockets.getOrDefault(
+ socketsEntry.getKey(), Collections.emptySet());
+ for (Entry<String, ServerSocketChannel> entry :
socketsEntry.getValue().entrySet()) {
+ if (!usedListeners.contains(entry.getKey())) {
+ Utils.closeQuietly(entry.getValue(),
"serverSocketChannel");
+ }
+ }
+ }
+ }
+}
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
index 149477eaa78..a7ce81662f7 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNode.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.test;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import java.util.Map;
@@ -35,6 +36,10 @@ public interface TestKitNode {
return initialMetaPropertiesEnsemble().logDirProps().keySet();
}
+ default Uuid metadataDirectoryId() {
+ return
initialMetaPropertiesEnsemble().logDirProps().get(metadataDirectory()).directoryId().get();
+ }
+
MetaPropertiesEnsemble initialMetaPropertiesEnsemble();
Map<String, String> propertyOverrides();
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 099ec05c14b..42a621c9bbb 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -57,8 +57,15 @@ public class TestKitNodes {
private int numBrokerNodes;
private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perServerProperties =
Collections.emptyMap();
- private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
- fromVersion(MetadataVersion.latestTesting(), "testkit");
+ private BootstrapMetadata bootstrapMetadata;
+
+ public Builder() {
+
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+ }
+
+ public Builder(BootstrapMetadata bootstrapMetadata) {
+ this.bootstrapMetadata = bootstrapMetadata;
+ }
// The brokerListenerName and brokerSecurityProtocol configurations
must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName =
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
@@ -70,7 +77,8 @@ public class TestKitNodes {
}
public Builder setBootstrapMetadataVersion(MetadataVersion
metadataVersion) {
- this.bootstrapMetadata =
BootstrapMetadata.fromVersion(metadataVersion, "testkit");
+ this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(
+ MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel());
return this;
}
@@ -79,6 +87,11 @@ public class TestKitNodes {
return this;
}
+ public Builder setFeature(String featureName, short level) {
+ this.bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
+ return this;
+ }
+
public Builder setCombined(boolean combined) {
this.combined = combined;
return this;
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index ef287cdf360..6abe2d2e8cf 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.Features;
import java.time.Duration;
import java.util.ArrayList;
@@ -83,6 +84,9 @@ class ConsumerGroupCommandTestUtils {
.setTypes(Collections.singleton(CO_KRAFT))
.setServerProperties(serverProperties)
.setTags(Collections.singletonList("kraftGroupCoordinator"))
+ .setFeatures(Utils.mkMap(
+ Utils.mkEntry(Features.TRANSACTION_VERSION, (short) 2),
+ Utils.mkEntry(Features.GROUP_VERSION, (short) 1)))
.build());
}