This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 410065a65df KAFKA-18517: Enable ConsumerBounceTest to run for new
async consumer (#18532)
410065a65df is described below
commit 410065a65df2716e563429577dc2d3bcaa0c7c9a
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jan 22 18:02:38 2025 +0100
KAFKA-18517: Enable ConsumerBounceTest to run for new async consumer
(#18532)
Reviewers: Andrew Schofield <[email protected]>, Kirk True
<[email protected]>
---
.../integration/kafka/api/ConsumerBounceTest.scala | 45 +++++++++++++++-------
1 file changed, 31 insertions(+), 14 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a8dbe0ecdaa..3b257e3f7a5 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -29,7 +29,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -59,7 +59,12 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set
small enough session timeout
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
+
+ // Tests will run for CONSUMER and CLASSIC group protocol, so set the
group max size property
+ // required for each.
+ GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG ->
maxGroupSize.toString,
GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
+
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
@@ -94,7 +99,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol:
String): Unit = consumeWithBrokerFailures(10)
/*
@@ -139,7 +144,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol:
String): Unit = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@@ -183,7 +188,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol:
String): Unit = {
val numRecords = 1000
val newtopic = "newtopic"
@@ -243,7 +248,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
checkCloseGoodPath(numRecords, "group1")
checkCloseWithCoordinatorFailure(numRecords, "group2", "group3")
- checkCloseWithClusterFailure(numRecords, "group4", "group5")
+ checkCloseWithClusterFailure(numRecords, "group4", "group5", groupProtocol)
}
/**
@@ -297,12 +302,15 @@ class ConsumerBounceTest extends AbstractConsumerTest
with Logging {
* there is no coordinator, but close should timeout and return. If close is
invoked with a very
* large timeout, close should timeout after request timeout.
*/
- private def checkCloseWithClusterFailure(numRecords: Int, group1: String,
group2: String): Unit = {
+ private def checkCloseWithClusterFailure(numRecords: Int, group1: String,
group2: String,
+ groupProtocol: String): Unit = {
val consumer1 = createConsumerAndReceive(group1, manualAssign = false,
numRecords)
val requestTimeout = 6000
- this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"5000")
-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"5000")
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ }
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
val consumer2 = createConsumerAndReceive(group2, manualAssign = true,
numRecords)
@@ -319,9 +327,10 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
* the group should be forced to rebalance when it becomes hosted on a
Coordinator with the new config.
* Then, 1 consumer should be left out of the group.
*/
- @Test
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Disabled // TODO: To be re-enabled once we can make it less flaky
(KAFKA-13421)
- def
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit
= {
+ def
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(quorum:
String, groupProtocol: String): Unit = {
val group = "group-max-size-test"
val topic = "group-max-size-test"
val maxGroupSize = 2
@@ -329,7 +338,9 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
val partitionCount = consumerCount * 2
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
"60000")
-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
val partitions = createTopicPartitions(topic, numPartitions =
partitionCount, replicationFactor = brokerCount)
@@ -361,12 +372,14 @@ class ConsumerBounceTest extends AbstractConsumerTest
with Logging {
* When we have the consumer group max size configured to X, the X+1th
consumer trying to join should receive a fatal exception
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String,
groupProtocol: String): Unit = {
val group = "fatal-exception-test"
val topic = "fatal-exception-test"
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
"60000")
-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
val partitions = createTopicPartitions(topic, numPartitions =
maxGroupSize, replicationFactor = brokerCount)
@@ -401,11 +414,15 @@ class ConsumerBounceTest extends AbstractConsumerTest
with Logging {
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ // TODO: enable for all protocols after fix for not generating/blocking on
unneeded
+ // FindCoordinator on close for the new consumer
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
val topic = "closetest"
createTopic(topic, 10, brokerCount)
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
"60000")
-
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ if (groupProtocol.equals(GroupProtocol.CLASSIC.name)) {
+
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
"1000")
+ }
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"false")
checkCloseDuringRebalance("group1", topic, executor,
brokersAvailableDuringClose = true)
}