This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2585a9a7c45 KAFKA-20045: Fix flaky test testDescribeStreamsGroupsNotReady (#21267)
2585a9a7c45 is described below

commit 2585a9a7c453aef5ad300e7a2ef1197adfdbded1
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 8 15:08:47 2026 +0100

    KAFKA-20045: Fix flaky test testDescribeStreamsGroupsNotReady (#21267)
    
    The streams group may not be in the listGroups results yet; in that case, wait longer.
    
    I ran it 100 times locally to validate.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala        | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 3cffef93274..efcf5139d74 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -47,7 +47,6 @@ import org.apache.kafka.common.record.FileRecords
 import org.apache.kafka.common.requests.DeleteRecordsRequest
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourceType}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
-import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupState, 
GroupType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, 
TopicPartitionReplica, Uuid}
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
@@ -4430,7 +4429,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       TestUtils.waitUntilTrue(() => {
         val firstGroup = client.listGroups().all().get().stream()
           .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
-        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+        firstGroup != null && firstGroup.groupState().orElse(null) == 
GroupState.STABLE
       }, "Streams group did not transition to STABLE before timeout")
 
       // Verify the describe call works correctly
@@ -4458,7 +4457,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Flaky("KAFKA-20045")
   @Test
   def testDescribeStreamsGroupsNotReady(): Unit = {
     val streamsGroupId = "stream_group_id"
@@ -4478,7 +4476,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       TestUtils.waitUntilTrue(() => {
         val firstGroup = client.listGroups().all().get().stream()
           .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
-        firstGroup.groupState().orElse(null) == GroupState.NOT_READY && 
firstGroup.groupId() == streamsGroupId
+        firstGroup != null && firstGroup.groupState().orElse(null) == 
GroupState.NOT_READY
       }, "Streams group did not transition to NOT_READY before timeout")
 
       // Verify the describe call works correctly
@@ -4521,8 +4519,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
     try {
       TestUtils.waitUntilTrue(() => {
-        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
-        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+        val firstGroup = client.listGroups().all().get().stream()
+          .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
+        firstGroup != null && firstGroup.groupState().orElse(null) == 
GroupState.STABLE
       }, "Streams group did not transition to STABLE before timeout")
 
       // Verify the describe call works correctly
@@ -4683,8 +4682,9 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       streams.commitSync()
 
       TestUtils.waitUntilTrue(() => {
-        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
-        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+        val firstGroup = client.listGroups().all().get().stream()
+          .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
+        firstGroup != null && firstGroup.groupState().orElse(null) == 
GroupState.STABLE
       }, "Streams group did not transition to STABLE before timeout")
 
       val allTopicPartitions = client.listStreamsGroupOffsets(

Reply via email to