This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81ada393d3e MINOR: convert ConsumerBounceTest to KRaft (#17997)
81ada393d3e is described below
commit 81ada393d3e6fef97241a9f8985371cb674c1eae
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Dec 5 23:25:48 2024 -0800
MINOR: convert ConsumerBounceTest to KRaft (#17997)
Reviewers: TengYao Chi <[email protected]>, TaiJuWu <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../integration/kafka/api/ConsumerBounceTest.scala | 57 +++++++++++++---------
1 file changed, 35 insertions(+), 22 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 55dbe268fa5..45436876fca 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -26,10 +26,10 @@ import
org.apache.kafka.common.message.FindCoordinatorRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FindCoordinatorRequest,
FindCoordinatorResponse}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs}
import org.apache.kafka.server.util.ShutdownableThread
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Disabled, Test}
+import org.junit.jupiter.api.{AfterEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -54,17 +54,30 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
generateKafkaConfigs()
}
+ val testConfigs = Map[String, String](
+ GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> "3", //
don't want to lose offset
+ GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> "1",
+ GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG -> "10", // set
small enough session timeout
+ GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG -> "0",
+ GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG -> maxGroupSize.toString,
+ ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> "false",
+ ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG -> "true",
+ ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG -> "50",
+ KRaftConfigs.BROKER_HEARTBEAT_INTERVAL_MS_CONFIG -> "50",
+ KRaftConfigs.BROKER_SESSION_TIMEOUT_MS_CONFIG -> "300",
+ )
+
+ override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
+ super.kraftControllerConfigs(testInfo).map(props => {
+ testConfigs.foreachEntry((k, v) => props.setProperty(k, v))
+ props
+ })
+ }
+
private def generateKafkaConfigs(maxGroupSize: String =
maxGroupSize.toString): Seq[KafkaConfig] = {
val properties = new Properties
-
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"3") // don't want to lose offset
- properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
- properties.put(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
"10") // set small enough session timeout
-
properties.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
- properties.put(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, maxGroupSize)
- properties.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
"true")
- properties.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
-
- FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect,
enableControlledShutdown = false)
+ testConfigs.foreachEntry((k, v) => properties.setProperty(k, v))
+ FixedPortTestUtils.createBrokerConfigs(brokerCount, null,
enableControlledShutdown = false)
.map(KafkaConfig.fromProps(_, properties))
}
@@ -81,7 +94,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumptionWithBrokerFailures(quorum: String, groupProtocol:
String): Unit = consumeWithBrokerFailures(10)
/*
@@ -126,7 +139,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testSeekAndCommitWithBrokerFailures(quorum: String, groupProtocol:
String): Unit = seekAndCommitWithBrokerFailures(5)
def seekAndCommitWithBrokerFailures(numIters: Int): Unit = {
@@ -139,7 +152,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
consumer.seek(tp, 0)
// wait until all the followers have synced the last HW with leader
- TestUtils.waitUntilTrue(() => servers.forall(server =>
+ TestUtils.waitUntilTrue(() => brokerServers.forall(server =>
server.replicaManager.localLog(tp).get.highWatermark == numRecords
), "Failed to update high watermark for followers after timeout")
@@ -170,7 +183,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testSubscribeWhenTopicUnavailable(quorum: String, groupProtocol:
String): Unit = {
val numRecords = 1000
val newtopic = "newtopic"
@@ -210,7 +223,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
receiveExactRecords(poller, numRecords, 10000)
poller.shutdown()
- servers.foreach(server => killBroker(server.config.brokerId))
+ brokerServers.foreach(server => killBroker(server.config.brokerId))
Thread.sleep(500)
restartDeadBrokers()
@@ -222,7 +235,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testClose(quorum: String, groupProtocol: String): Unit = {
val numRecords = 10
val producer = createProducer()
@@ -293,7 +306,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
this.consumerConfig.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
requestTimeout.toString)
val consumer2 = createConsumerAndReceive(group2, manualAssign = true,
numRecords)
- servers.foreach(server => killBroker(server.config.brokerId))
+ brokerServers.foreach(server => killBroker(server.config.brokerId))
val closeTimeout = 2000
val future1 = submitCloseAndValidate(consumer1, closeTimeout, None,
Some(closeTimeout))
val future2 = submitCloseAndValidate(consumer2, Long.MaxValue, None,
Some(requestTimeout))
@@ -325,7 +338,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
// roll all brokers with a lesser max group size to make sure coordinator
has the new config
val newConfigs = generateKafkaConfigs(maxGroupSize.toString)
- for (serverIdx <- servers.indices) {
+ for (serverIdx <- brokerServers.indices) {
killBroker(serverIdx)
val config = newConfigs(serverIdx)
servers(serverIdx) = TestUtils.createServer(config, time =
brokerTime(config.brokerId))
@@ -348,7 +361,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
* When we have the consumer group max size configured to X, the X+1th
consumer trying to join should receive a fatal exception
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(quorum: String,
groupProtocol: String): Unit = {
val group = "fatal-exception-test"
val topic = "fatal-exception-test"
@@ -387,7 +400,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
* close should terminate immediately without sending leave group.
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testCloseDuringRebalance(quorum: String, groupProtocol: String): Unit = {
val topic = "closetest"
createTopic(topic, 10, brokerCount)
@@ -439,7 +452,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with
Logging {
// Trigger another rebalance and shutdown all brokers
// This consumer poll() doesn't complete and `tearDown` shuts down the
executor and closes the consumer
createConsumerToRebalance()
- servers.foreach(server => killBroker(server.config.brokerId))
+ brokerServers.foreach(server => killBroker(server.config.brokerId))
// consumer2 should close immediately without LeaveGroup request since
there are no brokers available
val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None,
Some(0))