This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff5b443df88 MINOR: Improve endpoint information epoch management in Streams group coordinator (#20854)
ff5b443df88 is described below

commit ff5b443df88271b3c86dc3e53a59b0936df2c09f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Nov 12 11:05:22 2025 +0100

    MINOR: Improve endpoint information epoch management in Streams group coordinator (#20854)
    
    Fix endpoint information epoch management in Streams group coordinator
    
    This commit addresses several issues in the endpoint information epoch
    handling for Streams groups:
    
    1. Conditional epoch bumping: Only increment endpointInformationEpoch
       when a member actually has a user endpoint defined, rather than on
       every member update. This prevents unnecessary epoch bumps for
       members without endpoints.
    
    2. Initial epoch value: Change endpointInformationEpoch initialization
       from -1 to 0 to be consistent with standard epoch semantics.
    
    3. Response epoch handling: Only include endpointInformationEpoch in
       responses when the group has been persisted, to better deal with the
       case where a group is newly created but the epoch information is
       not persisted (because it's soft state which is thrown away after
       the execution of the first heartbeat).
    
    4. Endpoint-to-partitions building: Refactor
       maybeBuildEndpointToPartitions to consistently include both existing
       and new members' endpoint information. The previous implementation
       did not include the endpoint for a new member. Also return an empty
       list instead of null if the set of endpoints actually becomes empty.
    
    The changes ensure that endpoint information is correctly propagated to
    all group members during rebalancing, including when new members join
    with user endpoints. Tests updated to reflect the new behavior and a
    comprehensive test added to verify endpoint information includes both
    existing and new members.
    
    Reviewers: Bill Bejeck <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  32 ++--
 .../coordinator/group/streams/StreamsGroup.java    |   2 +-
 .../topics/EndpointToPartitionsManager.java        |  28 +++
 .../group/GroupMetadataManagerTest.java            | 190 +++++++++++++--------
 4 files changed, 166 insertions(+), 86 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9f551a5d02c..81b4feb2f75 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2092,7 +2092,9 @@ public class GroupMetadataManager {
             
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
             
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
             
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
-            if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
+            if (updatedMember.userEndpoint().isPresent()) {
+                // If no user endpoint is defined, there is no change in the 
endpoint information.
+                // Otherwise, bump the endpoint information epoch
                 
group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
             }
         }
@@ -2100,7 +2102,11 @@ public class GroupMetadataManager {
         if (group.endpointInformationEpoch() != memberEndpointEpoch) {
             
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group, 
updatedMember));
         }
-        response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        if (groups.containsKey(group.groupId())) {
+            // If we just created the group, the endpoint information epoch 
will not be persisted, so return epoch 0.
+            // Otherwise, return the bumped epoch.
+            
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+        }
 
         Map<String, CreatableTopic> internalTopicsToBeCreated = 
Collections.emptyMap();
         if 
(updatedConfiguredTopology.topicConfigurationException().isPresent()) {
@@ -2216,20 +2222,20 @@ public class GroupMetadataManager {
                                                                                
                         StreamsGroupMember updatedMember) {
         List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
         final Map<String, StreamsGroupMember> members = group.members();
+        // Build endpoint information for all members except the updated member
         for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
-            final String memberIdForAssignment = entry.getKey();
-            final Optional<StreamsGroupMemberMetadataValue.Endpoint> 
endpointOptional = members.get(memberIdForAssignment).userEndpoint();
-            StreamsGroupMember groupMember = updatedMember != null && 
memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : 
members.get(memberIdForAssignment);
-            if (endpointOptional.isPresent()) {
-                final StreamsGroupMemberMetadataValue.Endpoint endpoint = 
endpointOptional.get();
-                final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
-                responseEndpoint.setHost(endpoint.host());
-                responseEndpoint.setPort(endpoint.port());
-                StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = 
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, 
group, metadataImage);
-                endpointToPartitionsList.add(endpointToPartitions);
+            if (updatedMember != null && 
entry.getKey().equals(updatedMember.memberId())) {
+                continue;
             }
+            
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group, 
metadataImage)
+                .ifPresent(endpointToPartitionsList::add);
+        }
+        // Always build endpoint information for the updated member (whether 
new or existing)
+        if (updatedMember != null) {
+            
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group, 
metadataImage)
+                .ifPresent(endpointToPartitionsList::add);
         }
-        return endpointToPartitionsList.isEmpty() ? null : 
endpointToPartitionsList;
+        return endpointToPartitionsList;
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 192ddada4de..5b8fa258cb3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -208,7 +208,7 @@ public class StreamsGroup implements Group {
      * The current epoch for endpoint information, this is used to determine 
when to send
      * updated endpoint information to members of the group.
      */
-    private int endpointInformationEpoch = -1;
+    private int endpointInformationEpoch = 0;
 
     /**
      * The last used assignment configurations for this streams group.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
index 7b55f346f09..455a584da40 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams.topics;
 
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 
@@ -35,6 +36,33 @@ public class EndpointToPartitionsManager {
     private EndpointToPartitionsManager() {
     }
 
+    /**
+     * Creates endpoint-to-partitions mapping for a member if the member has a 
user endpoint.
+     * Returns empty if the member has no user endpoint.
+     *
+     * @param streamsGroupMember The streams group member.
+     * @param streamsGroup       The streams group.
+     * @param metadataImage      The metadata image.
+     * @return An Optional containing the EndpointToPartitions if the member 
has an endpoint, empty otherwise.
+     */
+    public static 
Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
maybeEndpointToPartitions(
+        final StreamsGroupMember streamsGroupMember,
+        final StreamsGroup streamsGroup,
+        final CoordinatorMetadataImage metadataImage
+    ) {
+        Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = 
streamsGroupMember.userEndpoint();
+        if (endpointOptional.isEmpty()) {
+            return Optional.empty();
+        }
+
+        StreamsGroupMemberMetadataValue.Endpoint endpoint = 
endpointOptional.get();
+        StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new 
StreamsGroupHeartbeatResponseData.Endpoint();
+        responseEndpoint.setHost(endpoint.host());
+        responseEndpoint.setPort(endpoint.port());
+
+        return Optional.of(endpointToPartitions(streamsGroupMember, 
responseEndpoint, streamsGroup, metadataImage));
+    }
+
     public static StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
                                                                                
               final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint,
                                                                                
               final StreamsGroup streamsGroup,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index cf80e19c4ff..6ac1587accd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -165,6 +165,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -16447,8 +16448,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
-                    .setStatusDetail("Source topics bar are missing.")))
-                .setEndpointInformationEpoch(-1),
+                    .setStatusDetail("Source topics bar are missing."))),
             result.response().data()
         );
 
@@ -16533,8 +16533,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
-                    .setStatusDetail("Internal topics are missing: [bar]")))
-                .setEndpointInformationEpoch(-1),
+                    .setStatusDetail("Internal topics are missing: [bar]"))),
             result.response().data()
         );
 
@@ -16616,8 +16615,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
-                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]")))
-                .setEndpointInformationEpoch(-1),
+                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]"))),
             result.response().data()
         );
 
@@ -16714,8 +16712,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.STALE_TOPOLOGY.code())
-                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1.")))
-                .setEndpointInformationEpoch(-1),
+                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1."))),
             result.response().data()
         );
 
@@ -16809,8 +16806,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setEndpointInformationEpoch(-1),
+                )),
             result1.response().data()
         );
         assertRecordsEquals(List.of(), result1.records());
@@ -16831,8 +16827,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setEndpointInformationEpoch(-1),
+                )),
             result2.response().data()
         );
 
@@ -16924,8 +16919,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setEndpointInformationEpoch(-1),
+                )),
             result2.response().data()
         );
     }
@@ -17263,8 +17257,7 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setWarmupTasks(List.of()),
 
             result.response().data()
         );
@@ -17404,8 +17397,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(1)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(-1),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
     }
@@ -17513,8 +17505,7 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -17555,8 +17546,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(0),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -17601,8 +17591,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(1),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -17635,8 +17624,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(1),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
 
@@ -17675,8 +17663,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(1),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
 
@@ -17705,8 +17692,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
                 .setMemberEpoch(10)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(1),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
 
@@ -17732,8 +17718,7 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(2),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -17763,8 +17748,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(2),
+                .setHeartbeatIntervalMs(5000),
             result.response().data()
         );
 
@@ -17806,8 +17790,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(3),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -17853,8 +17836,7 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(4),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18192,8 +18174,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(1),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18454,8 +18435,7 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18483,8 +18463,7 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(0),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18616,8 +18595,7 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(-1),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18644,8 +18622,7 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of())
-                .setEndpointInformationEpoch(0),
+                .setWarmupTasks(List.of()),
             result.response().data()
         );
 
@@ -18810,6 +18787,11 @@ public class GroupMetadataManagerTest {
                         .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
                         .setEndpointInformationEpoch(0));
 
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
expectedEndpointToPartitions = new 
StreamsGroupHeartbeatResponseData.EndpointToPartitions()
+            .setUserEndpoint(new 
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092))
+            .setActivePartitions(List.of(new 
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0,
 1))))
+            .setStandbyPartitions(List.of());
+
         assertResponseEquals(
                 new StreamsGroupHeartbeatResponseData()
                         .setMemberId(memberId)
@@ -18821,46 +18803,110 @@ public class GroupMetadataManagerTest {
                                         .setPartitions(List.of(0, 1))))
                         .setStandbyTasks(List.of())
                         .setWarmupTasks(List.of())
-                        .setPartitionsByUserEndpoint(null),
+                        
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions)),
                 result.response().data()
         );
 
+        result = context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                        .setGroupId(groupId)
+                        .setMemberId(memberId)
+                        .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
+                        .setMemberEpoch(result.response().data().memberEpoch())
+                        
.setEndpointInformationEpoch(result.response().data().endpointInformationEpoch()));
+
         assertNull(result.response().data().partitionsByUserEndpoint());
+    }
 
-        
context.groupMetadataManager.streamsGroup(groupId).setEndpointInformationEpoch(1);
+    @Test
+    public void testStreamsGroupEndpointInformationIncludesNewMember() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+                new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
 
-        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
expectedEndpointToPartitions = new 
StreamsGroupHeartbeatResponseData.EndpointToPartitions()
-                .setUserEndpoint(new 
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092))
-                .setActivePartitions(List.of(new 
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0,
 1))))
-                .setStandbyPartitions(List.of());
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+                .withStreamsGroupTaskAssignors(List.of(assignor))
+                .withMetadataImage(new MetadataImageBuilder()
+                        .addTopic(fooTopicId, fooTopicName, 4)
+                        .buildCoordinatorMetadataImage())
+                .build();
 
-        result = context.streamsGroupHeartbeat(
+        // Prepare assignment for first member
+        assignor.prepareGroupAssignment(
+                Map.of(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        // First member joins
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
                 new StreamsGroupHeartbeatRequestData()
                         .setGroupId(groupId)
-                        .setMemberId(memberId)
-                        .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
-                        
.setMemberEpoch(result.response().data().memberEpoch()));
+                        .setMemberId(memberId1)
+                        .setMemberEpoch(0)
+                        .setRebalanceTimeoutMs(1500)
+                        .setTopology(topology)
+                        .setActiveTasks(List.of())
+                        .setStandbyTasks(List.of())
+                        .setWarmupTasks(List.of())
+                        .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("host1").setPort(9092))
+                        .setEndpointInformationEpoch(0));
 
-        assertNotNull(result.response().data().partitionsByUserEndpoint());
-        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
actualEndpointToPartitions = 
result.response().data().partitionsByUserEndpoint().get(0);
-        assertEquals(expectedEndpointToPartitions.userEndpoint(), 
actualEndpointToPartitions.userEndpoint());
-        StreamsGroupHeartbeatResponseData.TopicPartition 
expectedEndpointTopicPartitions = 
expectedEndpointToPartitions.activePartitions().get(0);
-        StreamsGroupHeartbeatResponseData.TopicPartition 
actualEndpointTopicPartitions = 
actualEndpointToPartitions.activePartitions().get(0);
+        assertEquals(1, result.response().data().memberEpoch());
 
-        assertEquals(expectedEndpointTopicPartitions.topic(), 
actualEndpointTopicPartitions.topic());
-        List<Integer> actualPartitions = 
actualEndpointTopicPartitions.partitions();
-        Collections.sort(actualPartitions);
-        assertEquals(expectedEndpointTopicPartitions.partitions(), 
actualPartitions);
+        // Prepare assignment for both members
+        assignor.prepareGroupAssignment(
+                Map.of(
+                        memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)),
+                        memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3))
+                ));
 
+        // Second member joins
         result = context.streamsGroupHeartbeat(
                 new StreamsGroupHeartbeatRequestData()
                         .setGroupId(groupId)
-                        .setMemberId(memberId)
-                        .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
-                        .setMemberEpoch(result.response().data().memberEpoch())
-                        
.setEndpointInformationEpoch(result.response().data().endpointInformationEpoch()));
+                        .setMemberId(memberId2)
+                        .setMemberEpoch(0)
+                        .setRebalanceTimeoutMs(1500)
+                        .setTopology(topology)
+                        .setActiveTasks(List.of())
+                        .setStandbyTasks(List.of())
+                        .setWarmupTasks(List.of())
+                        .setUserEndpoint(new 
StreamsGroupHeartbeatRequestData.Endpoint().setHost("host2").setPort(9093)));
 
-        assertNull(result.response().data().partitionsByUserEndpoint());
+        // The response should include endpoint information because the 
member's epoch (0) differs from the group's (1)
+        assertNotNull(result.response().data().partitionsByUserEndpoint());
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointsList = result.response().data().partitionsByUserEndpoint();
+        assertEquals(2, endpointsList.size(), "Should include both members in 
endpoint information");
+
+        // Sort by port for consistent ordering
+        endpointsList.sort(Comparator.comparingInt(e -> 
e.userEndpoint().port()));
+
+        // Verify first member's endpoint
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions member1Endpoint 
= endpointsList.get(0);
+        assertEquals("host1", member1Endpoint.userEndpoint().host());
+        assertEquals(9092, member1Endpoint.userEndpoint().port());
+        assertEquals(1, member1Endpoint.activePartitions().size());
+        StreamsGroupHeartbeatResponseData.TopicPartition member1Topic = 
member1Endpoint.activePartitions().get(0);
+        assertEquals("foo", member1Topic.topic());
+        List<Integer> member1Partitions = new 
ArrayList<>(member1Topic.partitions());
+        Collections.sort(member1Partitions);
+        assertEquals(List.of(0, 1), member1Partitions);
+
+        // Verify second member's endpoint (the new member)
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions member2Endpoint 
= endpointsList.get(1);
+        assertEquals("host2", member2Endpoint.userEndpoint().host());
+        assertEquals(9093, member2Endpoint.userEndpoint().port());
+        assertEquals(1, member2Endpoint.activePartitions().size());
+        StreamsGroupHeartbeatResponseData.TopicPartition member2Topic = 
member2Endpoint.activePartitions().get(0);
+        assertEquals("foo", member2Topic.topic());
+        List<Integer> member2Partitions = new 
ArrayList<>(member2Topic.partitions());
+        Collections.sort(member2Partitions);
+        assertEquals(List.of(2, 3), member2Partitions);
     }
 
     @Test

Reply via email to