This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 9ea30fb5eefce71ed79a4843845d3fe84b920d47
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Dec 5 14:40:57 2025 +0100

    KAFKA-19945: Always set status field in StreamsGroupHeartbeat response (#21031)

    The status field should always be set in the response, even when empty, to
    ensure that the status gets reset on the client. Due to an oversight, we are
    not doing this: the field is defined as nullable and is null when not set, so
    the status does not clear correctly on the client, which ignores the field if
    it is null. This can cause the streams application to incorrectly time out if
    the source topic does not exist when the application is first started but is
    created afterwards. Otherwise, there is no noticeable difference.

    Reviewers: Lucas Brutschy <[email protected]>
---
 .../message/StreamsGroupHeartbeatResponse.json     |   4 +-
 .../server/StreamsGroupHeartbeatRequestTest.scala  |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |   7 +-
 .../group/GroupMetadataManagerTest.java            | 165 +++++++++++++++++----
 4 files changed, 139 insertions(+), 39 deletions(-)

diff --git a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index efeaf452571..220c9704a23 100644
--- a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -53,8 +53,8 @@
     { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
       "about": "The interval in which the task changelog offsets on a client are updated on the broker. The offsets are sent with the next heartbeat after this time has passed." },
-    { "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+", "default": "null",
-      "about": "Indicate zero or more status for the group. Null if unchanged since last heartbeat." },
+    { "name": "Status", "type": "[]Status", "versions": "0+", "nullableVersions": "0+",
+      "about": "Indicate zero or more status for the group." },
     // The streams app knows which partitions to fetch from given this information
     { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", "nullableVersions": "0+", "default": "null",
diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 2d7f0649333..252ee82fb14 100644
--- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -209,7 +209,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
       assertNotNull(streamsGroupHeartbeatResponse, "StreamsGroupHeartbeatResponse should not be null")
       assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
       assertEquals(3, streamsGroupHeartbeatResponse.memberEpoch())
-      assertEquals(null, streamsGroupHeartbeatResponse.status())
+      assertEquals(List.empty.asJava, streamsGroupHeartbeatResponse.status())
       val expectedActiveTasks = List(
         new StreamsGroupHeartbeatResponseData.TaskIds()
           .setSubtopologyId("subtopology-1")
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6bf963c968e..6e6eadecd1e 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2155,9 +2155,7 @@ public class GroupMetadataManager {
             )
         ));

-        if (!returnedStatus.isEmpty()) {
-            response.setStatus(returnedStatus);
-        }
+        response.setStatus(returnedStatus);

         return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
     }
@@ -4217,7 +4215,8 @@ public class GroupMetadataManager {
         }
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(memberEpoch);
+            .setMemberEpoch(memberEpoch)
+            .setStatus(List.of());

         if (instanceId == null) {
             StreamsGroupMember member = group.getMemberOrThrow(memberId);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b8beedca616..3f24ba06f4e 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16326,7 +16326,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -16411,7 +16412,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -16946,7 +16948,8 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setStatus(List.of()),
             result1.response().data()
         );
         assertRecordsEquals(
@@ -17056,7 +17059,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17176,7 +17180,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17315,7 +17320,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17379,7 +17385,8 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17441,7 +17448,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17456,7 +17464,77 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
+            result.response().data()
+        );
+    }
+
+    @Test
+    public void testStreamsGroupHeartbeatAlwaysSetsStatus() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .buildCoordinatorMetadataImage())
+            .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0)
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        // Heartbeat with no errors should still have status field set to empty list.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        // Verify that status is always set, even when empty.
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
+            result.response().data()
+        );
+
+        // Verify status field is present in subsequent heartbeats as well.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -17503,7 +17581,8 @@ public class GroupMetadataManagerTest {
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
                 .setPartitionsByUserEndpoint(null)
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(0)
+                .setStatus(List.of()),
             result.response().data()
        );

@@ -17531,7 +17610,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -17639,7 +17719,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17680,7 +17761,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17725,7 +17807,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17758,7 +17841,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17797,7 +17881,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17826,7 +17911,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
                 .setMemberEpoch(10)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17852,7 +17938,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17882,7 +17969,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17924,7 +18012,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -17970,7 +18059,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18180,7 +18270,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18309,7 +18400,8 @@ public class GroupMetadataManagerTest {
                     .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18535,7 +18627,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18573,7 +18666,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18601,7 +18695,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18632,7 +18727,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId1)
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(0)
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18696,7 +18792,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18734,7 +18831,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18761,7 +18859,8 @@ public class GroupMetadataManagerTest {
                        .setSubtopologyId(subtopology1)
                        .setPartitions(List.of(0, 1))))
                .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -18943,7 +19042,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions)),
+                .setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
+                .setStatus(List.of()),
             result.response().data()
         );

@@ -19119,7 +19219,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
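As context for the change above, here is a minimal, self-contained sketch (hypothetical names, not the actual Kafka Streams client code) of the client-side behaviour described in the commit message: a null Status field is ignored, so a previously reported status is never cleared, whereas an always-present (possibly empty) list resets the client's view on every heartbeat.

import java.util.List;

public class StatusHandlingSketch {
    // Pretend the client previously saw a "missing source topics" status.
    private static List<String> currentStatus = List.of("MISSING_SOURCE_TOPICS");

    static void onHeartbeatResponse(List<String> statusFromBroker) {
        if (statusFromBroker != null) {
            // A non-null (possibly empty) list overwrites, and therefore clears, the old status.
            currentStatus = statusFromBroker;
        }
        // With a null field the stale status silently survives - the bug described above.
    }

    public static void main(String[] args) {
        onHeartbeatResponse(null);         // before the fix: status stays [MISSING_SOURCE_TOPICS]
        System.out.println(currentStatus);
        onHeartbeatResponse(List.of());    // after the fix: the empty list resets it to []
        System.out.println(currentStatus);
    }
}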
