This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2585a9a7c45 KAFKA-20045: Fix flaky test
testDescribeStreamsGroupsNotReady (#21267)
2585a9a7c45 is described below
commit 2585a9a7c453aef5ad300e7a2ef1197adfdbded1
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 8 15:08:47 2026 +0100
KAFKA-20045: Fix flaky test testDescribeStreamsGroupsNotReady (#21267)
The streams group may not be in the listGroups results yet; in that case, wait longer.
I ran it 100 times locally to validate.
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 3cffef93274..efcf5139d74 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -47,7 +47,6 @@ import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
-import org.apache.kafka.common.test.api.Flaky
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupState,
GroupType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo,
TopicPartitionReplica, Uuid}
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
@@ -4430,7 +4429,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
- firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ firstGroup != null && firstGroup.groupState().orElse(null) ==
GroupState.STABLE
}, "Streams group did not transition to STABLE before timeout")
// Verify the describe call works correctly
@@ -4458,7 +4457,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Flaky("KAFKA-20045")
@Test
def testDescribeStreamsGroupsNotReady(): Unit = {
val streamsGroupId = "stream_group_id"
@@ -4478,7 +4476,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream()
.filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
- firstGroup.groupState().orElse(null) == GroupState.NOT_READY &&
firstGroup.groupId() == streamsGroupId
+ firstGroup != null && firstGroup.groupState().orElse(null) ==
GroupState.NOT_READY
}, "Streams group did not transition to NOT_READY before timeout")
// Verify the describe call works correctly
@@ -4521,8 +4519,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
try {
TestUtils.waitUntilTrue(() => {
- val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
- firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ val firstGroup = client.listGroups().all().get().stream()
+ .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
+ firstGroup != null && firstGroup.groupState().orElse(null) ==
GroupState.STABLE
}, "Streams group did not transition to STABLE before timeout")
// Verify the describe call works correctly
@@ -4683,8 +4682,9 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
streams.commitSync()
TestUtils.waitUntilTrue(() => {
- val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
- firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ val firstGroup = client.listGroups().all().get().stream()
+ .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
+ firstGroup != null && firstGroup.groupState().orElse(null) ==
GroupState.STABLE
}, "Streams group did not transition to STABLE before timeout")
val allTopicPartitions = client.listStreamsGroupOffsets(