This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 65820acad27 MINOR: disable some rebootstrap tests, convert the others to KRaft (#17765)
65820acad27 is described below

commit 65820acad27908d32ca3797bf1fe4477bf74629c
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 12 09:59:20 2024 -0800

    MINOR: disable some rebootstrap tests, convert the others to KRaft (#17765)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/api/AdminClientRebootstrapTest.scala     |  2 +-
 .../kafka/api/ConsumerRebootstrapTest.scala        |  9 ++--
 .../kafka/api/CustomQuotaCallbackTest.scala        |  2 +-
 .../kafka/api/ProducerRebootstrapTest.scala        |  4 +-
 .../integration/kafka/api/RebootstrapTest.scala    |  8 ++--
 .../kafka/integration/KafkaServerTestHarness.scala | 53 ++++++----------------
 .../scala/unit/kafka/server/BaseRequestTest.scala  | 20 +-------
 .../unit/kafka/server/LogDirFailureTest.scala      | 32 +++++--------
 8 files changed, 42 insertions(+), 88 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
index d9cc326ff94..64cc259408e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -20,7 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 class AdminClientRebootstrapTest extends RebootstrapTest {
-  @ParameterizedTest
+  @ParameterizedTest(name = "{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
   @ValueSource(booleans = Array(false, true))
   def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
 
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index c7c87ce3b84..5d6622799fe 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -17,11 +17,12 @@
 package kafka.api
 
 import kafka.api.ConsumerRebootstrapTest._
-import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
@@ -31,6 +32,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeoutException
 
 class ConsumerRebootstrapTest extends RebootstrapTest {
+  @Disabled("KAFKA-17986")
   @ParameterizedTest(name = RebootstrapTestName)
   @MethodSource(Array("rebootstrapTestParams"))
   def testRebootstrap(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
@@ -84,6 +86,7 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
     consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
   }
 
+  @Disabled
   @ParameterizedTest(name = RebootstrapTestName)
   @MethodSource(Array("rebootstrapTestParams"))
   def testRebootstrapDisabled(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
@@ -133,8 +136,8 @@ object ConsumerRebootstrapTest {
 
   final val RebootstrapTestName = s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
   def rebootstrapTestParams: stream.Stream[Arguments] = {
-    assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
-    val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+    assertEquals(1, getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly.count())
+    val args = getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly
       .findFirst().get.get
     stream.Stream.of(
       Arguments.of((args :+ true):_*),
diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index bff719681a2..4141342c7a9 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -235,7 +235,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
       createProducer(), createConsumer(), adminClient)
   }
 
-  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaServer,
+  case class GroupedUser(user: String, userGroup: String, topic: String, leaderNode: KafkaBroker,
                          producerClientId: String, consumerClientId: String,
                          override val producer: KafkaProducer[Array[Byte], Array[Byte]],
                          override val consumer: Consumer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index d0eabf370cb..f32c4433b45 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,11 +18,13 @@ package kafka.api
 
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 class ProducerRebootstrapTest extends RebootstrapTest {
-  @ParameterizedTest
+  @Disabled("KAFKA-17986")
+  @ParameterizedTest(name = "{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
   @ValueSource(booleans = Array(false, true))
   def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
     server1.shutdown()
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index 45324a89c6e..5d3134a0870 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.api
 
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{KafkaBroker, KafkaConfig}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
@@ -26,8 +26,8 @@ import java.util.Properties
 abstract class RebootstrapTest extends AbstractConsumerTest {
   override def brokerCount: Int = 2
 
-  def server0: KafkaServer = serverForId(0).get
-  def server1: KafkaServer = serverForId(1).get
+  def server0: KafkaBroker = serverForId(0).get
+  def server1: KafkaBroker = serverForId(1).get
 
   override def generateConfigs: Seq[KafkaConfig] = {
     val overridingProps = new Properties()
@@ -36,7 +36,7 @@ abstract class RebootstrapTest extends AbstractConsumerTest {
 
     // In this test, fixed ports are necessary, because brokers must have the
     // same port after the restart.
-    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, null, enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index ad7187ec5e1..162b14760fd 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -51,18 +51,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
   private val _brokers = new mutable.ArrayBuffer[KafkaBroker]
 
   /**
-   * Get the list of brokers, which could be either BrokerServer objects or KafkaServer objects.
+   * Get the list of brokers.
    */
   def brokers: mutable.Buffer[KafkaBroker] = _brokers
 
   /**
-   * Get the list of brokers, as instances of KafkaServer.
-   * This method should only be used when dealing with brokers that use ZooKeeper.
+   * Get the list of brokers.
    */
-  def servers: mutable.Buffer[KafkaServer] = {
-    checkIsZKTest()
-    _brokers.asInstanceOf[mutable.Buffer[KafkaServer]]
-  }
+  def servers: mutable.Buffer[KafkaBroker] = brokers
 
   def brokerServers: mutable.Buffer[BrokerServer] = {
     checkIsKRaftTest()
@@ -102,9 +98,9 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     instanceConfigs
   }
 
-  def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
+  def serverForId(id: Int): Option[KafkaBroker] = brokers.find(s => s.config.brokerId == id)
 
-  def boundPort(server: KafkaServer): Int = server.boundPort(listenerName)
+  def boundPort(server: KafkaBroker): Int = server.boundPort(listenerName)
 
   def bootstrapServers(listenerName: ListenerName = listenerName): String = {
     TestUtils.bootstrapServers(_brokers, listenerName)
@@ -345,47 +341,26 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     }
   }
 
-  def getController(): KafkaServer = {
-    checkIsZKTest()
-    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
-    servers.filter(s => s.config.brokerId == controllerId).head
-  }
-
   def getTopicIds(names: Seq[String]): Map[String, Uuid] = {
     val result = new util.HashMap[String, Uuid]()
-    if (isKRaftTest()) {
-      val topicIdsMap = controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
-      names.foreach { name =>
-        val response = topicIdsMap.get(name)
-        result.put(name, response.result())
-      }
-    } else {
-      val topicIdsMap = getController().kafkaController.controllerContext.topicIds.toMap
-      names.foreach { name =>
-        if (topicIdsMap.contains(name)) result.put(name, topicIdsMap(name))
-      }
+    val topicIdsMap = controllerServer.controller.findTopicIds(ANONYMOUS_CONTEXT, names.asJava).get()
+    names.foreach { name =>
+      val response = topicIdsMap.get(name)
+      result.put(name, response.result())
     }
     result.asScala.toMap
   }
 
   def getTopicIds(): Map[String, Uuid] = {
-    if (isKRaftTest()) {
-      controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
-    } else {
-      getController().kafkaController.controllerContext.topicIds.toMap
-    }
+    controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().asScala.toMap
   }
 
   def getTopicNames(): Map[Uuid, String] = {
-    if (isKRaftTest()) {
-      val result = new util.HashMap[Uuid, String]()
-      controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
-        (key, value) => result.put(value, key)
-      }
-      result.asScala.toMap
-    } else {
-      getController().kafkaController.controllerContext.topicNames.toMap
+    val result = new util.HashMap[Uuid, String]()
+    controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach {
+      (key, value) => result.put(value, key)
     }
+    result.asScala.toMap
   }
 
   private def createBrokers(startup: Boolean): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 4c1494380ea..95a9f92fe86 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -56,25 +56,9 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
     }.map(_.socketServer).getOrElse(throw new IllegalStateException("No live broker is available"))
   }
 
-  def controllerSocketServer: SocketServer = {
-    if (isKRaftTest()) {
-     controllerServer.socketServer
-    } else {
-      servers.find { server =>
-        server.kafkaController.isActive
-      }.map(_.socketServer).getOrElse(throw new IllegalStateException("No controller broker is available"))
-    }
-  }
+  def controllerSocketServer: SocketServer = controllerServer.socketServer
 
-  def notControllerSocketServer: SocketServer = {
-    if (isKRaftTest()) {
-      anySocketServer
-    } else {
-      servers.find { server =>
-        !server.kafkaController.isActive
-      }.map(_.socketServer).getOrElse(throw new IllegalStateException("No non-controller broker is available"))
-    }
-  }
+  def notControllerSocketServer: SocketServer = anySocketServer
 
   def brokerSocketServer(brokerId: Int): SocketServer = {
     brokers.find { broker =>
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 6c03d5f4670..aa2e634e9bf 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -20,7 +20,6 @@ import java.io.File
 import java.util.Collections
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import kafka.api.IntegrationTestHarness
-import kafka.controller.{OfflineReplica, PartitionAndReplica}
 import kafka.utils.TestUtils.{Checkpoint, LogDirFailureType, Roll, waitUntilTrue}
 import kafka.utils.{CoreUtils, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer.Consumer
@@ -195,26 +194,17 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // Consumer should receive some messages
     TestUtils.pollUntilAtLeastNumRecords(consumer, 1)
 
-    if (quorum == "kraft") {
-      waitUntilTrue(() => {
-        // get the broker with broker.nodeId == originalLeaderServerId
-        val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
-        // check if the broker has the offline log dir
-        val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
-        // check if the broker has the offline replica
-        hasOfflineDir && brokerWithDirFail.exists(broker =>
-          broker.replicaManager.metadataCache
-            .getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
-            .partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
-      }, "Expected to find an offline log dir")
-    } else {
-      // There should be no remaining LogDirEventNotification znode
-      assertTrue(zkClient.getAllLogDirEventNotifications.isEmpty)
-      // The controller should have marked the replica on the original leader as offline
-      val controllerServer = servers.find(_.kafkaController.isActive).get
-      val offlineReplicas = controllerServer.kafkaController.controllerContext.replicasInState(topic, OfflineReplica)
-      assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), originalLeaderServerId)))
-    }
+    waitUntilTrue(() => {
+      // get the broker with broker.nodeId == originalLeaderServerId
+      val brokerWithDirFail = brokers.find(_.config.nodeId == originalLeaderServerId).map(_.asInstanceOf[BrokerServer])
+      // check if the broker has the offline log dir
+      val hasOfflineDir = brokerWithDirFail.exists(_.logDirFailureChannel.hasOfflineLogDir(failedLogDir.toPath.toString))
+      // check if the broker has the offline replica
+      hasOfflineDir && brokerWithDirFail.exists(broker =>
+        broker.replicaManager.metadataCache
+          .getClusterMetadata(broker.clusterId, broker.config.interBrokerListenerName)
+          .partition(new TopicPartition(topic, 0)).offlineReplicas().map(_.id()).contains(originalLeaderServerId))
+    }, "Expected to find an offline log dir")
   }
 
 
