This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 60ce83b2c65 KAFKA-18123 Fix flaky DynamicBrokerReconfigurationTest#testThreadPoolResize (#17986)
60ce83b2c65 is described below

commit 60ce83b2c656461ead027139b30347b18c5c3459
Author: PoAn Yang <[email protected]>
AuthorDate: Sun Dec 1 20:58:02 2024 +0800

    KAFKA-18123 Fix flaky DynamicBrokerReconfigurationTest#testThreadPoolResize (#17986)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala       | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c684e8fdc95..2d11963c846 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -760,6 +760,17 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testThreadPoolResize(quorum: String, groupProtocol: String): Unit = {
+
+    // In kraft mode, the StripedReplicaPlacer#initialize includes some randomization,
+    // so the replica assignment is not deterministic.
+    // If a fetcher thread is not assigned any topic partition, it will not be created.
+    // Change the replica assignment to ensure that all fetcher threads are created.
+    TestUtils.deleteTopicWithAdmin(adminClients.head, topic, servers, controllerServers)
+    val replicaAssignment = Map(
+      0 -> Seq(0, 1, 2), 1 -> Seq(1, 2, 0), 2 -> Seq(2, 1, 0), 3 -> Seq(0, 1, 2), 4 -> Seq(1, 2, 0),
+      5 -> Seq(2, 1, 0), 6 -> Seq(0, 1, 2), 7 -> Seq(1, 2, 0), 8 -> Seq(2, 1, 0), 9 -> Seq(0, 1, 2))
+    TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, controllerServers, replicaAssignment = replicaAssignment)
+
     val requestHandlerPrefix = "data-plane-kafka-request-handler-"
     val networkThreadPrefix = "data-plane-kafka-network-thread-"
     val fetcherThreadPrefix = "ReplicaFetcherThread-"

Reply via email to