This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d6687b8f42 KAFKA-20074: Fix flaky 
PlaintextAdminIntegrationTest#testDescribeStreamsGroupsNotReady (#21317)
3d6687b8f42 is described below

commit 3d6687b8f424e2f07dce967c346e2dd9f1cad025
Author: ChickenchickenLove <[email protected]>
AuthorDate: Mon Jan 19 01:56:00 2026 +0900

    KAFKA-20074: Fix flaky 
PlaintextAdminIntegrationTest#testDescribeStreamsGroupsNotReady (#21317)
    
    ### Description
    This test was flaky because it assumes the Streams group will reach
    `GroupState.NOT_READY`, but depending on timing and environment the
    Streams changelog topic could be created successfully. When that
    happened, the group progressed to `ASSIGNING/RECONCILING/STABLE`, the
    test failed to observe `NOT_READY` within the timeout, and it sometimes
    produced Reconciliation failed logs during consumer shutdown due to an
    unfinished onTasksAssigned-related event.
    
    This change makes the behavior deterministic by allowing
    `createStreamsGroup()` to inject a `replication factor` into
    `StreamsRebalanceData.TopicInfo`. In
    `testDescribeStreamsGroupsNotReady`, we pass an intentionally impossible
    replication factor for the current cluster (e.g., 9999), ensuring the
    changelog topic creation attempt always fails. As a result, the internal
    topic remains missing and the Streams group reliably stays in
    `NOT_READY`, eliminating the timing-dependent state transition that
    caused the flakiness.
    
    ### Detail Flaky Case and Non Flaky Case
    <img width="909" height="925" alt="image"
    
    
src="https://github.com/user-attachments/assets/003445aa-5410-4dec-850b-efd714e4ff0b";
    />
    
    ### Result
    - Fixes
    
    
https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextAdminIntegrationTest&tests.sortField=FLAKY&tests.test=testDescribeStreamsGroupsNotReady()
    
    ### Related PR
    - https://github.com/apache/kafka/pull/21267
    
    Reviewers: Lucas Brutschy <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../test/scala/integration/kafka/api/IntegrationTestHarness.scala  | 7 +++++--
 .../integration/kafka/api/PlaintextAdminIntegrationTest.scala      | 4 +++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 2a5e8df40e4..9271b96d563 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -241,7 +241,8 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
                                configsToRemove: List[String] = List(),
                                inputTopics: Set[String],
                                changelogTopics: Set[String] = Set(),
-                               streamsGroupId: String): AsyncKafkaConsumer[K, 
V] = {
+                               streamsGroupId: String,
+                               replicationFactor: Optional[Short] = 
Optional.empty()): AsyncKafkaConsumer[K, V] = {
     val props = new Properties()
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
@@ -251,6 +252,8 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     props ++= configOverrides
     configsToRemove.foreach(props.remove(_))
 
+    val boxed: Optional[java.lang.Short] =
+      replicationFactor.map[java.lang.Short](s => java.lang.Short.valueOf(s))
     val streamsRebalanceData = new StreamsRebalanceData(
       UUID.randomUUID(),
       Optional.empty(),
@@ -259,7 +262,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
           inputTopics.asJava,
           util.Set.of(),
           util.Map.of(),
-          changelogTopics.map(c => (c, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), 
util.Map.of()))).toMap.asJava,
+          changelogTopics.map(c => (c, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), boxed, 
util.Map.of()))).toMap.asJava,
           util.Set.of()
         )),
       Map.empty[String, String].asJava
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index efcf5139d74..28202cb04d6 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -4465,10 +4465,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val config = createConfig
     client = Admin.create(config)
 
+    val unavailableReplicationFactorInThisCluster = 9999.toShort
     val streams = createStreamsGroup(
       inputTopics = Set(testTopicName),
       changelogTopics = Set(testTopicName + "-changelog"),
-      streamsGroupId = streamsGroupId
+      streamsGroupId = streamsGroupId,
+      replicationFactor = 
Optional.of(unavailableReplicationFactorInThisCluster)
     )
     streams.poll(JDuration.ofMillis(500L))
 

Reply via email to