This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b825af559ac KAFKA-19755 Move KRaftClusterTest from core module to
server module [2/4] (#20861)
b825af559ac is described below
commit b825af559acb1c5e3d787de706eeb46769e6f888
Author: Lan Ding <[email protected]>
AuthorDate: Fri Dec 12 22:39:33 2025 +0800
KAFKA-19755 Move KRaftClusterTest from core module to server module [2/4]
(#20861)
Move KRaftClusterTest from core module to server module.
Rewrite
- testCreateClusterAndCreateListDeleteTopic
- testCreateClusterAndCreateAndManyTopics
- testClientQuotas
- testDefaultClientQuotas
- testCreateClusterWithAdvertisedPortZero
- testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer
- testUnregisterBroker
Reviewers: Hong-Yi Chen <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/server/KRaftClusterTest.scala | 405 +--------------------
.../org/apache/kafka/server/KRaftClusterTest.java | 402 +++++++++++++++++++-
2 files changed, 399 insertions(+), 408 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b0f44b8683b..24a13e3c753 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -17,25 +17,19 @@
package kafka.server
-import kafka.network.SocketServer
import kafka.utils.TestUtils
-import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{InvalidPartitionsException,
PolicyViolationException, UnsupportedVersionException}
-import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
-import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
-import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest,
DescribeClusterResponse}
+import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
import org.apache.kafka.controller.{QuorumController,
QuorumControllerIntegrationTestUtils}
-import org.apache.kafka.image.ClusterImage
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.network.SocketServerConfigs
@@ -52,11 +46,10 @@ import org.slf4j.LoggerFactory
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{FileSystems, Files, Path, Paths}
-import java.{lang, util}
+import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import java.util.{Optional, OptionalLong, Properties}
import scala.collection.{Seq, mutable}
-import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
import scala.jdk.CollectionConverters._
import scala.util.Using
@@ -93,348 +86,6 @@ class KRaftClusterTest {
}
}
- @Test
- def testCreateClusterAndCreateListDeleteTopic(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(3).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
- "RaftManager was not initialized.")
-
- val admin = Admin.create(cluster.clientProperties())
- try {
- // Create a test topic
- val newTopic = util.List.of(new NewTopic("test-topic", 1, 3.toShort))
- val createTopicResult = admin.createTopics(newTopic)
- createTopicResult.all().get()
- waitForTopicListing(admin, Seq("test-topic"), Seq())
-
- // Delete topic
- val deleteResult = admin.deleteTopics(util.List.of("test-topic"))
- deleteResult.all().get()
-
- // List again
- waitForTopicListing(admin, Seq(), Seq("test-topic"))
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testCreateClusterAndCreateAndManyTopics(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(3).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
- "RaftManager was not initialized.")
- val admin = Admin.create(cluster.clientProperties())
- try {
- // Create many topics
- val newTopic = new util.ArrayList[NewTopic]()
- newTopic.add(new NewTopic("test-topic-1", 2, 3.toShort))
- newTopic.add(new NewTopic("test-topic-2", 2, 3.toShort))
- newTopic.add(new NewTopic("test-topic-3", 2, 3.toShort))
- val createTopicResult = admin.createTopics(newTopic)
- createTopicResult.all().get()
-
- // List created topics
- waitForTopicListing(admin, Seq("test-topic-1", "test-topic-2",
"test-topic-3"), Seq())
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testClientQuotas(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).build()
- try {
- cluster.format()
- cluster.startup()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- val admin = Admin.create(cluster.clientProperties())
- try {
- val entity = new ClientQuotaEntity(util.Map.of("user", "testkit"))
- var filter = ClientQuotaFilter.containsOnly(
- List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
-
- def alterThenDescribe(entity: ClientQuotaEntity,
- quotas: Seq[ClientQuotaAlteration.Op],
- filter: ClientQuotaFilter,
- expectCount: Int): util.Map[ClientQuotaEntity,
util.Map[String, lang.Double]] = {
- val alterResult = admin.alterClientQuotas(util.List.of(new
ClientQuotaAlteration(entity, quotas.asJava)))
- try {
- alterResult.all().get()
- } catch {
- case t: Throwable => fail("AlterClientQuotas request failed", t)
- }
-
- def describeOrFail(filter: ClientQuotaFilter):
util.Map[ClientQuotaEntity, util.Map[String, lang.Double]] = {
- try {
- admin.describeClientQuotas(filter).entities().get()
- } catch {
- case t: Throwable => fail("DescribeClientQuotas request failed",
t)
- }
- }
-
- val (describeResult, ok) =
TestUtils.computeUntilTrue(describeOrFail(filter)) {
- results => results.getOrDefault(entity, util.Map.of[String,
lang.Double]()).size() == expectCount
- }
- assertTrue(ok, "Broker never saw new client quotas")
- describeResult
- }
-
- var describeResult = alterThenDescribe(entity,
- Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)),
filter, 1)
- assertEquals(0.99,
describeResult.get(entity).get("request_percentage"), 1e-6)
-
- describeResult = alterThenDescribe(entity, Seq(
- new ClientQuotaAlteration.Op("request_percentage", 0.97),
- new ClientQuotaAlteration.Op("producer_byte_rate", 10000),
- new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
- ), filter, 3)
- assertEquals(0.97,
describeResult.get(entity).get("request_percentage"), 1e-6)
- assertEquals(10000.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
- assertEquals(10001.0,
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
-
- describeResult = alterThenDescribe(entity, Seq(
- new ClientQuotaAlteration.Op("request_percentage", 0.95),
- new ClientQuotaAlteration.Op("producer_byte_rate", null),
- new ClientQuotaAlteration.Op("consumer_byte_rate", null)
- ), filter, 1)
- assertEquals(0.95,
describeResult.get(entity).get("request_percentage"), 1e-6)
-
- describeResult = alterThenDescribe(entity, Seq(
- new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
-
- describeResult = alterThenDescribe(entity,
- Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9999)),
filter, 1)
- assertEquals(9999.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
-
- // Add another quota for a different entity with same user part
- val entity2 = new ClientQuotaEntity(util.Map.of("user", "testkit",
"client-id", "some-client"))
- filter = ClientQuotaFilter.containsOnly(
- util.List.of(
- ClientQuotaFilterComponent.ofEntity("user", "testkit"),
- ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
- ))
- describeResult = alterThenDescribe(entity2,
- Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)),
filter, 1)
- assertEquals(9998.0,
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
-
- // non-strict match
- filter = ClientQuotaFilter.contains(
- util.List.of(ClientQuotaFilterComponent.ofEntity("user", "testkit")))
-
- TestUtils.tryUntilNoAssertionError() {
- val results = admin.describeClientQuotas(filter).entities().get()
- assertEquals(2, results.size(), "Broker did not see two client
quotas")
- assertEquals(9999.0, results.get(entity).get("producer_byte_rate"),
1e-6)
- assertEquals(9998.0, results.get(entity2).get("producer_byte_rate"),
1e-6)
- }
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- def setConsumerByteRate(
- admin: Admin,
- entity: ClientQuotaEntity,
- value: Long
- ): Unit = {
- admin.alterClientQuotas(util.List.of(
- new ClientQuotaAlteration(entity, util.List.of(
- new Op("consumer_byte_rate", value.doubleValue()))))).
- all().get()
- }
-
- def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
- val allFilter = ClientQuotaFilter.contains(util.List.of)
- val results = new util.HashMap[ClientQuotaEntity, Long]
- admin.describeClientQuotas(allFilter).entities().get().forEach {
- case (entity, entityMap) =>
- Option(entityMap.get("consumer_byte_rate")).foreach(value =>
results.put(entity, value.longValue()))
- }
- results.asScala.toMap
- }
-
- @Test
- def testDefaultClientQuotas(): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(1).build()).build()
- try {
- cluster.format()
- cluster.startup()
- TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState ==
BrokerState.RUNNING,
- "Broker never made it to RUNNING state.")
- val admin = Admin.create(cluster.clientProperties())
- try {
- val defaultUser = new
ClientQuotaEntity(util.Collections.singletonMap[String, String]("user", null))
- val bobUser = new ClientQuotaEntity(util.Map.of[String,
String]("user", "bob"))
- TestUtils.retry(30000) {
- assertEquals(Map(), getConsumerByteRates(admin))
- }
- setConsumerByteRate(admin, defaultUser, 100L)
- TestUtils.retry(30000) {
- assertEquals(Map(
- defaultUser -> 100L
- ), getConsumerByteRates(admin))
- }
- setConsumerByteRate(admin, bobUser, 1000L)
- TestUtils.retry(30000) {
- assertEquals(Map(
- defaultUser -> 100L,
- bobUser -> 1000L
- ), getConsumerByteRates(admin))
- }
- } finally {
- admin.close()
- }
- } finally {
- cluster.close()
- }
- }
-
- @Test
- def testCreateClusterWithAdvertisedPortZero(): Unit = {
- val brokerPropertyOverrides: util.Map[Integer, util.Map[String, String]] =
new util.HashMap[Integer, util.Map[String, String]]()
- Seq.range(0, 3).asJava.forEach(brokerId => {
- val props = new util.HashMap[String, String]()
- props.put(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0")
- props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"EXTERNAL://localhost:0")
- brokerPropertyOverrides.put(brokerId, props)
- })
-
- val nodes = new TestKitNodes.Builder()
- .setNumControllerNodes(1)
- .setNumBrokerNodes(3)
- .setPerServerProperties(brokerPropertyOverrides)
- .build()
-
- doOnStartedKafkaCluster(nodes) { implicit cluster =>
-
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName,
(15L, SECONDS))
- .nodes.values.forEach { broker =>
- assertEquals("localhost", broker.host,
- "Did not advertise configured advertised host")
-
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.brokerListenerName),
broker.port,
- "Did not advertise bound socket port")
- }
- }
- }
-
- @Test
- def testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer():
Unit = {
- val brokerPropertyOverrides: util.Map[Integer, util.Map[String, String]] =
new util.HashMap[Integer, util.Map[String, String]]()
- Seq.range(0, 3).asJava.forEach(brokerId => {
- val props = new util.HashMap[String, String]()
- props.put(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0")
- props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
s"EXTERNAL://advertised-host-$brokerId:${brokerId + 100}")
- brokerPropertyOverrides.put(brokerId, props)
- })
-
- val nodes = new TestKitNodes.Builder()
- .setNumControllerNodes(1)
- .setNumBrokerNodes(3)
- .setNumDisksPerBroker(1)
- .setPerServerProperties(brokerPropertyOverrides)
- .build()
-
- doOnStartedKafkaCluster(nodes) { implicit cluster =>
-
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName,
(15L, SECONDS))
- .nodes.values.forEach { broker =>
- assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not
advertise configured advertised host")
- assertEquals(broker.id + 100, broker.port, "Did not advertise
configured advertised port")
- }
- }
- }
-
- private def doOnStartedKafkaCluster(nodes: TestKitNodes)
- (action: KafkaClusterTestKit => Unit):
Unit = {
- val cluster = new KafkaClusterTestKit.Builder(nodes).build()
- try {
- cluster.format()
- cluster.startup()
- action(cluster)
- } finally {
- cluster.close()
- }
- }
-
- private def
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(listenerName:
ListenerName,
-
waitTime: FiniteDuration)
-
(implicit cluster: KafkaClusterTestKit): DescribeClusterResponse = {
- val startTime = System.currentTimeMillis
- val runningBrokerServers = waitForRunningBrokers(1, waitTime)
- val remainingWaitTime = waitTime - (System.currentTimeMillis - startTime,
MILLISECONDS)
- sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(
- runningBrokerServers.head, listenerName,
- cluster.nodes.brokerNodes.size, remainingWaitTime)
- }
-
- private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
- (implicit cluster: KafkaClusterTestKit):
Seq[BrokerServer] = {
- def getRunningBrokerServers: Seq[BrokerServer] =
cluster.brokers.values.asScala.toSeq
- .filter(brokerServer => brokerServer.brokerState == BrokerState.RUNNING)
-
- val (runningBrokerServers, hasRunningBrokers) =
TestUtils.computeUntilTrue(getRunningBrokerServers,
waitTime.toMillis)(_.nonEmpty)
- assertTrue(hasRunningBrokers,
- s"After ${waitTime.toMillis} ms at least $count broker(s) should be in
RUNNING state, " +
- s"but only ${runningBrokerServers.size} broker(s) are.")
- runningBrokerServers
- }
-
- private def
sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(destination:
BrokerServer,
-
listenerName: ListenerName,
-
expectedBrokerCount: Int,
-
waitTime: FiniteDuration): DescribeClusterResponse = {
- val (describeClusterResponse, metadataUpToDate) =
TestUtils.computeUntilTrue(
- compute =
sendDescribeClusterRequestToBoundPort(destination.socketServer, listenerName),
- waitTime = waitTime.toMillis
- ) {
- response => response.nodes.size == expectedBrokerCount
- }
-
- assertTrue(metadataUpToDate,
- s"After ${waitTime.toMillis} ms Broker is only aware of
${describeClusterResponse.nodes.size} brokers, " +
- s"but $expectedBrokerCount are expected.")
-
- describeClusterResponse
- }
-
- private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
- listenerName:
ListenerName): DescribeClusterResponse = {
- connectAndReceive[DescribeClusterResponse](new
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
- destination.boundPort(listenerName))
- }
-
@Test
def testCreateClusterAndPerformReassignment(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
@@ -746,58 +397,6 @@ class KRaftClusterTest {
cluster.close()
}
}
- private def clusterImage(
- cluster: KafkaClusterTestKit,
- brokerId: Int
- ): ClusterImage = {
- cluster.brokers().get(brokerId).metadataCache.currentImage().cluster()
- }
-
- private def brokerIsUnfenced(
- image: ClusterImage,
- brokerId: Int
- ): Boolean = {
- Option(image.brokers().get(brokerId)) match {
- case None => false
- case Some(registration) => !registration.fenced()
- }
- }
-
- private def brokerIsAbsent(
- image: ClusterImage,
- brokerId: Int
- ): Boolean = {
- Option(image.brokers().get(brokerId)).isEmpty
- }
-
- @ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testUnregisterBroker(usingBootstrapController: Boolean): Unit = {
- val cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(4).
- setNumControllerNodes(3).build()).build()
- try {
- cluster.format()
- cluster.startup()
- cluster.waitForReadyBrokers()
- TestUtils.waitUntilTrue(() => brokerIsUnfenced(clusterImage(cluster, 1),
0),
- "Timed out waiting for broker 0 to be unfenced.")
- cluster.brokers().get(0).shutdown()
- TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster,
1), 0),
- "Timed out waiting for broker 0 to be fenced.")
- val admin = createAdminClient(cluster, bootstrapController =
usingBootstrapController)
- try {
- admin.unregisterBroker(0)
- } finally {
- admin.close()
- }
- TestUtils.waitUntilTrue(() => brokerIsAbsent(clusterImage(cluster, 1),
0),
- "Timed out waiting for broker 0 to be fenced.")
- } finally {
- cluster.close()
- }
- }
def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController:
Boolean): Admin = {
var props: Properties = null
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 7f051dfbffa..3286d0f185c 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -16,10 +16,16 @@
*/
package org.apache.kafka.server;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterClientQuotasResult;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Reconfigurable;
@@ -27,10 +33,19 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.message.DescribeClusterRequestData;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.requests.DescribeClusterRequest;
+import org.apache.kafka.common.requests.DescribeClusterResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.authorizer.AclCreateResult;
@@ -43,28 +58,39 @@ import
org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.quota.ClientQuotaCallback;
-import org.apache.kafka.server.quota.ClientQuotaEntity;
import org.apache.kafka.server.quota.ClientQuotaType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import static org.apache.kafka.server.IntegrationTestUtils.connectAndReceive;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+@Timeout(120)
+@Tag("integration")
public class KRaftClusterTest {
@Test
@@ -168,8 +194,8 @@ public class KRaftClusterTest {
@Test
public void testAuthorizerFailureFoundInControllerStartup() throws
Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumControllerNodes(3).build())
+ new TestKitNodes.Builder()
+ .setNumControllerNodes(3).build())
.setConfigProp("authorizer.class.name",
BadAuthorizer.class.getName())
.build()) {
cluster.format();
@@ -262,6 +288,372 @@ public class KRaftClusterTest {
});
}
+ @Test
+ public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ String testTopic = "test-topic";
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create a test topic
+ List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1,
(short) 3));
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopic);
+ createTopicResult.all().get();
+ waitForTopicListing(admin, List.of(testTopic), List.of());
+
+ // Delete topic
+ DeleteTopicsResult deleteResult =
admin.deleteTopics(List.of(testTopic));
+ deleteResult.all().get();
+
+ // List again
+ waitForTopicListing(admin, List.of(), List.of(testTopic));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(3)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ // Create many topics
+ List<NewTopic> newTopics = List.of(
+ new NewTopic("test-topic-1", 2, (short) 3),
+ new NewTopic("test-topic-2", 2, (short) 3),
+ new NewTopic("test-topic-3", 2, (short) 3)
+ );
+ CreateTopicsResult createTopicResult =
admin.createTopics(newTopics);
+ createTopicResult.all().get();
+
+ // List created topics
+ waitForTopicListing(admin, List.of("test-topic-1",
"test-topic-2", "test-topic-3"), List.of());
+ }
+ }
+ }
+
+ private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+ Admin admin,
+ ClientQuotaEntity entity,
+ List<ClientQuotaAlteration.Op> quotas,
+ ClientQuotaFilter filter,
+ int expectCount
+ ) throws Exception {
+ AlterClientQuotasResult alterResult =
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+ alterResult.all().get();
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(filter).entities().get();
+ return results.getOrDefault(entity, Map.of()).size() ==
expectCount;
+ }, "Broker never saw new client quotas");
+
+ return admin.describeClientQuotas(filter).entities().get();
+ }
+
+ @Test
+ public void testClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of("user", "testkit"));
+ ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ Map<ClientQuotaEntity, Map<String, Double>> describeResult =
alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("request_percentage",
0.99)), filter, 1);
+ assertEquals(0.99,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.97),
+ new ClientQuotaAlteration.Op("producer_byte_rate",
10000.0),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+ ), filter, 3);
+ assertEquals(0.97,
describeResult.get(entity).get("request_percentage"), 1e-6);
+ assertEquals(10000.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(10001.0,
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+ describeResult = alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", 0.95),
+ new ClientQuotaAlteration.Op("producer_byte_rate", null),
+ new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+ ), filter, 1);
+ assertEquals(0.95,
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+ alterThenDescribe(admin, entity, List.of(
+ new ClientQuotaAlteration.Op("request_percentage", null)),
filter, 0);
+
+ describeResult = alterThenDescribe(admin, entity,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9999.0)), filter, 1);
+ assertEquals(9999.0,
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+ ClientQuotaEntity entity2 = new
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+ filter = ClientQuotaFilter.containsOnly(
+ List.of(
+ ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+ ClientQuotaFilterComponent.ofEntity("client-id",
"some-client")
+ ));
+ describeResult = alterThenDescribe(admin, entity2,
+ List.of(new ClientQuotaAlteration.Op("producer_byte_rate",
9998.0)), filter, 1);
+ assertEquals(9998.0,
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+ final ClientQuotaFilter finalFilter =
ClientQuotaFilter.contains(
+ List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
+
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Map<String, Double>> results =
admin.describeClientQuotas(finalFilter).entities().get();
+ if (results.size() != 2) {
+ return false;
+ }
+ assertEquals(9999.0,
results.get(entity).get("producer_byte_rate"), 1e-6);
+ assertEquals(9998.0,
results.get(entity2).get("producer_byte_rate"), 1e-6);
+ return true;
+ }, "Broker did not see two client quotas");
+ }
+ }
+ }
+
+ private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity,
Long value) throws Exception {
+ admin.alterClientQuotas(List.of(
+ new ClientQuotaAlteration(entity, List.of(
+ new ClientQuotaAlteration.Op("consumer_byte_rate",
value.doubleValue())))
+ )).all().get();
+ }
+
+ private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin)
throws Exception {
+ return
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+ .entities().get()
+ .entrySet().stream()
+ .filter(entry ->
entry.getValue().containsKey("consumer_byte_rate"))
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> entry.getValue().get("consumer_byte_rate").longValue()
+ ));
+ }
+
+ @Test
+ public void testDefaultClientQuotas() throws Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(1)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+
+ try (Admin admin = Admin.create(cluster.clientProperties())) {
+ ClientQuotaEntity defaultUser = new
ClientQuotaEntity(Collections.singletonMap("user", null));
+ ClientQuotaEntity bobUser = new
ClientQuotaEntity(Map.of("user", "bob"));
+
+ TestUtils.waitForCondition(
+ () -> getConsumerByteRates(admin).isEmpty(),
+ "Initial consumer byte rates should be empty");
+
+ setConsumerByteRate(admin, defaultUser, 100L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 1 &&
+ rates.get(defaultUser) == 100L;
+ }, "Default user rate should be 100");
+
+ setConsumerByteRate(admin, bobUser, 1000L);
+ TestUtils.waitForCondition(() -> {
+ Map<ClientQuotaEntity, Long> rates =
getConsumerByteRates(admin);
+ return rates.size() == 2 &&
+ rates.get(defaultUser) == 100L &&
+ rates.get(bobUser) == 1000L;
+ }, "Should have both default and bob user rates");
+ }
+ }
+ }
+
+ @Test
+ public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+ Map<Integer, Map<String, String>> brokerPropertyOverrides = new
HashMap<>();
+ for (int brokerId = 0; brokerId < 3; brokerId++) {
+ Map<String, String> props = new HashMap<>();
+ props.put(SocketServerConfigs.LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"EXTERNAL://localhost:0");
+ brokerPropertyOverrides.put(brokerId, props);
+ }
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("localhost", broker.host(),
+ "Did not advertise configured advertised host");
+
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
broker.port(),
+ "Did not advertise bound socket port");
+ })
+ );
+ }
+
+ @Test
+ public void
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws
Exception {
+ var brokerPropertyOverrides = IntStream.range(0,
3).boxed().collect(Collectors.toMap(brokerId -> brokerId, brokerId -> Map.of(
+ SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0",
+ SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 100)
+ )));
+
+ TestKitNodes nodes = new TestKitNodes.Builder()
+ .setNumControllerNodes(1)
+ .setNumBrokerNodes(3)
+ .setNumDisksPerBroker(1)
+ .setPerServerProperties(brokerPropertyOverrides)
+ .build();
+
+ doOnStartedKafkaCluster(nodes, cluster ->
+
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
Duration.ofSeconds(15), cluster)
+ .nodes().values().forEach(broker -> {
+ assertEquals("advertised-host-" + broker.id(),
broker.host(), "Did not advertise configured advertised host");
+ assertEquals(broker.id() + 100, broker.port(), "Did not
advertise configured advertised port");
+ })
+ );
+ }
+
+ private void doOnStartedKafkaCluster(TestKitNodes nodes,
Consumer<KafkaClusterTestKit> action) throws Exception {
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).build()) {
+ cluster.format();
+ cluster.startup();
+ action.accept(cluster);
+ }
+ }
+
+ private DescribeClusterResponse
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+ ListenerName listenerName,
+ Duration waitTime,
+ KafkaClusterTestKit cluster
+ ) throws RuntimeException {
+ try {
+ long startTime = System.currentTimeMillis();
+ TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+ "Broker never made it to RUNNING state.");
+ TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+ "RaftManager was not initialized.");
+
+ Duration remainingWaitTime =
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+ final DescribeClusterResponse[] currentResponse = new
DescribeClusterResponse[1];
+ int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+ TestUtils.waitForCondition(
+ () -> {
+ currentResponse[0] = connectAndReceive(
+ new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
+
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+ );
+ return currentResponse[0].nodes().size() ==
expectedBrokerCount;
+ },
+ remainingWaitTime.toMillis(),
+ String.format("After %s ms Broker is only aware of %s brokers,
but %s are expected", remainingWaitTime.toMillis(), expectedBrokerCount,
expectedBrokerCount)
+ );
+ return currentResponse[0];
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void waitForTopicListing(Admin admin, List<String>
expectedPresent, List<String> expectedAbsent)
+ throws InterruptedException {
+ Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+ Set<String> extraTopics = new HashSet<>();
+ TestUtils.waitForCondition(() -> {
+ Set<String> topicNames = admin.listTopics().names().get();
+ topicsNotFound.removeAll(topicNames);
+ extraTopics.clear();
+
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));
+ return topicsNotFound.isEmpty() && extraTopics.isEmpty();
+ }, String.format("Failed to find topic(s): %s and NOT find topic(s):
%s", topicsNotFound, extraTopics));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testUnregisterBroker(boolean usingBootstrapControllers) throws
Exception {
+ try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setNumBrokerNodes(4)
+ .setNumControllerNodes(3)
+ .build()).build()) {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ TestUtils.waitForCondition(() ->
brokerIsUnfenced(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be unfenced.");
+ cluster.brokers().get(0).shutdown();
+ TestUtils.waitForCondition(() ->
!brokerIsUnfenced(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be fenced.");
+
+ try (Admin admin = createAdminClient(cluster,
usingBootstrapControllers)) {
+ admin.unregisterBroker(0);
+ }
+
+ TestUtils.waitForCondition(() ->
brokerIsAbsent(clusterImage(cluster, 1), 0),
+ "Timed out waiting for broker 0 to be fenced.");
+ }
+ }
+
    // Returns the current cluster metadata image as seen by the given broker's
    // metadata cache.
    private ClusterImage clusterImage(KafkaClusterTestKit cluster, int brokerId) {
        return cluster.brokers().get(brokerId).metadataCache().currentImage().cluster();
    }
+
+ private boolean brokerIsUnfenced(ClusterImage image, int brokerId) {
+ BrokerRegistration registration = image.brokers().get(brokerId);
+ if (registration == null) {
+ return false;
+ }
+ return !registration.fenced();
+ }
+
    // True iff the image holds no registration at all for brokerId
    // (e.g. after the broker has been unregistered).
    private boolean brokerIsAbsent(ClusterImage image, int brokerId) {
        return !image.brokers().containsKey(brokerId);
    }
+
+ private Admin createAdminClient(KafkaClusterTestKit cluster, boolean
usingBootstrapControllers) {
+ Properties props = new Properties();
+ if (usingBootstrapControllers) {
+ props.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
cluster.bootstrapControllers());
+ props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ } else {
+ props = cluster.clientProperties();
+ }
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG,
this.getClass().getName());
+ return Admin.create(props);
+ }
+
public static class BadAuthorizer implements Authorizer {
// Default constructor needed for reflection object creation
public BadAuthorizer() {
@@ -323,11 +715,11 @@ public class KRaftClusterTest {
}
@Override
- public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity
quotaEntity, double newValue) {
+ public void updateQuota(ClientQuotaType quotaType,
org.apache.kafka.server.quota.ClientQuotaEntity quotaEntity, double newValue) {
}
@Override
- public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity
quotaEntity) {
+ public void removeQuota(ClientQuotaType quotaType,
org.apache.kafka.server.quota.ClientQuotaEntity quotaEntity) {
}
@Override