This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81ada393d3e MINOR: convert ConsumerBounceTest to KRaft (#17997)
81ada393d3e is described below

commit 81ada393d3e6fef97241a9f8985371cb674c1eae
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 5 23:25:48 2024 -0800

    MINOR: convert ConsumerBounceTest to KRaft (#17997)
    
    Reviewers: TengYao Chi <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../integration/kafka/api/ConsumerBounceTest.scala | 57 +++++++++++++---------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 55dbe268fa5..45436876fca 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -26,10 +26,10 @@ import 
org.apache.kafka.common.message.FindCoordinatorRequestData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, 
FindCoordinatorResponse}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs}
 import org.apache.kafka.server.util.ShutdownableThread
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -54,17 +54,30 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     generateKafkaConfigs()
   }
 
+  val testConfigs = Map[String, String](
+    GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> "3", // 
don't want to lose offset
+    GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
+    GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set 
small enough session timeout
+    GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
+    GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
+    ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
+    ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
+    ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
+    KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG -> "50",
+    KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG -> "300",
+  )
+
+  override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
+    super.kraftControllerConfigs(testInfo).map(props => {
+      testConfigs.foreachEntry((k, v) => props.setProperty(k, v))
+      props
+    })
+  }
+
   private def generateKafkaConfigs(maxGroupSize: String = 
maxGroupSize.toString): Seq[KafkaConfig] = {
     val properties = new Properties
-    
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
"3") // don't want to lose offset
-    properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
-    properties.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
"10") // set small enough session timeout
-    
properties.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"0")
-    properties.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, maxGroupSize)
-    properties.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, 
"true")
-    properties.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
-
-    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, 
enableControlledShutdown = false)
+    testConfigs.foreachEntry((k, v) => properties.setProperty(k, v))
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, null, 
enableControlledShutdown = false)
       .map(KafkaConfig.fromProps(_, properties))
   }
 
@@ -81,7 +94,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: 
String): Unit = consumeWithBrokerFailures(10)
 
   /*
@@ -126,7 +139,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: 
String): Unit = seekAndCommitWithBrokerFailures(5)
 
   def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@@ -139,7 +152,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     consumer.seek(tp, 0)
 
     // wait until all the followers have synced the last HW with leader
-    TestUtils.waitUntilTrue(() => servers.forall(server =>
+    TestUtils.waitUntilTrue(() => brokerServers.forall(server =>
       server.replicaManager.localLog(tp).get.highWatermark == numRecords
     ), "Failed to update high watermark for followers after timeout")
 
@@ -170,7 +183,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: 
String): Unit = {
     val numRecords = 1000
     val newtopic = "newtopic"
@@ -210,7 +223,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     receiveExactRecords(poller, numRecords, 10000)
     poller.shutdown()
 
-    servers.foreach(server => killBroker(server.config.brokerId))
+    brokerServers.foreach(server => killBroker(server.config.brokerId))
     Thread.sleep(500)
     restartDeadBrokers()
 
@@ -222,7 +235,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testClose(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 10
     val producer = createProducer()
@@ -293,7 +306,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
requestTimeout.toString)
     val consumer2 = createConsumerAndReceive(group2, manualAssign = true, 
numRecords)
 
-    servers.foreach(server => killBroker(server.config.brokerId))
+    brokerServers.foreach(server => killBroker(server.config.brokerId))
     val closeTimeout = 2000
     val future1 = submitCloseAndValidate(consumer1, closeTimeout, None, 
Some(closeTimeout))
     val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, 
Some(requestTimeout))
@@ -325,7 +338,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
 
     // roll all brokers with a lesser max group size to make sure coordinator 
has the new config
     val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
-    for (serverIdx <- servers.indices) {
+    for (serverIdx <- brokerServers.indices) {
       killBroker(serverIdx)
       val config = newConfigs(serverIdx)
       servers(serverIdx) = TestUtils.createServer(config, time = 
brokerTime(config.brokerId))
@@ -348,7 +361,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     * When we have the consumer group max size configured to X, the X+1th 
consumer trying to join should receive a fatal exception
     */
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, 
groupProtocol: String): Unit = {
     val group = "fatal-exception-test"
     val topic = "fatal-exception-test"
@@ -387,7 +400,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
    * close should terminate immediately without sending leave group.
    */
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
     val topic = "closetest"
     createTopic(topic, 10, brokerCount)
@@ -439,7 +452,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with 
Logging {
     // Trigger another rebalance and shutdown all brokers
     // This consumer poll() doesn't complete and `tearDown` shuts down the 
executor and closes the consumer
     createConsumerToRebalance()
-    servers.foreach(server => killBroker(server.config.brokerId))
+    brokerServers.foreach(server => killBroker(server.config.brokerId))
 
     // consumer2 should close immediately without LeaveGroup request since 
there are no brokers available
     val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, 
Some(0))

Reply via email to