This is an automated email from the ASF dual-hosted git repository.

squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6de42dcdd8a MINOR: Fix race in StreamsGroupHeartbeatRequestTest 
testDynamicGroupConfig (#22277)
6de42dcdd8a is described below

commit 6de42dcdd8a5ea509e2e90a2d1c8c0a35718fc83
Author: Sean Quah <[email protected]>
AuthorDate: Thu May 14 05:19:09 2026 +0100

    MINOR: Fix race in StreamsGroupHeartbeatRequestTest testDynamicGroupConfig 
(#22277)
    
    In testDynamicGroupConfig, we dynamically configure a group's
    streams.num.standby.replicas, followed by
    streams.task.offset.interval.ms. We then wait for the next assignment
    and check that it has the expected number of standby replicas and
    expected task offset interval. However, this only guarantees that the
    new streams.num.standby.replicas config has propagated to the group
    coordinator and not necessarily the new task offset interval.
    
    Add a loop to heartbeat until we observe the expected task offset
    interval or a timeout elapses.
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../server/StreamsGroupHeartbeatRequestTest.scala   | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 85b04e3e3ed..7b4ec29c305 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -826,8 +826,25 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task 
should have one standby task")
 
       // Verify both members picked up change of `task.offset.interval.ms`
-      assertEquals(45_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
-      assertEquals(45_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId1,
+          memberEpoch = streamsGroupHeartbeatResponse1.memberEpoch()
+        )
+        streamsGroupHeartbeatResponse1.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.taskOffsetIntervalMs() == 45_000
+      }, "Member 1 did not pick up updated task.offset.interval.ms within the 
timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse2 = streamsGroupHeartbeat(
+          groupId = groupId,
+          memberId = memberId2,
+          memberEpoch = streamsGroupHeartbeatResponse2.memberEpoch()
+        )
+        streamsGroupHeartbeatResponse2.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.taskOffsetIntervalMs() == 45_000
+      }, "Member 2 did not pick up updated task.offset.interval.ms within the 
timeout period.")
 
     } finally {
       admin.close()

Reply via email to