This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 60ce83b2c65 KAFKA-18123 Fix flaky
DynamicBrokerReconfigurationTest#testThreadPoolResize (#17986)
60ce83b2c65 is described below
commit 60ce83b2c656461ead027139b30347b18c5c3459
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Dec 1 20:58:02 2024 +0800
KAFKA-18123 Fix flaky DynamicBrokerReconfigurationTest#testThreadPoolResize
(#17986)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/DynamicBrokerReconfigurationTest.scala | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c684e8fdc95..2d11963c846 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -760,6 +760,17 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testThreadPoolResize(quorum: String, groupProtocol: String): Unit = {
+
+ // In kraft mode, the StripedReplicaPlacer#initialize includes some
randomization,
+ // so the replica assignment is not deterministic.
+ // If a fetcher thread is not assigned any topic partition, it will not be
created.
+ // Change the replica assignment to ensure that all fetcher threads are
created.
+ TestUtils.deleteTopicWithAdmin(adminClients.head, topic, servers,
controllerServers)
+ val replicaAssignment = Map(
+ 0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 1, 0), 3 -> Seq(0, 1,
2), 4 -> Seq(1, 2, 0),
+ 5 -> Seq(2, 1, 0), 6 -> Seq(0, 1, 2), 7 -> Seq(1, 2, 0), 8 -> Seq(2, 1,
0), 9 -> Seq(0, 1, 2))
+ TestUtils.createTopicWithAdmin(adminClients.head, topic, servers,
controllerServers, replicaAssignment = replicaAssignment)
+
val requestHandlerPrefix = "data-plane-kafka-request-handler-"
val networkThreadPrefix = "data-plane-kafka-network-thread-"
val fetcherThreadPrefix = "ReplicaFetcherThread-"