This is an automated email from the ASF dual-hosted git repository.
squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6de42dcdd8a MINOR: Fix race in StreamsGroupHeartbeatRequestTest
testDynamicGroupConfig (#22277)
6de42dcdd8a is described below
commit 6de42dcdd8a5ea509e2e90a2d1c8c0a35718fc83
Author: Sean Quah <[email protected]>
AuthorDate: Thu May 14 05:19:09 2026 +0100
MINOR: Fix race in StreamsGroupHeartbeatRequestTest testDynamicGroupConfig
(#22277)
In testDynamicGroupConfig, we dynamically configure a group's
streams.num.standby.replicas, followed by
streams.task.offset.interval.ms. We then wait for the next assignment
and check that it has the expected number of standby replicas and the
expected task offset interval. However, observing the new assignment
only guarantees that the new streams.num.standby.replicas config has
propagated to the group coordinator; the new
streams.task.offset.interval.ms config may not have propagated yet.
Add a loop to heartbeat until we observe the expected task offset
interval or a timeout elapses.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../server/StreamsGroupHeartbeatRequestTest.scala | 21 +++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 85b04e3e3ed..7b4ec29c305 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -826,8 +826,25 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task
should have one standby task")
// Verify both members picked up change of `task.offset.interval.ms`
- assertEquals(45_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
- assertEquals(45_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId1,
+ memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch()
+ )
+ streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse1.taskOffsetIntervalMs() == 45_000
+ }, "Member 1 did not pick up updated task.offset.interval.ms within the
timeout period.")
+
+ TestUtils.waitUntilTrue(() => {
+ streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch()
+ )
+ streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+ streamsGroupHeartbeatResponse2.taskOffsetIntervalMs() == 45_000
+ }, "Member 2 did not pick up updated task.offset.interval.ms within the
timeout period.")
} finally {
admin.close()