This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 65820acad27 MINOR: disable some rebootstrap tests, convert the others
to KRaft (#17765)
65820acad27 is described below
commit 65820acad27908d32ca3797bf1fe4477bf74629c
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 12 09:59:20 2024 -0800
MINOR: disable some rebootstrap tests, convert the others to KRaft (#17765)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/api/AdminClientRebootstrapTest.scala | 2 +-
.../kafka/api/ConsumerRebootstrapTest.scala | 9 ++--
.../kafka/api/CustomQuotaCallbackTest.scala | 2 +-
.../kafka/api/ProducerRebootstrapTest.scala | 4 +-
.../integration/kafka/api/RebootstrapTest.scala | 8 ++--
.../kafka/integration/KafkaServerTestHarness.scala | 53 ++++++----------------
.../scala/unit/kafka/server/BaseRequestTest.scala | 20 +-------
.../unit/kafka/server/LogDirFailureTest.scala | 32 +++++--------
8 files changed, 42 insertions(+), 88 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
index d9cc326ff94..64cc259408e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -20,7 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class AdminClientRebootstrapTest extends RebootstrapTest {
- @ParameterizedTest
+ @ParameterizedTest(name =
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index c7c87ce3b84..5d6622799fe 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -17,11 +17,12 @@
package kafka.api
import kafka.api.ConsumerRebootstrapTest._
-import
kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+import
kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class ConsumerRebootstrapTest extends RebootstrapTest {
+ @Disabled("KAFKA-17986")
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrap(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
@@ -84,6 +86,7 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20,
startingTimestamp = 20)
}
+ @Disabled
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrapDisabled(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
@@ -133,8 +136,8 @@ object ConsumerRebootstrapTest {
final val RebootstrapTestName =
s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
def rebootstrapTestParams: stream.Stream[Arguments] = {
- assertEquals(1,
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
- val args =
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+ assertEquals(1,
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly.count())
+ val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
.findFirst().get.get
stream.Stream.of(
Arguments.of((args :+ true):_*),
diff --git
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index bff719681a2..4141342c7a9 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -235,7 +235,7 @@ class CustomQuotaCallbackTest extends
IntegrationTestHarness with SaslSetup {
createProducer(), createConsumer(), adminClient)
}
- case class GroupedUser(user: String, userGroup: String, topic: String,
leaderNode: KafkaServer,
+ case class GroupedUser(user: String, userGroup: String, topic: String,
leaderNode: KafkaBroker,
producerClientId: String, consumerClientId: String,
override val producer: KafkaProducer[Array[Byte],
Array[Byte]],
override val consumer: Consumer[Array[Byte],
Array[Byte]],
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index d0eabf370cb..f32c4433b45 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,11 +18,13 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class ProducerRebootstrapTest extends RebootstrapTest {
- @ParameterizedTest
+ @Disabled("KAFKA-17986")
+ @ParameterizedTest(name =
"{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
server1.shutdown()
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index 45324a89c6e..5d3134a0870 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.api
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@@ -26,8 +26,8 @@ import java.util.Properties
abstract class RebootstrapTest extends AbstractConsumerTest {
override def brokerCount: Int = 2
- def server0: KafkaServer = serverForId(0).get
- def server1: KafkaServer = serverForId(1).get
+ def server0: KafkaBroker = serverForId(0).get
+ def server1: KafkaBroker = serverForId(1).get
override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
@@ -36,7 +36,7 @@ abstract class RebootstrapTest extends AbstractConsumerTest {
// In this test, fixed ports are necessary, because brokers must have the
// same port after the restart.
- FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false)
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, null,
enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, overridingProps))
}
diff --git
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index ad7187ec5e1..162b14760fd 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -51,18 +51,14 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
/**
- * Get the list of brokers, which could be either BrokerServer objects or
KafkaServer objects.
+ * Get the list of brokers.
*/
def brokers: mutable.Buffer[KafkaBroker] = _brokers
/**
- * Get the list of brokers, as instances of KafkaServer.
- * This method should only be used when dealing with brokers that use
ZooKeeper.
+ * Get the list of brokers.
*/
- def servers: mutable.Buffer[KafkaServer] = {
- checkIsZKTest()
- _brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
- }
+ def servers: mutable.Buffer[KafkaBroker] = brokers
def brokerServers: mutable.Buffer[BrokerServer] = {
checkIsKRaftTest()
@@ -102,9 +98,9 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
instanceConfigs
}
- def serverForId(id: Int): Option[KafkaServer] = servers.find(s =>
s.config.brokerId == id)
+ def serverForId(id: Int): Option[KafkaBroker] = brokers.find(s =>
s.config.brokerId == id)
- def boundPort(server: KafkaServer): Int = server.boundPort(listenerName)
+ def boundPort(server: KafkaBroker): Int = server.boundPort(listenerName)
def bootstrapServers(listenerName: ListenerName = listenerName): String = {
TestUtils.bootstrapServers(_brokers, listenerName)
@@ -345,47 +341,26 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
}
}
- def getController(): KafkaServer = {
- checkIsZKTest()
- val controllerId = TestUtils.waitUntilControllerElected(zkClient)
- servers.filter(s => s.config.brokerId == controllerId).head
- }
-
def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
val result = new util.HashMap[String, Uuid]()
- if (isKRaftTest()) {
- val topicIdsMap =
controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
- names.foreach { name =>
- val response = topicIdsMap.get(name)
- result.put(name, response.result())
- }
- } else {
- val topicIdsMap =
getController().kafkaController.controllerContext.topicIds.toMap
- names.foreach { name =>
- if (topicIdsMap.contains(name)) result.put(name, topicIdsMap(name))
- }
+ val topicIdsMap =
controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
+ names.foreach { name =>
+ val response = topicIdsMap.get(name)
+ result.put(name, response.result())
}
result.asScala.toMap
}
def getTopicIds(): Map[String, Uuid] = {
- if (isKRaftTest()) {
-
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
- } else {
- getController().kafkaController.controllerContext.topicIds.toMap
- }
+
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
}
def getTopicNames(): Map[Uuid, String] = {
- if (isKRaftTest()) {
- val result = new util.HashMap[Uuid, String]()
-
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
- (key, value) => result.put(value, key)
- }
- result.asScala.toMap
- } else {
- getController().kafkaController.controllerContext.topicNames.toMap
+ val result = new util.HashMap[Uuid, String]()
+
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
+ (key, value) => result.put(value, key)
}
+ result.asScala.toMap
}
private def createBrokers(startup: Boolean): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 4c1494380ea..95a9f92fe86 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -56,25 +56,9 @@ abstract class BaseRequestTest extends
IntegrationTestHarness {
}.map(_.socketServer).getOrElse(throw new IllegalStateException("No live
broker is available"))
}
- def controllerSocketServer: SocketServer = {
- if (isKRaftTest()) {
- controllerServer.socketServer
- } else {
- servers.find { server =>
- server.kafkaController.isActive
- }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
controller broker is available"))
- }
- }
+ def controllerSocketServer: SocketServer = controllerServer.socketServer
- def notControllerSocketServer: SocketServer = {
- if (isKRaftTest()) {
- anySocketServer
- } else {
- servers.find { server =>
- !server.kafkaController.isActive
- }.map(_.socketServer).getOrElse(throw new IllegalStateException("No
non-controller broker is available"))
- }
- }
+ def notControllerSocketServer: SocketServer = anySocketServer
def brokerSocketServer(brokerId: Int): SocketServer = {
brokers.find { broker =>
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6c03d5f4670..aa2e634e9bf 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -20,7 +20,6 @@ import java.io.File
import java.util.Collections
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.api.IntegrationTestHarness
-import kafka.controller.{OfflineReplica, PartitionAndReplica}
import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll,
waitUntilTrue}
import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.Consumer
@@ -195,26 +194,17 @@ class LogDirFailureTest extends IntegrationTestHarness {
// Consumer should receive some messages
TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
- if (quorum == "kraft") {
- waitUntilTrue(() => {
- // get the broker with broker.nodeId == originalLeaderServerId
- val brokerWithDirFail = brokers.find(_.config.nodeId ==
originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
- // check if the broker has the offline log dir
- val hasOfflineDir =
brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
- // check if the broker has the offline replica
- hasOfflineDir && brokerWithDirFail.exists(broker =>
- broker.replicaManager.metadataCache
- .getClusterMetadata(broker.clusterId,
broker.config.interBrokerListenerName)
- .partition(new TopicPartition(topic,
0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
- }, "Expected to find an offline log dir")
- } else {
- // There should be no remaining LogDirEventNotification znode
- assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
- // The controller should have marked the replica on the original leader
as offline
- val controllerServer = servers.find(_.kafkaController.isActive).get
- val offlineReplicas =
controllerServer.kafkaController.controllerContext.replicasInState(topic,
OfflineReplica)
- assertTrue(offlineReplicas.contains(PartitionAndReplica(new
TopicPartition(topic, 0), originalLeaderServerId)))
- }
+ waitUntilTrue(() => {
+ // get the broker with broker.nodeId == originalLeaderServerId
+ val brokerWithDirFail = brokers.find(_.config.nodeId ==
originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
+ // check if the broker has the offline log dir
+ val hasOfflineDir =
brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
+ // check if the broker has the offline replica
+ hasOfflineDir && brokerWithDirFail.exists(broker =>
+ broker.replicaManager.metadataCache
+ .getClusterMetadata(broker.clusterId,
broker.config.interBrokerListenerName)
+ .partition(new TopicPartition(topic,
0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
+ }, "Expected to find an offline log dir")
}