This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d6687b8f42 KAFKA-20074: Fix flaky
PlaintextAdminIntegrationTest#testDescribeStreamsGroupsNotReady (#21317)
3d6687b8f42 is described below
commit 3d6687b8f424e2f07dce967c346e2dd9f1cad025
Author: ChickenchickenLove <[email protected]>
AuthorDate: Mon Jan 19 01:56:00 2026 +0900
KAFKA-20074: Fix flaky
PlaintextAdminIntegrationTest#testDescribeStreamsGroupsNotReady (#21317)
### Description
This test was flaky because it assumed the Streams group would reach
`GroupState.NOT_READY`, but depending on timing and environment the
Streams changelog topic could be created successfully. When that
happened, the group progressed to `ASSIGNING/RECONCILING/STABLE`, the
test failed to observe `NOT_READY` within the timeout, and it sometimes
produced "Reconciliation failed" log messages during consumer shutdown
due to an unfinished `onTasksAssigned`-related event.
This change makes the behavior deterministic by allowing
`createStreamsGroup()` to inject a replication factor into
`StreamsRebalanceData.TopicInfo`. In
`testDescribeStreamsGroupsNotReady`, we pass an intentionally impossible
replication factor for the current cluster (e.g., 9999), ensuring the
changelog topic creation attempt always fails. As a result, the internal
topic remains missing and the Streams group reliably stays in
`NOT_READY`, eliminating the timing-dependent state transition that
caused the flakiness.
### Detailed Flaky and Non-Flaky Cases
<img width="909" height="925" alt="image"
src="https://github.com/user-attachments/assets/003445aa-5410-4dec-850b-efd714e4ff0b"
/>
### Result
- Fixes
https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextAdminIntegrationTest&tests.sortField=FLAKY&tests.test=testDescribeStreamsGroupsNotReady()
### Related PR
- https://github.com/apache/kafka/pull/21267
Reviewers: Lucas Brutschy <[email protected]>, Lianet Magrans
<[email protected]>
---
.../test/scala/integration/kafka/api/IntegrationTestHarness.scala | 7 +++++--
.../integration/kafka/api/PlaintextAdminIntegrationTest.scala | 4 +++-
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 2a5e8df40e4..9271b96d563 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -241,7 +241,8 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
configsToRemove: List[String] = List(),
inputTopics: Set[String],
changelogTopics: Set[String] = Set(),
- streamsGroupId: String): AsyncKafkaConsumer[K,
V] = {
+ streamsGroupId: String,
+ replicationFactor: Optional[Short] =
Optional.empty()): AsyncKafkaConsumer[K, V] = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
props.put(ConsumerConfig.GROUP_ID_CONFIG, streamsGroupId)
@@ -251,6 +252,8 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
props ++= configOverrides
configsToRemove.foreach(props.remove(_))
+ val boxed: Optional[java.lang.Short] =
+ replicationFactor.map[java.lang.Short](s => java.lang.Short.valueOf(s))
val streamsRebalanceData = new StreamsRebalanceData(
UUID.randomUUID(),
Optional.empty(),
@@ -259,7 +262,7 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
inputTopics.asJava,
util.Set.of(),
util.Map.of(),
- changelogTopics.map(c => (c, new
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(),
util.Map.of()))).toMap.asJava,
+ changelogTopics.map(c => (c, new
StreamsRebalanceData.TopicInfo(Optional.empty(), boxed,
util.Map.of()))).toMap.asJava,
util.Set.of()
)),
Map.empty[String, String].asJava
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index efcf5139d74..28202cb04d6 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -4465,10 +4465,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val config = createConfig
client = Admin.create(config)
+ val unavailableReplicationFactorInThisCluster = 9999.toShort
val streams = createStreamsGroup(
inputTopics = Set(testTopicName),
changelogTopics = Set(testTopicName + "-changelog"),
- streamsGroupId = streamsGroupId
+ streamsGroupId = streamsGroupId,
+ replicationFactor =
Optional.of(unavailableReplicationFactorInThisCluster)
)
streams.poll(JDuration.ofMillis(500L))