This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92f61b36f4e KAFKA-18226: Disable CustomQuotaCallbackTest and remove isKRaftTest (#18166)
92f61b36f4e is described below
commit 92f61b36f4e91c7c9e64722a6a4789d1b830daf4
Author: Ken Huang <[email protected]>
AuthorDate: Mon Dec 16 23:46:39 2024 +0800
KAFKA-18226: Disable CustomQuotaCallbackTest and remove isKRaftTest (#18166)
Reviewers: Mickael Maison <[email protected]>
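For context: before this change, the test harnesses chose a quorum by parsing the JUnit display name via TestInfoUtils.isKRaft, and call sites guarded KRaft-only setup behind isKRaftTest(). A minimal sketch of the removed pattern, reconstructed from the deletions below (the quorum=zk branch is elided for brevity):

    // Removed helper: quorum choice was parsed out of the parameterized test name
    def isKRaft(testInfo: TestInfo): Boolean =
      testInfo.getDisplayName.contains("quorum=kraft")

    // Typical call site, e.g. in IntegrationTestHarness.generateConfigs:
    if (isKRaftTest()) {
      cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
    }

With ZooKeeper mode gone, every such guard becomes unconditional and the helper is deleted. CustomQuotaCallbackTest still relies on ZooKeeper-era plumbing, so it is disabled and tracked under KAFKA-18213.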
---
.../kafka/api/BaseProducerSendTest.scala | 2 +-
.../kafka/api/CustomQuotaCallbackTest.scala | 11 +-
.../kafka/api/EndToEndClusterIdTest.scala | 2 +-
.../kafka/api/IntegrationTestHarness.scala | 26 ++--
.../kafka/api/ProducerCompressionTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 2 +-
.../kafka/api/ProducerIdExpirationTest.scala | 2 +-
.../kafka/api/ProducerSendWhileDeletionTest.scala | 2 +-
.../kafka/api/TransactionsBounceTest.scala | 2 +-
.../kafka/api/TransactionsExpirationTest.scala | 2 +-
.../api/TransactionsWithMaxInFlightOneTest.scala | 2 +-
.../server/FetchFromFollowerIntegrationTest.scala | 2 +-
.../kafka/server/QuorumTestHarness.scala | 141 +-------------------
.../src/test/scala/kafka/utils/TestInfoUtils.scala | 15 +--
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 5 +-
.../kafka/integration/KafkaServerTestHarness.scala | 148 +++++----------------
.../MetricsDuringTopicCreationDeletionTest.scala | 4 +-
.../unit/kafka/integration/MinIsrConfigTest.scala | 2 +-
.../integration/UncleanLeaderElectionTest.scala | 4 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 4 +-
.../scala/unit/kafka/server/BaseRequestTest.scala | 11 +-
...leteTopicsRequestWithDeletionDisabledTest.scala | 2 +-
.../unit/kafka/server/EdgeCaseRequestTest.scala | 2 +-
.../kafka/server/KafkaMetricsReporterTest.scala | 2 +-
.../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +-
.../unit/kafka/server/ReplicationQuotasTest.scala | 8 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 8 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 71 ----------
28 files changed, 87 insertions(+), 399 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 99aefe0e51b..c8c36730f2c 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -55,7 +55,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 4.toString)
TestUtils.createBrokerConfigs(
numServers,
- zkConnectOrNull,
+ null,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile,
saslProperties = serverSaslProperties
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index 4141342c7a9..a18c03fb593 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -19,7 +19,6 @@ import kafka.api.GroupedUserQuotaCallback._
import kafka.security.{JaasModule, JaasTestUtils}
import kafka.server._
import kafka.utils.{Logging, TestInfoUtils, TestUtils}
-import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -31,7 +30,7 @@ import org.apache.kafka.common.{Cluster, Reconfigurable}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs}
import org.apache.kafka.server.quota._
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -42,6 +41,7 @@ import java.{lang, util}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
+@Disabled("KAFKA-18213")
class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -86,8 +86,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = {
super.configureSecurityBeforeServersStart(testInfo)
- zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+ createScramCredentials("", JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@@ -148,7 +147,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
// Remove the second topic with large number of partitions, verify no longer throttled
- adminZkClient.deleteTopic(largeTopic)
+ deleteTopic(largeTopic)
user = addUser("group1_user3", brokerId)
user.waitForQuotaUpdate(8000 * 100, 2500 * 100, defaultRequestQuota)
user.removeThrottleMetrics() // since group was throttled before
@@ -180,7 +179,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
- TestUtils.createTopic(zkClient, topic, assignment, servers)
+ TestUtils.createTopic(null, topic, assignment, servers)
}
private def createAdminClient(): Admin = {
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 6c8d119daee..0cc927bd11d 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -102,7 +102,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
this.serverConfig.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockBrokerMetricsReporter].getName)
override def generateConfigs = {
- val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(serverCount, null, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_ ++= serverConfig)
cfgs.map(KafkaConfig.fromProps)
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b7efed1d495..b660ff35219 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -66,7 +66,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
override def generateConfigs: Seq[KafkaConfig] = {
- val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol),
+ val cfgs = TestUtils.createBrokerConfigs(brokerCount, null, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
configureListeners(cfgs)
modifyConfigs(cfgs)
@@ -74,11 +74,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
}
-
- if(isKRaftTest()) {
- cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
- }
-
+ cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
insertControllerListenersIfNeeded(cfgs)
cfgs.map(KafkaConfig.fromProps)
}
@@ -103,16 +99,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
}
private def insertControllerListenersIfNeeded(props: Seq[Properties]): Unit = {
- if (isKRaftTest()) {
- props.foreach { config =>
- // Add a security protocol for the controller endpoints, if one is not already set.
- val securityPairs = config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "").split(",")
- val toAdd = config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "").split(",").filter(
- e => !securityPairs.exists(_.startsWith(s"$e:")))
- if (toAdd.nonEmpty) {
- config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, (securityPairs ++
- toAdd.map(e => s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(","))
- }
+ props.foreach { config =>
+ // Add a security protocol for the controller endpoints, if one is not already set.
+ val securityPairs = config.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "").split(",")
+ val toAdd = config.getProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "").split(",").filter(
+ e => !securityPairs.exists(_.startsWith(s"$e:")))
+ if (toAdd.nonEmpty) {
+ config.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, (securityPairs ++
+ toAdd.map(e => s"$e:${controllerListenerSecurityProtocol.toString}")).mkString(","))
}
}
}
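Concretely, the listener fix-up above (now applied unconditionally) appends a security-protocol mapping for any controller listener the map does not yet cover. A worked example with illustrative values, assuming controllerListenerSecurityProtocol resolves to PLAINTEXT:

    // hypothetical input config
    val config = new java.util.Properties()
    config.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT")
    config.setProperty("controller.listener.names", "CONTROLLER")
    // securityPairs = ["PLAINTEXT:PLAINTEXT"]; no pair starts with "CONTROLLER:",
    // so toAdd = ["CONTROLLER"], and after insertControllerListenersIfNeeded(Seq(config)):
    //   listener.security.protocol.map == "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"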
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 521b9cc0a0f..31375752892 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -48,7 +48,7 @@ class ProducerCompressionTest extends QuorumTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(brokerId, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(brokerId, null)
broker = createBroker(new KafkaConfig(props))
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a30d440f325..92eac4f1230 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -52,7 +52,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 1.toString)
def generateConfigs =
- TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ TestUtils.createBrokerConfigs(numServers, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
diff --git a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index 918d79b436e..2dee826dc9f 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -52,7 +52,7 @@ class ProducerIdExpirationTest extends KafkaServerTestHarness {
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
index 397e4660da7..8d3e76e7448 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -78,7 +78,7 @@ class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
deleteTopic(topic, listenerName)
// Verify that the topic is deleted when no metadata request comes in
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 2, brokers)
+ TestUtils.verifyTopicDeletion(topic, 2, brokers)
// Producer should be able to send messages even after topic gets deleted and auto-created
assertEquals(topic, producer.send(new ProducerRecord(topic, null, "value".getBytes(StandardCharsets.UTF_8))).get.topic())
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
index 1d1eca60ead..4f6fb7e483c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -69,7 +69,7 @@ class TransactionsBounceTest extends IntegrationTestHarness {
// Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
// a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
override def generateConfigs = {
- FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull)
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, null)
.map(KafkaConfig.fromProps(_, overridingProps))
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index b0e291c5efc..26ed880aa1a 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -51,7 +51,7 @@ class TransactionsExpirationTest extends KafkaServerTestHarness {
var admin: Admin = _
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(3, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(3, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
index 53899a43743..fe1ea323162 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala
@@ -50,7 +50,7 @@ class TransactionsWithMaxInFlightOneTest extends KafkaServerTestHarness {
val transactionalConsumers = mutable.Buffer[Consumer[Array[Byte], Array[Byte]]]()
override def generateConfigs: Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numBrokers, zkConnectOrNull).map(KafkaConfig.fromProps(_, serverProps()))
+ TestUtils.createBrokerConfigs(numBrokers, null).map(KafkaConfig.fromProps(_, serverProps()))
}
@BeforeEach
diff --git a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
index 8a8013921db..31f3663359e 100644
--- a/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala
@@ -52,7 +52,7 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
}
override def generateConfigs: collection.Seq[KafkaConfig] = {
- TestUtils.createBrokerConfigs(numNodes, zkConnectOrNull, enableControlledShutdown = false, enableFetchFromFollower = true)
+ TestUtils.createBrokerConfigs(numNodes, null, enableControlledShutdown = false, enableFetchFromFollower = true)
.map(KafkaConfig.fromProps(_, overridingProps))
}
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index ce953990af8..9c9067ef113 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -26,7 +26,6 @@ import java.util.{Collections, Locale, Optional, OptionalInt, Properties, stream
import java.util.concurrent.{CompletableFuture, TimeUnit}
import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
-import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.clients.consumer.GroupProtocol
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
@@ -34,7 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{Exit, Time, Utils}
+import org.apache.kafka.common.utils.{Exit, Time}
import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion}
@@ -47,8 +46,6 @@ import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVe
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.zookeeper.client.ZKClientConfig
-import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
import org.junit.jupiter.params.provider.Arguments
@@ -69,30 +66,6 @@ trait QuorumImplementation {
def shutdown(): Unit
}
-class ZooKeeperQuorumImplementation(
- val zookeeper: EmbeddedZookeeper,
- val zkConnect: String,
- val zkClient: KafkaZkClient,
- val adminZkClient: AdminZkClient,
- val log: Logging
-) extends QuorumImplementation {
- override def createBroker(
- config: KafkaConfig,
- time: Time,
- startup: Boolean,
- threadNamePrefix: Option[String],
- ): KafkaBroker = {
- val server = new KafkaServer(config, time, threadNamePrefix)
- if (startup) server.startup()
- server
- }
-
- override def shutdown(): Unit = {
- Utils.closeQuietly(zkClient, "zk client")
- CoreUtils.swallow(zookeeper.shutdown(), log)
- }
-}
-
class KRaftQuorumImplementation(
val controllerServer: ControllerServer,
val faultHandlerFactory: FaultHandlerFactory,
@@ -172,11 +145,6 @@ class QuorumTestHarnessFaultHandlerFactory(
@Tag("integration")
abstract class QuorumTestHarness extends Logging {
- val zkConnectionTimeout = 10000
- val zkSessionTimeout = 15000 // Allows us to avoid ZK session expiration due to GC up to 2/3 * 15000ms = 10 secs
- val zkMaxInFlightRequests = Int.MaxValue
-
- protected def zkAclsEnabled: Option[Boolean] = None
/**
* When in KRaft mode, the security protocol to use for the controller listener.
@@ -193,10 +161,6 @@ abstract class QuorumTestHarness extends Logging {
private var testInfo: TestInfo = _
protected var implementation: QuorumImplementation = _
- def isKRaftTest(): Boolean = {
- TestInfoUtils.isKRaft(testInfo)
- }
-
def isShareGroupTest(): Boolean = {
TestInfoUtils.isShareGroupTest(testInfo)
}
@@ -214,53 +178,11 @@ abstract class QuorumTestHarness extends Logging {
gp.get
}
- def checkIsZKTest(): Unit = {
- if (isKRaftTest()) {
- throw new RuntimeException("This function can't be accessed when running the test " +
- "in KRaft mode. ZooKeeper mode is required.")
- }
- }
-
- def checkIsKRaftTest(): Unit = {
- if (!isKRaftTest()) {
- throw new RuntimeException("This function can't be accessed when running the test " +
- "in ZooKeeper mode. KRaft mode is required.")
- }
- }
-
- private def asZk(): ZooKeeperQuorumImplementation = {
- checkIsZKTest()
- implementation.asInstanceOf[ZooKeeperQuorumImplementation]
- }
-
- private def asKRaft(): KRaftQuorumImplementation = {
- checkIsKRaftTest()
- implementation.asInstanceOf[KRaftQuorumImplementation]
- }
-
- def zookeeper: EmbeddedZookeeper = asZk().zookeeper
-
- def zkClient: KafkaZkClient = asZk().zkClient
-
- def zkClientOrNull: KafkaZkClient = if (isKRaftTest()) null else asZk().zkClient
-
- def adminZkClient: AdminZkClient = asZk().adminZkClient
-
- def zkPort: Int = asZk().zookeeper.port
-
- def zkConnect: String = s"127.0.0.1:$zkPort"
-
- def zkConnectOrNull: String = if (isKRaftTest()) null else zkConnect
+ private def asKRaft(): KRaftQuorumImplementation = implementation.asInstanceOf[KRaftQuorumImplementation]
def controllerServer: ControllerServer = asKRaft().controllerServer
- def controllerServers: Seq[ControllerServer] = {
- if (isKRaftTest()) {
- Seq(asKRaft().controllerServer)
- } else {
- Seq()
- }
- }
+ def controllerServers: Seq[ControllerServer] = Seq(asKRaft().controllerServer)
val faultHandlerFactory = new QuorumTestHarnessFaultHandlerFactory(new MockFaultHandler("quorumTestHarnessFaultHandler"))
@@ -297,13 +219,8 @@ abstract class QuorumTestHarness extends Logging {
val name = testInfo.getTestMethod.toScala
.map(_.toString)
.getOrElse("[unspecified]")
- if (TestInfoUtils.isKRaft(testInfo)) {
- info(s"Running KRAFT test $name")
- implementation = newKRaftQuorum(testInfo)
- } else {
- info(s"Running ZK test $name")
- implementation = newZooKeeperQuorum()
- }
+ info(s"Running KRAFT test $name")
+ implementation = newKRaftQuorum(testInfo)
}
def createBroker(
@@ -315,8 +232,6 @@ abstract class QuorumTestHarness extends Logging {
implementation.createBroker(config, time, startup, threadNamePrefix)
}
- def shutdownZooKeeper(): Unit = asZk().shutdown()
-
def shutdownKRaftController(): Unit = {
// Note that the RaftManager instance is left running; it will be shut down in tearDown()
val kRaftQuorumImplementation = asKRaft()
@@ -438,38 +353,6 @@ abstract class QuorumTestHarness extends Logging {
)
}
- private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
- val zookeeper = new EmbeddedZookeeper()
- var zkClient: KafkaZkClient = null
- var adminZkClient: AdminZkClient = null
- val zkConnect = s"127.0.0.1:${zookeeper.port}"
- try {
- zkClient = KafkaZkClient(
- zkConnect,
- zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
- zkSessionTimeout,
- zkConnectionTimeout,
- zkMaxInFlightRequests,
- Time.SYSTEM,
- name = "ZooKeeperTestHarness",
- new ZKClientConfig,
- enableEntityConfigControllerCheck = false)
- adminZkClient = new AdminZkClient(zkClient)
- } catch {
- case t: Throwable =>
- CoreUtils.swallow(zookeeper.shutdown(), this)
- Utils.closeQuietly(zkClient, "zk client")
- throw t
- }
- new ZooKeeperQuorumImplementation(
- zookeeper,
- zkConnect,
- zkClient,
- adminZkClient,
- this
- )
- }
-
@AfterEach
def tearDown(): Unit = {
if (implementation != null) {
@@ -482,22 +365,9 @@ abstract class QuorumTestHarness extends Logging {
Configuration.setConfiguration(null)
faultHandler.maybeRethrowFirstException()
}
-
- // Trigger session expiry by reusing the session id in another client
- def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
- val dummyWatcher = new Watcher {
- override def process(event: WatchedEvent): Unit = {}
- }
- val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher, zooKeeper.getSessionId, zooKeeper.getSessionPasswd)
- assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
- anotherZkClient
- }
}
object QuorumTestHarness {
- val ZkClientEventThreadSuffix = "-EventThread"
/**
* Verify that a previous test that doesn't use QuorumTestHarness hasn't left behind an unexpected thread.
@@ -527,7 +397,6 @@ object QuorumTestHarness {
KafkaProducer.NETWORK_THREAD_PREFIX,
AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
- QuorumTestHarness.ZkClientEventThreadSuffix,
KafkaEventQueue.EVENT_HANDLER_THREAD_SUFFIX,
ClientMetricsManager.CLIENT_METRICS_REAPER_THREAD_NAME,
SystemTimer.SYSTEM_TIMER_THREAD_PREFIX,
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index cd22727839e..a74d1ca1612 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -31,20 +31,7 @@ class EmptyTestInfo extends TestInfo {
}
object TestInfoUtils {
- def isKRaft(testInfo: TestInfo): Boolean = {
- if (testInfo.getDisplayName.contains("quorum=")) {
- if (testInfo.getDisplayName.contains("quorum=kraft")) {
- true
- } else if (testInfo.getDisplayName.contains("quorum=zk")) {
- false
- } else {
- throw new RuntimeException(s"Unknown quorum value")
- }
- } else {
- false
- }
- }
-
+
final val TestWithParameterizedQuorumAndGroupProtocolNames = "{displayName}.quorum={0}.groupProtocol={1}"
def isShareGroupTest(testInfo: TestInfo): Boolean = {
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 384d067e2b6..a9901ba65e7 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -57,10 +57,7 @@ class AddPartitionsTest extends BaseRequestTest {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
-
- if (isKRaftTest()) {
- brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
- }
+ brokers.foreach(broker => broker.asInstanceOf[BrokerServer].lifecycleManager.initialUnfenceFuture.get())
createTopicWithAssignment(topic1, partitionReplicaAssignment = topic1Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic2, partitionReplicaAssignment = topic2Assignment.map { case (k, v) => k -> v.replicas })
createTopicWithAssignment(topic3, partitionReplicaAssignment = topic3Assignment.map { case (k, v) => k -> v.replicas })
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 162b14760fd..0fbf0143748 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -20,10 +20,7 @@ package kafka.integration
import kafka.server._
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
-import kafka.zk.KafkaZkClient
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
-import org.apache.kafka.common.errors.TopicExistsException
-import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.resource.ResourcePattern
@@ -60,10 +57,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
*/
def servers: mutable.Buffer[KafkaBroker] = brokers
- def brokerServers: mutable.Buffer[BrokerServer] = {
- checkIsKRaftTest()
- _brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
- }
+ def brokerServers: mutable.Buffer[BrokerServer] = _brokers.asInstanceOf[mutable.Buffer[BrokerServer]]
var alive: Array[Boolean] = _
@@ -155,12 +149,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
listenerName: ListenerName = listenerName,
adminClientConfig: Properties = new Properties
): Unit = {
- if (isKRaftTest()) {
- Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
- TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
- }
- } else {
- createOffsetsTopic(zkClient, servers)
+ Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
+ TestUtils.createOffsetsTopicWithAdmin(admin, brokers, controllerServers)
}
}
@@ -177,25 +167,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
listenerName: ListenerName = listenerName,
adminClientConfig: Properties = new Properties
): scala.collection.immutable.Map[Int, Int] = {
- if (isKRaftTest()) {
- Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
- TestUtils.createTopicWithAdmin(
- admin = admin,
- topic = topic,
- brokers = brokers,
- controllers = controllerServers,
- numPartitions = numPartitions,
- replicationFactor = replicationFactor,
- topicConfig = topicConfig
- )
- }
- } else {
- TestUtils.createTopic(
- zkClient = zkClient,
+ Using.resource(createAdminClient(brokers, listenerName, adminClientConfig)) { admin =>
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
topic = topic,
+ brokers = brokers,
+ controllers = controllerServers,
numPartitions = numPartitions,
replicationFactor = replicationFactor,
- servers = servers,
topicConfig = topicConfig
)
}
@@ -210,40 +189,28 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
listenerName: ListenerName = listenerName
- ): scala.collection.immutable.Map[Int, Int] =
- if (isKRaftTest()) {
- Using.resource(createAdminClient(brokers, listenerName)) { admin =>
- TestUtils.createTopicWithAdmin(
- admin = admin,
- topic = topic,
- replicaAssignment = partitionReplicaAssignment,
- brokers = brokers,
- controllers = controllerServers
- )
- }
- } else {
- TestUtils.createTopic(
- zkClient,
- topic,
- partitionReplicaAssignment,
- servers
+ ): scala.collection.immutable.Map[Int, Int] = {
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ topic = topic,
+ replicaAssignment = partitionReplicaAssignment,
+ brokers = brokers,
+ controllers = controllerServers
)
}
+ }
def deleteTopic(
topic: String,
listenerName: ListenerName = listenerName
): Unit = {
- if (isKRaftTest()) {
- Using.resource(createAdminClient(brokers, listenerName)) { admin =>
- TestUtils.deleteTopicWithAdmin(
- admin = admin,
- topic = topic,
- brokers = aliveBrokers,
- controllers = controllerServers)
- }
- } else {
- adminZkClient.deleteTopic(topic)
+ Using.resource(createAdminClient(brokers, listenerName)) { admin =>
+ TestUtils.deleteTopicWithAdmin(
+ admin = admin,
+ topic = topic,
+ brokers = aliveBrokers,
+ controllers = controllerServers)
}
}
@@ -380,11 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
private def createBrokerFromConfig(config: KafkaConfig): KafkaBroker = {
- if (isKRaftTest()) {
- createBroker(config, brokerTime(config.brokerId), startup = false)
- } else {
- TestUtils.createServer(config, time = brokerTime(config.brokerId), threadNamePrefix = None, startup = false)
- }
+ createBroker(config, brokerTime(config.brokerId), startup = false)
}
def aliveBrokers: Seq[KafkaBroker] = {
@@ -392,63 +355,20 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
def ensureConsistentKRaftMetadata(): Unit = {
- if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(
- aliveBrokers,
- controllerServer
- )
- }
+ TestUtils.ensureConsistentKRaftMetadata(
+ aliveBrokers,
+ controllerServer
+ )
}
def changeClientIdConfig(sanitizedClientId: String, configs: Properties): Unit = {
- if (isKRaftTest()) {
- Using.resource(createAdminClient(brokers, listenerName)) {
- admin => {
- admin.alterClientQuotas(Collections.singleton(
- new ClientQuotaAlteration(
- new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
- configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()
- }
+ Using.resource(createAdminClient(brokers, listenerName)) {
+ admin => {
+ admin.alterClientQuotas(Collections.singleton(
+ new ClientQuotaAlteration(
+ new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> (if (sanitizedClientId == "<default>") null else sanitizedClientId)).asJava),
+ configs.asScala.map { case (key, value) => new ClientQuotaAlteration.Op(key, value.toDouble) }.toList.asJava))).all().get()
}
}
- else {
- adminZkClient.changeClientIdConfig(sanitizedClientId, configs)
- }
- }
-
- /**
- * Ensures that the consumer offsets/group metadata topic exists. If it does not, the topic is created and the method waits
- * until the leader is elected and metadata is propagated to all brokers. If it does, the method verifies that it has
- * the expected number of partition and replication factor however it does not guarantee that the topic is empty.
- */
- private def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
- val server = servers.head
- val numPartitions = server.config.groupCoordinatorConfig.offsetsTopicPartitions
- val replicationFactor = server.config.groupCoordinatorConfig.offsetsTopicReplicationFactor.toInt
-
- try {
- TestUtils.createTopic(
- zkClient,
- Topic.GROUP_METADATA_TOPIC_NAME,
- numPartitions,
- replicationFactor,
- servers,
- server.groupCoordinator.groupMetadataTopicConfigs
- )
- } catch {
- case ex: TopicExistsException =>
- val allPartitionsMetadata = waitForAllPartitionsMetadata(
- servers,
- Topic.GROUP_METADATA_TOPIC_NAME,
- numPartitions
- )
-
- // If the topic already exists, we ensure that it has the required
- // number of partitions and replication factor. If it has not, the
- // exception is thrown further.
- if (allPartitionsMetadata.size != numPartitions || allPartitionsMetadata.head._2.replicas.size != replicationFactor) {
- throw ex
- }
- }
}
}
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 01d96ef6318..10a32c4a3c1 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -51,7 +51,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
@volatile private var running = true
- override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, zkConnectOrNull)
+ override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, null)
.map(KafkaConfig.fromProps(_, overridingProps))
@BeforeEach
@@ -148,7 +148,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
for (t <- topics if running) {
try {
deleteTopic(t)
- TestUtils.verifyTopicDeletion(null, t, partitionNum, servers)
+ TestUtils.verifyTopicDeletion(t, partitionNum, servers)
} catch {
case e: Exception => e.printStackTrace()
}
diff --git a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
index 0b7beeaab7a..4af4a49069f 100644
--- a/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MinIsrConfigTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.params.provider.ValueSource
class MinIsrConfigTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
overridingProps.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, "5")
- def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnectOrNull).map(KafkaConfig.fromProps(_, overridingProps))
+ def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, null).map(KafkaConfig.fromProps(_, overridingProps))
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 3d78bf27aff..29714d1856b 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -71,8 +71,8 @@ class UncleanLeaderElectionTest extends QuorumTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- configProps1 = createBrokerConfig(brokerId1, zkConnectOrNull)
- configProps2 = createBrokerConfig(brokerId2, zkConnectOrNull)
+ configProps1 = createBrokerConfig(brokerId1, null)
+ configProps2 = createBrokerConfig(brokerId2, null)
for (configProps <- List(configProps1, configProps2)) {
configProps.put("controlled.shutdown.enable",
enableControlledShutdown.toString)
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 1fa12ef990b..451b8cf2f87 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -62,7 +62,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
val topic = "test-topic-metric"
createTopic(topic)
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+ TestUtils.verifyTopicDeletion(topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
@@ -77,7 +77,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertTrue(topicMetricGroups(topic).nonEmpty, "Topic metrics don't exist")
brokers.foreach(b => assertNotNull(b.brokerTopicStats.topicStats(topic)))
deleteTopic(topic)
- TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+ TestUtils.verifyTopicDeletion(topic, 1, brokers)
assertEquals(Set.empty, topicMetricGroups(topic), "Topic metrics exists after deleteTopic")
}
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 95a9f92fe86..3a0ffe1b477 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -69,16 +69,9 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
/**
* Return the socket server where admin request to be sent.
*
- * For KRaft clusters that is any broker as the broker will forward the request to the active
- * controller. For Legacy clusters that is the controller broker.
+ * KRaft clusters that is any broker as the broker will forward the request to the active controller.
*/
- def adminSocketServer: SocketServer = {
- if (isKRaftTest()) {
- anySocketServer
- } else {
- controllerSocketServer
- }
- }
+ def adminSocketServer: SocketServer = anySocketServer
def connect(socketServer: SocketServer = anySocketServer,
listenerName: ListenerName = listenerName): Socket = {
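Since every cluster is now KRaft, any broker can accept an admin-style request and forward it to the active controller, which is why the controller-broker special case disappears. A hypothetical usage sketch, reusing the connect helper shown above:

    // hypothetical: open a socket to any broker; the receiving broker
    // forwards admin requests to the active KRaft controller
    val socket = connect(adminSocketServer, listenerName)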
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index ec7cdffa1ef..81875ceaac2 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -40,7 +40,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest {
}
override def generateConfigs = {
- val props = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull,
+ val props = TestUtils.createBrokerConfigs(brokerCount, null,
enableControlledShutdown = false, enableDeleteTopic = false,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount)
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index f2be521fca4..0c9cab50d46 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._
class EdgeCaseRequestTest extends KafkaServerTestHarness {
def generateConfigs = {
- val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(1, null)
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
List(KafkaConfig.fromProps(props))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index a97cea67c5c..d319a18bc82 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -71,7 +71,7 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
+ val props = TestUtils.createBrokerConfig(1, null)
props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, "kafka.server.KafkaMetricsReporterTest$MockMetricsReporter")
props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 156fa60a1d5..baf51347cb2 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -82,7 +82,7 @@ class LogRecoveryTest extends QuorumTestHarness {
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- configs = TestUtils.createBrokerConfigs(2, zkConnectOrNull, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
+ configs = TestUtils.createBrokerConfigs(2, null, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps))
// start both servers
server1 = createBroker(configProps1)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 34720797e11..fbd6f75a6a9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -88,7 +88,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
* regular replication works as expected.
*/
- brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers = (100 to 105).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
//Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
//And two extra partitions 6,7, which we don't intend on throttling.
@@ -202,7 +202,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
*/
//2 brokers with 1MB Segment Size & 1 partition
- val config: Properties = createBrokerConfig(100, zkConnectOrNull)
+ val config: Properties = createBrokerConfig(100, null)
config.put("log.segment.bytes", (1024 * 1024).toString)
brokers = Seq(createBroker(fromProps(config)))
@@ -231,7 +231,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
//Start the new broker (and hence start replicating)
debug("Starting new broker")
- brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull)))
+ brokers = brokers :+ createBroker(fromProps(createBrokerConfig(101, null)))
val start = System.currentTimeMillis()
waitForOffsetsToMatch(msgCount, 0, 101)
@@ -261,7 +261,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
def createBrokers(brokerIds: Seq[Int]): Unit = {
brokerIds.foreach { id =>
- brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull)))
+ brokers = brokers :+ createBroker(fromProps(createBrokerConfig(id, null)))
}
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 4ac571f452a..7c7e7fd6e09 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -68,7 +68,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader(quorum: String): Unit = {
- brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers ++= (0 to 1).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
// Given two topics with replication of a single partition
for (topic <- List(topic1, topic2)) {
@@ -103,7 +103,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
def shouldSendLeaderEpochRequestAndGetAResponse(quorum: String): Unit = {
//3 brokers, put partition on 100/101 and then pretend to be 102
- brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, zkConnectOrNull))) }
+ brokers ++= (100 to 102).map { id => createBroker(fromProps(createBrokerConfig(id, null))) }
val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
createTopic(topic1, assignment1)
@@ -150,10 +150,10 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging {
@ValueSource(strings = Array("kraft"))
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
//Setup: we are only interested in the single partition on broker 101
- brokers += createBroker(fromProps(createBrokerConfig(100, zkConnectOrNull)))
+ brokers += createBroker(fromProps(createBrokerConfig(100, null)))
assertEquals(controllerServer.config.nodeId, waitUntilQuorumLeaderElected(controllerServer))
- brokers += createBroker(fromProps(createBrokerConfig(101, zkConnectOrNull)))
+ brokers += createBroker(fromProps(createBrokerConfig(101, null)))
def leo() = brokers(1).replicaManager.localLog(tp).get.logEndOffset
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b2447e7f7c5..379ba5a532f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -526,39 +526,6 @@ object TestUtils extends Logging {
controllers.foreach(controller => ensureConsistentKRaftMetadata(brokers, controller))
}
- /**
- * Create a topic in ZooKeeper.
- * Wait until the leader is elected and the metadata is propagated to all brokers.
- * Return the leader for each partition.
- */
- def createTopic(zkClient: KafkaZkClient,
- topic: String,
- numPartitions: Int = 1,
- replicationFactor: Int = 1,
- servers: Seq[KafkaBroker],
- topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
- val adminZkClient = new AdminZkClient(zkClient)
- // create topic
- waitUntilTrue( () => {
- var hasSessionExpirationException = false
- try {
- adminZkClient.createTopic(topic, numPartitions, replicationFactor, topicConfig)
- } catch {
- case _: SessionExpiredException => hasSessionExpirationException = true
- case e: Throwable => throw e // let other exceptions propagate
- }
- !hasSessionExpirationException},
- s"Can't create topic $topic")
-
- // wait until we've propagated all partitions metadata to all servers
- val allPartitionsMetadata = waitForAllPartitionsMetadata(servers, topic, numPartitions)
-
- (0 until numPartitions).map { i =>
- i -> allPartitionsMetadata.get(new TopicPartition(topic, i)).map(_.leader()).getOrElse(
- throw new IllegalStateException(s"Cannot get the partition leader for topic: $topic, partition: $i in server metadata cache"))
- }.toMap
- }
-
/**
* Create a topic in ZooKeeper using a customized replica assignment.
* Wait until the leader is elected and the metadata is propagated to all brokers.
@@ -698,31 +665,6 @@ object TestUtils extends Logging {
new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
}
- /**
- * If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
- * If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
- * If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
- *
- * @return The new leader (note that negative values are used to indicate conditions like NoLeader and
- * LeaderDuringDelete).
- * @throws AssertionError if the expected condition is not true within the timeout.
- */
- def waitUntilLeaderIsElectedOrChanged(
- zkClient: KafkaZkClient,
- topic: String,
- partition: Int,
- timeoutMs: Long = 30000L,
- oldLeaderOpt: Option[Int] = None,
- newLeaderOpt: Option[Int] = None,
- ignoreNoLeader: Boolean = false
- ): Int = {
- def getPartitionLeader(topic: String, partition: Int): Option[Int] = {
- zkClient.getLeaderForPartition(new TopicPartition(topic, partition))
- .filter(p => !ignoreNoLeader || p != LeaderAndIsr.NO_LEADER)
- }
- doWaitUntilLeaderIsElectedOrChanged(getPartitionLeader, topic, partition, timeoutMs, oldLeaderOpt, newLeaderOpt)
- }
-
/**
* If neither oldLeaderOpt nor newLeaderOpt is defined, wait until the leader of a partition is elected.
* If oldLeaderOpt is defined, it waits until the new leader is different from the old leader.
@@ -1033,11 +975,6 @@ object TestUtils extends Logging {
}, msg)
}
- def waitUntilControllerElected(zkClient: KafkaZkClient, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
- val (controllerId, _) = computeUntilTrue(zkClient.getControllerId, waitTime = timeout)(_.isDefined)
- controllerId.getOrElse(throw new AssertionError(s"Controller not elected after $timeout ms"))
- }
-
def awaitLeaderChange[B <: KafkaBroker](
brokers: Seq[B],
tp: TopicPartition,
@@ -1240,18 +1177,10 @@ object TestUtils extends Logging {
}
def verifyTopicDeletion[B <: KafkaBroker](
- zkClient: KafkaZkClient,
topic: String,
numPartitions: Int,
brokers: Seq[B]): Unit = {
val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
- if (zkClient != null) {
- // wait until admin path for delete topic is deleted, signaling completion of topic deletion
- waitUntilTrue(() => !zkClient.isTopicMarkedForDeletion(topic),
- "Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted".format(topic))
- waitUntilTrue(() => !zkClient.topicExists(topic),
- "Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted".format(topic, topic))
- }
// ensure that the topic-partition has been deleted from all brokers' replica managers
waitUntilTrue(() =>
brokers.forall(broker => topicPartitions.forall(tp => broker.replicaManager.onlinePartition(tp).isEmpty)),