This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff5b443df88 MINOR: Improve endpoint information epoch management in
Streams group coordinator (#20854)
ff5b443df88 is described below
commit ff5b443df88271b3c86dc3e53a59b0936df2c09f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Nov 12 11:05:22 2025 +0100
MINOR: Improve endpoint information epoch management in Streams group
coordinator (#20854)
Fix endpoint information epoch management in Streams group coordinator
This commit addresses several issues in the endpoint information epoch
handling for Streams groups:
1. Conditional epoch bumping: Only increment endpointInformationEpoch
when a member actually has a user endpoint defined, rather than on
every member update. This prevents unnecessary epoch bumps for
members without endpoints.
2. Initial epoch value: Change endpointInformationEpoch initialization
from -1 to 0 to be consistent with standard epoch semantics.
3. Response epoch handling: Only include endpointInformationEpoch in
responses when the group has been persisted, to better deal with the
case where a group is newly created but the epoch information is
not persisted (because it's soft state which is thrown away after
the execution of the first heartbeat).
4. Endpoint-to-partitions building: Refactor
maybeBuildEndpointToPartitions
to consistently include both existing and new members' endpoint
information. The previous implementation did not include the endpoint
for a new member. Also return an empty list instead of null if the
set of endpoints actually becomes empty.
The changes ensure that endpoint information is correctly propagated to
all group members during rebalancing, including when new members join
with user endpoints. Tests updated to reflect the new behavior and a
comprehensive test added to verify endpoint information includes both
existing and new members.
Reviewers: Bill Bejeck <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 32 ++--
.../coordinator/group/streams/StreamsGroup.java | 2 +-
.../topics/EndpointToPartitionsManager.java | 28 +++
.../group/GroupMetadataManagerTest.java | 190 +++++++++++++--------
4 files changed, 166 insertions(+), 86 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9f551a5d02c..81b4feb2f75 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2092,7 +2092,9 @@ public class GroupMetadataManager {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
- if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
+ if (updatedMember.userEndpoint().isPresent()) {
+ // If no user endpoint is defined, there is no change in the
endpoint information.
+ // Otherwise, bump the endpoint information epoch
group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
}
}
@@ -2100,7 +2102,11 @@ public class GroupMetadataManager {
if (group.endpointInformationEpoch() != memberEndpointEpoch) {
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group,
updatedMember));
}
- response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+ if (groups.containsKey(group.groupId())) {
+ // If we just created the group, the endpoint information epoch
will not be persisted, so return epoch 0.
+ // Otherwise, return the bumped epoch.
+
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
+ }
Map<String, CreatableTopic> internalTopicsToBeCreated =
Collections.emptyMap();
if
(updatedConfiguredTopology.topicConfigurationException().isPresent()) {
@@ -2216,20 +2222,20 @@ public class GroupMetadataManager {
StreamsGroupMember updatedMember) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointToPartitionsList = new ArrayList<>();
final Map<String, StreamsGroupMember> members = group.members();
+ // Build endpoint information for all members except the updated member
for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet())
{
- final String memberIdForAssignment = entry.getKey();
- final Optional<StreamsGroupMemberMetadataValue.Endpoint>
endpointOptional = members.get(memberIdForAssignment).userEndpoint();
- StreamsGroupMember groupMember = updatedMember != null &&
memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember :
members.get(memberIdForAssignment);
- if (endpointOptional.isPresent()) {
- final StreamsGroupMemberMetadataValue.Endpoint endpoint =
endpointOptional.get();
- final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
- responseEndpoint.setHost(endpoint.host());
- responseEndpoint.setPort(endpoint.port());
- StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions =
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint,
group, metadataImage);
- endpointToPartitionsList.add(endpointToPartitions);
+ if (updatedMember != null &&
entry.getKey().equals(updatedMember.memberId())) {
+ continue;
}
+
EndpointToPartitionsManager.maybeEndpointToPartitions(entry.getValue(), group,
metadataImage)
+ .ifPresent(endpointToPartitionsList::add);
+ }
+ // Always build endpoint information for the updated member (whether
new or existing)
+ if (updatedMember != null) {
+
EndpointToPartitionsManager.maybeEndpointToPartitions(updatedMember, group,
metadataImage)
+ .ifPresent(endpointToPartitionsList::add);
}
- return endpointToPartitionsList.isEmpty() ? null :
endpointToPartitionsList;
+ return endpointToPartitionsList;
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 192ddada4de..5b8fa258cb3 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -208,7 +208,7 @@ public class StreamsGroup implements Group {
* The current epoch for endpoint information, this is used to determine
when to send
* updated endpoint information to members of the group.
*/
- private int endpointInformationEpoch = -1;
+ private int endpointInformationEpoch = 0;
/**
* The last used assignment configurations for this streams group.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
index 7b55f346f09..455a584da40 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
+import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
@@ -35,6 +36,33 @@ public class EndpointToPartitionsManager {
private EndpointToPartitionsManager() {
}
+ /**
+ * Creates endpoint-to-partitions mapping for a member if the member has a
user endpoint.
+ * Returns empty if the member has no user endpoint.
+ *
+ * @param streamsGroupMember The streams group member.
+ * @param streamsGroup The streams group.
+ * @param metadataImage The metadata image.
+ * @return An Optional containing the EndpointToPartitions if the member
has an endpoint, empty otherwise.
+ */
+ public static
Optional<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
maybeEndpointToPartitions(
+ final StreamsGroupMember streamsGroupMember,
+ final StreamsGroup streamsGroup,
+ final CoordinatorMetadataImage metadataImage
+ ) {
+ Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional =
streamsGroupMember.userEndpoint();
+ if (endpointOptional.isEmpty()) {
+ return Optional.empty();
+ }
+
+ StreamsGroupMemberMetadataValue.Endpoint endpoint =
endpointOptional.get();
+ StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new
StreamsGroupHeartbeatResponseData.Endpoint();
+ responseEndpoint.setHost(endpoint.host());
+ responseEndpoint.setPort(endpoint.port());
+
+ return Optional.of(endpointToPartitions(streamsGroupMember,
responseEndpoint, streamsGroup, metadataImage));
+ }
+
public static StreamsGroupHeartbeatResponseData.EndpointToPartitions
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
final StreamsGroupHeartbeatResponseData.Endpoint
responseEndpoint,
final StreamsGroup streamsGroup,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index cf80e19c4ff..6ac1587accd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -165,6 +165,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -16447,8 +16448,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
- .setStatusDetail("Source topics bar are missing.")))
- .setEndpointInformationEpoch(-1),
+ .setStatusDetail("Source topics bar are missing."))),
result.response().data()
);
@@ -16533,8 +16533,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: [bar]")))
- .setEndpointInformationEpoch(-1),
+ .setStatusDetail("Internal topics are missing: [bar]"))),
result.response().data()
);
@@ -16616,8 +16615,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
- .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]")))
- .setEndpointInformationEpoch(-1),
+ .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]"))),
result.response().data()
);
@@ -16714,8 +16712,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.STALE_TOPOLOGY.code())
- .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1.")))
- .setEndpointInformationEpoch(-1),
+ .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1."))),
result.response().data()
);
@@ -16809,8 +16806,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setEndpointInformationEpoch(-1),
+ )),
result1.response().data()
);
assertRecordsEquals(List.of(), result1.records());
@@ -16831,8 +16827,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setEndpointInformationEpoch(-1),
+ )),
result2.response().data()
);
@@ -16924,8 +16919,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setEndpointInformationEpoch(-1),
+ )),
result2.response().data()
);
}
@@ -17263,8 +17257,7 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(-1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17404,8 +17397,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(1)
- .setHeartbeatIntervalMs(5000)
- .setEndpointInformationEpoch(-1),
+ .setHeartbeatIntervalMs(5000),
result.response().data()
);
}
@@ -17513,8 +17505,7 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(-1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17555,8 +17546,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0))
))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(0),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17601,8 +17591,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(2))
))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17635,8 +17624,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId3)
.setMemberEpoch(11)
- .setHeartbeatIntervalMs(5000)
- .setEndpointInformationEpoch(1),
+ .setHeartbeatIntervalMs(5000),
result.response().data()
);
@@ -17675,8 +17663,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(11)
- .setHeartbeatIntervalMs(5000)
- .setEndpointInformationEpoch(1),
+ .setHeartbeatIntervalMs(5000),
result.response().data()
);
@@ -17705,8 +17692,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(10)
- .setHeartbeatIntervalMs(5000)
- .setEndpointInformationEpoch(1),
+ .setHeartbeatIntervalMs(5000),
result.response().data()
);
@@ -17732,8 +17718,7 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology2)
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(2),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17763,8 +17748,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId3)
.setMemberEpoch(11)
- .setHeartbeatIntervalMs(5000)
- .setEndpointInformationEpoch(2),
+ .setHeartbeatIntervalMs(5000),
result.response().data()
);
@@ -17806,8 +17790,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(2))
))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(3),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -17853,8 +17836,7 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology2)
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(4),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18192,8 +18174,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2, 3, 4, 5))
))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18454,8 +18435,7 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(-1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18483,8 +18463,7 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(0),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18616,8 +18595,7 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(-1),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18644,8 +18622,7 @@ public class GroupMetadataManagerTest {
.setSubtopologyId(subtopology1)
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setEndpointInformationEpoch(0),
+ .setWarmupTasks(List.of()),
result.response().data()
);
@@ -18810,6 +18787,11 @@ public class GroupMetadataManagerTest {
.setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
.setEndpointInformationEpoch(0));
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions
expectedEndpointToPartitions = new
StreamsGroupHeartbeatResponseData.EndpointToPartitions()
+ .setUserEndpoint(new
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092))
+ .setActivePartitions(List.of(new
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0,
1))))
+ .setStandbyPartitions(List.of());
+
assertResponseEquals(
new StreamsGroupHeartbeatResponseData()
.setMemberId(memberId)
@@ -18821,46 +18803,110 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setPartitionsByUserEndpoint(null),
+
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions)),
result.response().data()
);
+ result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
+ .setMemberEpoch(result.response().data().memberEpoch())
+
.setEndpointInformationEpoch(result.response().data().endpointInformationEpoch()));
+
assertNull(result.response().data().partitionsByUserEndpoint());
+ }
-
context.groupMetadataManager.streamsGroup(groupId).setEndpointInformationEpoch(1);
+ @Test
+ public void testStreamsGroupEndpointInformationIncludesNewMember() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
- StreamsGroupHeartbeatResponseData.EndpointToPartitions
expectedEndpointToPartitions = new
StreamsGroupHeartbeatResponseData.EndpointToPartitions()
- .setUserEndpoint(new
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(9092))
- .setActivePartitions(List.of(new
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("foo").setPartitions(List.of(0,
1))))
- .setStandbyPartitions(List.of());
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 4)
+ .buildCoordinatorMetadataImage())
+ .build();
- result = context.streamsGroupHeartbeat(
+ // Prepare assignment for first member
+ assignor.prepareGroupAssignment(
+ Map.of(memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ // First member joins
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
- .setMemberId(memberId)
- .setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
-
.setMemberEpoch(result.response().data().memberEpoch()));
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("host1").setPort(9092))
+ .setEndpointInformationEpoch(0));
- assertNotNull(result.response().data().partitionsByUserEndpoint());
- StreamsGroupHeartbeatResponseData.EndpointToPartitions
actualEndpointToPartitions =
result.response().data().partitionsByUserEndpoint().get(0);
- assertEquals(expectedEndpointToPartitions.userEndpoint(),
actualEndpointToPartitions.userEndpoint());
- StreamsGroupHeartbeatResponseData.TopicPartition
expectedEndpointTopicPartitions =
expectedEndpointToPartitions.activePartitions().get(0);
- StreamsGroupHeartbeatResponseData.TopicPartition
actualEndpointTopicPartitions =
actualEndpointToPartitions.activePartitions().get(0);
+ assertEquals(1, result.response().data().memberEpoch());
- assertEquals(expectedEndpointTopicPartitions.topic(),
actualEndpointTopicPartitions.topic());
- List<Integer> actualPartitions =
actualEndpointTopicPartitions.partitions();
- Collections.sort(actualPartitions);
- assertEquals(expectedEndpointTopicPartitions.partitions(),
actualPartitions);
+ // Prepare assignment for both members
+ assignor.prepareGroupAssignment(
+ Map.of(
+ memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)),
+ memberId2,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 2, 3))
+ ));
+ // Second member joins
result = context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
- .setMemberId(memberId)
- .setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("localhost").setPort(9092))
- .setMemberEpoch(result.response().data().memberEpoch())
-
.setEndpointInformationEpoch(result.response().data().endpointInformationEpoch()));
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setUserEndpoint(new
StreamsGroupHeartbeatRequestData.Endpoint().setHost("host2").setPort(9093)));
- assertNull(result.response().data().partitionsByUserEndpoint());
+ // The response should include endpoint information because the
member's epoch (0) differs from the group's (1)
+ assertNotNull(result.response().data().partitionsByUserEndpoint());
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions>
endpointsList = result.response().data().partitionsByUserEndpoint();
+ assertEquals(2, endpointsList.size(), "Should include both members in
endpoint information");
+
+ // Sort by port for consistent ordering
+ endpointsList.sort(Comparator.comparingInt(e ->
e.userEndpoint().port()));
+
+ // Verify first member's endpoint
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions member1Endpoint
= endpointsList.get(0);
+ assertEquals("host1", member1Endpoint.userEndpoint().host());
+ assertEquals(9092, member1Endpoint.userEndpoint().port());
+ assertEquals(1, member1Endpoint.activePartitions().size());
+ StreamsGroupHeartbeatResponseData.TopicPartition member1Topic =
member1Endpoint.activePartitions().get(0);
+ assertEquals("foo", member1Topic.topic());
+ List<Integer> member1Partitions = new
ArrayList<>(member1Topic.partitions());
+ Collections.sort(member1Partitions);
+ assertEquals(List.of(0, 1), member1Partitions);
+
+ // Verify second member's endpoint (the new member)
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions member2Endpoint
= endpointsList.get(1);
+ assertEquals("host2", member2Endpoint.userEndpoint().host());
+ assertEquals(9093, member2Endpoint.userEndpoint().port());
+ assertEquals(1, member2Endpoint.activePartitions().size());
+ StreamsGroupHeartbeatResponseData.TopicPartition member2Topic =
member2Endpoint.activePartitions().get(0);
+ assertEquals("foo", member2Topic.topic());
+ List<Integer> member2Partitions = new
ArrayList<>(member2Topic.partitions());
+ Collections.sort(member2Partitions);
+ assertEquals(List.of(2, 3), member2Partitions);
}
@Test