This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 410065a65df KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer (#18532)
410065a65df is described below

commit 410065a65df2716e563429577dc2d3bcaa0c7c9a
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jan 22 18:02:38 2025 +0100

    KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer (#18532)
    
    Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>
---
 .../integration/kafka/api/ConsumerBounceTest.scala | 45 +++++++++++++++-------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a8dbe0ecdaa..3b257e3f7a5 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.util.ShutdownableThread
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
     GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set small enough session timeout
     GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
+
+    // Tests will run for CONSUMER and CLASSIC group protocol, so set the group max size property
+    // required for each.
+    GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
     GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
+
     ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
     ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
     ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
@@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumptionWithBrokerFailures(quorum: String, groupProtocol: String): Unit = consumeWithBrokerFailures(10)
 
   /*
@@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol: String): Unit = seekAndCommitWithBrokerFailures(5)
 
   def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol: String): Unit = {
     val numRecords = 1000
     val newtopic = "newtopic"
@@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
 
     checkCloseGoodPath(numRecords, "group1")
     checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
-    checkCloseWithClusterFailure(numRecords, "group4", "group5")
+    checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)
   }
 
   /**
@@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
    * there is no coordinator, but close should timeout and return. If close is invoked with a very
    * large timeout, close should timeout after request timeout.
    */
-  private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String): Unit = {
+  private def checkCloseWithClusterFailure(numRecords: Int, group1: String, group2: String,
+                                           groupProtocol: String): Unit = {
     val consumer1 = createConsumerAndReceive(group1, manualAssign = false, numRecords)
 
     val requestTimeout = 6000
-    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+      this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000")
+      this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    }
     this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout.toString)
     val consumer2 = createConsumerAndReceive(group2, manualAssign = true, numRecords)
 
@@ -319,9 +327,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     * the group should be forced to rebalance when it becomes hosted on a Coordinator with the new config.
     * Then, 1 consumer should be left out of the group.
     */
-  @Test
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13421)
-  def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
+  def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum: String, groupProtocol: String): Unit = {
     val group = "group-max-size-test"
     val topic = "group-max-size-test"
     val maxGroupSize = 2
@@ -329,7 +338,9 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     val partitionCount = consumerCount * 2
 
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+      this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
 
@@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     * When we have the consumer group max size configured to X, the X+1th consumer trying to join should receive a fatal exception
     */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String, groupProtocol: String): Unit = {
     val group = "fatal-exception-test"
     val topic = "fatal-exception-test"
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+      this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
 
     val partitions = createTopicPartitions(topic, numPartitions = maxGroupSize, replicationFactor = brokerCount)
@@ -401,11 +414,15 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  // TODO: enable for all protocols after fix for not generating/blocking on unneeded
+  //  FindCoordinator on close for the new consumer
   def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
     val topic = "closetest"
     createTopic(topic, 10, brokerCount)
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000")
-    this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+      this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
+    }
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     checkCloseDuringRebalance("group1", topic, executor, brokersAvailableDuringClose = true)
   }

Reply via email to