This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 431cffc93f2 KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)
431cffc93f2 is described below

commit 431cffc93f2a9c1a178bd4caede1001d329ab10b
Author: Bill Bejeck <bbej...@apache.org>
AuthorDate: Tue Apr 29 20:08:49 2025 -0400

    KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)
    
    This PR migrates the initial interactive query (IQ) support for KIP-1071
    from the feature branch to trunk.  It includes a parameterized integration
    test that expects the same results whether the classic or the new streams
    group protocol is used.
    
    Note that this PR delivers IQ information in every heartbeat response.
    A follow-up PR will change that so IQ information is sent only when
    assignments change.
    
    Reviewers: Lucas Brutschy <lucas...@apache.org>
---
 .../StreamsGroupHeartbeatRequestManager.java       |  22 +-
 .../consumer/internals/StreamsRebalanceData.java   |  32 ++-
 .../message/StreamsGroupHeartbeatResponse.json     |   6 +-
 .../StreamsGroupHeartbeatRequestManagerTest.java   |   8 +-
 .../coordinator/group/GroupMetadataManager.java    |  23 +-
 .../topics/EndpointToPartitionsManager.java        |  86 +++++++
 .../topics/EndpointToPartitionsManagerTest.java    | 162 +++++++++++++
 .../IQv2EndpointToPartitionsIntegrationTest.java   | 270 +++++++++++++++++++++
 .../streams/processor/internals/StreamThread.java  |  15 ++
 .../processor/internals/StreamThreadTest.java      |  14 +-
 .../org/apache/kafka/streams/utils/TestUtils.java  |   2 +-
 11 files changed, 620 insertions(+), 20 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index b86c6d0498c..8f7d08e23a5 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -674,16 +674,24 @@ public class StreamsGroupHeartbeatRequestManager 
implements RequestManager {
         membershipManager.transitionToFatal();
     }
 
-    private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> 
convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
-        Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> 
partitionsByHost = new HashMap<>();
+    private static Map<StreamsRebalanceData.HostInfo, 
StreamsRebalanceData.EndpointPartitions> convertHostInfoMap(
+            final StreamsGroupHeartbeatResponseData data) {
+        Map<StreamsRebalanceData.HostInfo, 
StreamsRebalanceData.EndpointPartitions> partitionsByHost = new HashMap<>();
         data.partitionsByUserEndpoint().forEach(endpoint -> {
-            List<TopicPartition> topicPartitions = 
endpoint.partitions().stream()
-                .flatMap(partition ->
-                    partition.partitions().stream().map(partitionId -> new 
TopicPartition(partition.topic(), partitionId)))
-                .collect(Collectors.toList());
+            List<TopicPartition> activeTopicPartitions = 
getTopicPartitionList(endpoint.activePartitions());
+            List<TopicPartition> standbyTopicPartitions = 
getTopicPartitionList(endpoint.standbyPartitions());
             StreamsGroupHeartbeatResponseData.Endpoint userEndpoint = 
endpoint.userEndpoint();
-            partitionsByHost.put(new 
StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), 
topicPartitions);
+            StreamsRebalanceData.EndpointPartitions endpointPartitions = new 
StreamsRebalanceData.EndpointPartitions(activeTopicPartitions, 
standbyTopicPartitions);
+            partitionsByHost.put(new 
StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), 
endpointPartitions);
         });
         return partitionsByHost;
     }
+
+    static List<TopicPartition> 
getTopicPartitionList(List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitions) {
+        return topicPartitions.stream()
+                .flatMap(partition ->
+                        partition.partitions().stream().map(partitionId -> new 
TopicPartition(partition.topic(), partitionId)))
+                .collect(Collectors.toList());
+    }
+
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 0158370a509..2fe7ae8ad35 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -119,6 +120,31 @@ public class StreamsRebalanceData {
         }
 
     }
+    public static class EndpointPartitions {
+        private final List<TopicPartition> activePartitions;
+        private final List<TopicPartition> standbyPartitions;
+
+        public EndpointPartitions(final List<TopicPartition> activePartitions,
+                                  final List<TopicPartition> 
standbyPartitions) {
+            this.activePartitions = activePartitions;
+            this.standbyPartitions = standbyPartitions;
+        }
+
+        public List<TopicPartition> activePartitions() {
+            return new ArrayList<>(activePartitions);
+        }
+
+        public List<TopicPartition> standbyPartitions() {
+            return new ArrayList<>(standbyPartitions);
+        }
+        @Override
+        public String toString() {
+            return "EndpointPartitions {"
+                    + "activePartitions=" + activePartitions
+                    + ", standbyPartitions=" + standbyPartitions
+                    + '}';
+        }
+    }
 
     public static class Assignment {
 
@@ -297,7 +323,7 @@ public class StreamsRebalanceData {
 
     private final AtomicReference<Assignment> reconciledAssignment = new 
AtomicReference<>(Assignment.EMPTY);
 
-    private final AtomicReference<Map<HostInfo, List<TopicPartition>>> 
partitionsByHost = new AtomicReference<>(Collections.emptyMap());
+    private final AtomicReference<Map<HostInfo, EndpointPartitions>> 
partitionsByHost = new AtomicReference<>(Collections.emptyMap());
 
     private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
 
@@ -341,11 +367,11 @@ public class StreamsRebalanceData {
         return reconciledAssignment.get();
     }
 
-    public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>> 
partitionsByHost) {
+    public void setPartitionsByHost(final Map<HostInfo, EndpointPartitions> 
partitionsByHost) {
         this.partitionsByHost.set(partitionsByHost);
     }
 
-    public Map<HostInfo, List<TopicPartition>> partitionsByHost() {
+    public Map<HostInfo, EndpointPartitions> partitionsByHost() {
         return partitionsByHost.get();
     }
 
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index a5f3a99f9de..7127fcd1282 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -70,8 +70,10 @@
       "fields": [
         { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
           "about": "User-defined endpoint to connect to the node" },
-        { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
-          "about": "All partitions available on the node" }
+        { "name": "ActivePartitions", "type": "[]TopicPartition", "versions": 
"0+",
+          "about": "All topic partitions materialized by active tasks on the 
node" },
+        { "name": "StandbyPartitions", "type": "[]TopicPartition", "versions": 
"0+",
+          "about": "All topic partitions materialized by standby tasks on the 
node" }
       ]
     }
   ],
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 5e0c6652ef6..9839f3b2210 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -153,7 +153,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
         List.of(
             new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
                 .setUserEndpoint(new 
StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080))
-                .setPartitions(List.of(
+                .setActivePartitions(List.of(
                     new 
StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0)))
                 )
         );
@@ -591,9 +591,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
                 .get(new StreamsRebalanceData.HostInfo(
                     ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
                     ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
-                );
-            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), 
topicPartitions.get(0).topic());
-            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
+                ).activePartitions();
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).topic(), 
topicPartitions.get(0).topic());
+            
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).partitions().get(0),
 topicPartitions.get(0).partition());
             assertEquals(
                 1.0,
                 metrics.metric(metrics.metricName("heartbeat-total", 
"consumer-coordinator-metrics")).metricValue()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ad4a89d90eb..fb2c7c4c6af 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -158,6 +158,7 @@ import 
org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import 
org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import 
org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
 import org.apache.kafka.image.MetadataDelta;
@@ -1982,7 +1983,8 @@ public class GroupMetadataManager {
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+            
.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
 
         // The assignment is only provided in the following cases:
         // 1. The member is joining.
@@ -2093,6 +2095,25 @@ public class GroupMetadataManager {
             .collect(Collectors.toList());
     }
 
+    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
maybeBuildEndpointToPartitions(StreamsGroup group) {
+        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> 
endpointToPartitionsList = new ArrayList<>();
+        final Map<String, StreamsGroupMember> members = group.members();
+        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) 
{
+            final String memberIdForAssignment = entry.getKey();
+            final Optional<StreamsGroupMemberMetadataValue.Endpoint> 
endpointOptional = members.get(memberIdForAssignment).userEndpoint();
+            StreamsGroupMember groupMember = entry.getValue();
+            if (endpointOptional.isPresent()) {
+                final StreamsGroupMemberMetadataValue.Endpoint endpoint = 
endpointOptional.get();
+                final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
+                responseEndpoint.setHost(endpoint.host());
+                responseEndpoint.setPort(endpoint.port());
+                StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = 
EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, 
group);
+                endpointToPartitionsList.add(endpointToPartitions);
+            }
+        }
+        return endpointToPartitionsList.isEmpty() ? null : 
endpointToPartitionsList;
+    }
+
     /**
      * Handles a regular heartbeat from a consumer group member. It mainly 
consists of
      * three parts:
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
new file mode 100644
index 00000000000..ea3eca20935
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EndpointToPartitionsManager {
+
+    private EndpointToPartitionsManager() {
+    }
+
+    public static StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions(final StreamsGroupMember streamsGroupMember,
+                                                                               
               final StreamsGroupHeartbeatResponseData.Endpoint 
responseEndpoint,
+                                                                               
               final StreamsGroup streamsGroup) {
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions 
endpointToPartitions = new 
StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+        Map<String, Set<Integer>> activeTasks = 
streamsGroupMember.assignedTasks().activeTasks();
+        Map<String, Set<Integer>> standbyTasks = 
streamsGroupMember.assignedTasks().standbyTasks();
+        endpointToPartitions.setUserEndpoint(responseEndpoint);
+        Map<String, ConfiguredSubtopology> configuredSubtopologies = 
streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, 
streamsGroup.partitionMetadata());
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, 
streamsGroup.partitionMetadata());
+        endpointToPartitions.setActivePartitions(activeTopicPartitions);
+        endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
+        return endpointToPartitions;
+    }
+
+    private static List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitions(final Map<String, Set<Integer>> tasks,
+                                                                               
           final Map<String, ConfiguredSubtopology> configuredSubtopologies,
+                                                                               
           final Map<String, TopicMetadata> groupTopicMetadata) {
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionsForTasks = new ArrayList<>();
+        for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
+            String subtopologyId = taskEntry.getKey();
+            ConfiguredSubtopology configuredSubtopology = 
configuredSubtopologies.get(subtopologyId);
+            Set<String> sourceTopics = configuredSubtopology.sourceTopics();
+            Set<String> repartitionSourceTopics = 
configuredSubtopology.repartitionSourceTopics().keySet();
+            Set<String> allSourceTopic = new HashSet<>(sourceTopics);
+            allSourceTopic.addAll(repartitionSourceTopics);
+            List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), 
allSourceTopic, groupTopicMetadata);
+            topicPartitionsForTasks.addAll(topicPartitionList);
+        }
+        return topicPartitionsForTasks;
+    }
+
+    private static List<StreamsGroupHeartbeatResponseData.TopicPartition> 
topicPartitionListForTask(final Set<Integer> taskSet,
+                                                                               
                     final Set<String> topicNames,
+                                                                               
                     final Map<String, TopicMetadata> groupTopicMetadata) {
+        return topicNames.stream().map(topic -> {
+            int numPartitionsForTopic = 
groupTopicMetadata.get(topic).numPartitions();
+            StreamsGroupHeartbeatResponseData.TopicPartition tp = new 
StreamsGroupHeartbeatResponseData.TopicPartition();
+            tp.setTopic(topic);
+            List<Integer> tpPartitions = new ArrayList<>(taskSet);
+            if (numPartitionsForTopic < taskSet.size()) {
+                Collections.sort(tpPartitions);
+                tp.setPartitions(tpPartitions.subList(0, 
numPartitionsForTopic));
+            } else {
+                tp.setPartitions(tpPartitions);
+            }
+            return tp;
+        }).toList();
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
new file mode 100644
index 00000000000..2002774b60b
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManagerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TasksTuple;
+import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class EndpointToPartitionsManagerTest {
+
+    private StreamsGroup streamsGroup;
+    private StreamsGroupMember streamsGroupMember;
+    private ConfiguredTopology configuredTopology;
+    private ConfiguredSubtopology configuredSubtopologyOne;
+    private ConfiguredSubtopology configuredSubtopologyTwo;
+    private final Map<String, Set<Integer>> activeTasks = new HashMap<>();
+    private final Map<String, Set<Integer>> standbyTasks = new HashMap<>();
+    private TasksTuple tasksTuple;
+    private final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint 
= new StreamsGroupHeartbeatResponseData.Endpoint();
+
+    @BeforeEach
+    public void setUp() {
+        streamsGroup = mock(StreamsGroup.class);
+        streamsGroupMember = mock(StreamsGroupMember.class);
+        configuredTopology = mock(ConfiguredTopology.class);
+        configuredSubtopologyOne = new 
ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new 
HashMap<>());
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = 
Map.of("Topic-B",  new ConfiguredInternalTopic("Topic-B", 1, 
Optional.of((short) 1), Collections.emptyMap()));
+        configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(), 
repartitionSourceTopics, new HashSet<>(), new HashMap<>());
+        SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = 
new TreeMap<>();
+        configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
+        SortedMap<String, ConfiguredSubtopology> configuredSubtopologyTwoMap = 
new TreeMap<>();
+        configuredSubtopologyOneMap.put("1", configuredSubtopologyTwo);
+        
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
+        
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyTwoMap));
+        responseEndpoint.setHost("localhost");
+        responseEndpoint.setPort(9092);
+    }
+
+    @Test
+    void testEndpointToPartitionsWithStandbyTaskAssignments() {
+        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), 
"Topic-A", 3));
+        topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), 
"Topic-B", 3));
+
+        activeTasks.put("0", Set.of(0, 1, 2));
+        standbyTasks.put("1", Set.of(0, 1, 2));
+        tasksTuple = new TasksTuple(activeTasks, standbyTasks, 
Collections.emptyMap());
+        when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
+        
//when(streamsGroupMember.assignedTasks().standbyTasks()).thenReturn(tasksTuple.standbyTasks());
+        when((streamsGroup.partitionMetadata())).thenReturn(topicMetadata);
+        
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
+        SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = 
new TreeMap<>();
+        configuredSubtopologyMap.put("0", configuredSubtopologyOne);
+        configuredSubtopologyMap.put("1", configuredSubtopologyTwo);
+        
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
+
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
+                
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, 
responseEndpoint, streamsGroup);
+
+        assertEquals(responseEndpoint, result.userEndpoint());
+        assertEquals(1, result.activePartitions().size());
+        assertEquals(1, result.standbyPartitions().size());
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
activePartitions = result.activePartitions();
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> 
standbyPartitions = result.standbyPartitions();
+        
activePartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+        
standbyPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+        assertTopicPartitionsAssigned(activePartitions, "Topic-A");
+        assertTopicPartitionsAssigned(standbyPartitions, "Topic-B");
+    }
+
+    private static void 
assertTopicPartitionsAssigned(List<StreamsGroupHeartbeatResponseData.TopicPartition>
 topicPartitions, String topicName) {
+        StreamsGroupHeartbeatResponseData.TopicPartition topicPartition = 
topicPartitions.stream().filter(tp -> 
tp.topic().equals(topicName)).findFirst().get();
+        assertEquals(topicName, topicPartition.topic());
+        assertEquals(List.of(0, 1, 2), 
topicPartition.partitions().stream().sorted().toList());
+    }
+
+    @ParameterizedTest(name = "{4}")
+    @MethodSource("argsProvider")
+    void testEndpointToPartitionsWithTwoTopicsAndDifferentPartitions(int 
topicAPartitions,
+                                                                     int 
topicBPartitions,
+                                                                     
List<Integer> topicAExpectedPartitions,
+                                                                     
List<Integer> topicBExpectedPartitions,
+                                                                     String 
testName
+                                                                     ) {
+        Map<String, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), 
"Topic-A", topicAPartitions));
+        topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), 
"Topic-B", topicBPartitions));
+        configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", 
"Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
+
+        activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
+        when(streamsGroupMember.assignedTasks()).thenReturn(new 
TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
+        when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
+        
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
+        SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = 
new TreeMap<>();
+        configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
+        
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
+
+        StreamsGroupHeartbeatResponseData.EndpointToPartitions result = 
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, 
responseEndpoint, streamsGroup);
+
+        assertEquals(responseEndpoint, result.userEndpoint());
+        assertEquals(2, result.activePartitions().size());
+
+        List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions 
= result.activePartitions();
+        
topicPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
+
+        StreamsGroupHeartbeatResponseData.TopicPartition topicAPartition = 
result.activePartitions().get(0);
+        assertEquals("Topic-A", topicAPartition.topic());
+        assertEquals(topicAExpectedPartitions, 
topicAPartition.partitions().stream().sorted().toList());
+        
+        StreamsGroupHeartbeatResponseData.TopicPartition topicBPartition = 
result.activePartitions().get(1);
+        assertEquals("Topic-B", topicBPartition.topic());
+        assertEquals(topicBExpectedPartitions, 
topicBPartition.partitions().stream().sorted().toList());
+    }
+
+    static Stream<Arguments> argsProvider() {
+        return Stream.of(
+                arguments(2, 5, List.of(0, 1), List.of(0, 1, 2, 3, 4), "Should 
assign correct partitions when partitions differ between topics"),
+                arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should 
assign correct partitions when partitions same between topics")
+        );
+    }
+}
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
new file mode 100644
index 00000000000..7cad5998701
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@Timeout(600)
+@Tag("integration")
+public class IQv2EndpointToPartitionsIntegrationTest {
+    // Unique application id for the current invocation; assigned in setUp() and also
+    // used as the prefix of the topic names and client ids.
+    private String appId;
+    // Input/output topic names derived from appId in setUp().
+    private String inputTopicTwoPartitions;
+    private String outputTopicTwoPartitions;
+    // Effective configs of the first and second KafkaStreams instance. Kept as fields
+    // so tearDown() can purge their local state after the test body has finished.
+    private Properties streamsApplicationProperties = new Properties();
+    private Properties streamsSecondApplicationProperties = new Properties();
+
+    // Static, but (re)assigned by startCluster() on every parameterized invocation.
+    private static EmbeddedKafkaCluster cluster;
+    private static final int NUM_BROKERS = 3;
+    // Name given to the materialized count store in complexTopology().
+    private static final String EXPECTED_STORE_NAME = "IQTest-count";
+
+    /**
+     * Creates and starts a fresh embedded cluster with {@code NUM_BROKERS} brokers and
+     * stores it in the static {@code cluster} field.
+     *
+     * @param standbyConfig value put under
+     *                      {@code GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG}
+     *                      in the properties handed to the cluster
+     * @throws IOException declared for the cluster creation/start calls
+     */
+    public void startCluster(final int standbyConfig) throws IOException {
+        final Properties properties = new Properties();
+        
properties.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
 standbyConfig);
+        cluster = 
EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS, properties);
+        cluster.start();
+    }
+
+    /**
+     * Generates a unique application id, derives the input and output topic names from
+     * it and creates both topics on the running cluster.
+     *
+     * Must be called after {@code startCluster(int)}, because it uses the static
+     * {@code cluster} field.
+     *
+     * @throws InterruptedException declared for the topic-creation calls
+     */
+    public void setUp() throws InterruptedException {
+        appId = safeUniqueTestName("endpointIntegrationTest");
+        inputTopicTwoPartitions = appId + "-input-two";
+        outputTopicTwoPartitions = appId + "-output-two";
+        // presumably (topic, partitions = 2, replication factor = 1) - confirm against EmbeddedKafkaCluster
+        cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+        cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+    }
+
+    /**
+     * Stops the embedded cluster created by {@code startCluster(int)}, if there is one.
+     *
+     * This is invoked from a {@code finally} block, so it must not throw when
+     * {@code startCluster} failed before assigning {@code cluster}; an NPE here would
+     * mask the original failure. The static reference is cleared before stopping so a
+     * later invocation whose start-up fails cannot stop the same cluster a second time.
+     */
+    public void closeCluster() {
+        final EmbeddedKafkaCluster running = cluster;
+        cluster = null;
+        if (running != null) {
+            running.stop();
+        }
+    }
+
+    /**
+     * Runs after every test invocation and hands the Streams configurations used by the
+     * test to {@code IntegrationTestUtils.purgeLocalStreamsState}.
+     *
+     * The second instance's properties are only purged when they were populated, i.e.
+     * when the test body got far enough to configure the second instance.
+     *
+     * @throws Exception propagated from the purge calls
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        
IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties);
+        if (!streamsSecondApplicationProperties.isEmpty()) {
+            
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
+        }
+    }
+
+    /**
+     * Verifies the host-to-partition metadata exposed by
+     * {@code KafkaStreams#metadataForAllStreamsClients()} for the given group protocol.
+     *
+     * Flow:
+     * 1. start a cluster and create the topics;
+     * 2. start a first instance (application server localhost:2020) and check that it
+     *    alone reports 4 active topic partitions (2 input + 2 repartition) and no standbys;
+     * 3. start a second instance (localhost:3030) and wait until each instance has
+     *    2 active tasks and the expected number of standby tasks;
+     * 4. check ports, active/standby partitions and store names reported for both clients.
+     *
+     * The cluster is always stopped in the {@code finally} block.
+     *
+     * @param groupProtocolConfig  value for {@code StreamsConfig.GROUP_PROTOCOL_CONFIG}
+     * @param usingStandbyReplicas whether standby replicas are configured for this run
+     * @param numStandbyReplicas   standby replica count applied when {@code usingStandbyReplicas} is true
+     * @param testName             display name only; referenced as {@code {3}} in the test name pattern
+     * @throws Exception on any set-up, wait or assertion failure
+     */
+    @ParameterizedTest(name = "{3}")
+    @MethodSource("groupProtocolParameters")
+    public void shouldGetCorrectHostPartitionInformation(final String 
groupProtocolConfig,
+                                                         final boolean 
usingStandbyReplicas,
+                                                         final int 
numStandbyReplicas,
+                                                         final String 
testName) throws Exception {
+        try {
+            startCluster(usingStandbyReplicas ? numStandbyReplicas : 0);
+            setUp();
+
+            // Configuration of the first instance; props() overlays these on the shared defaults.
+            final Properties streamOneProperties = new Properties();
+            streamOneProperties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath() + "-ks1");
+            streamOneProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + 
"-ks1");
+            streamOneProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, 
"localhost:2020");
+            streamOneProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocolConfig);
+            if (usingStandbyReplicas) {
+                
streamOneProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
numStandbyReplicas);
+            }
+            streamsApplicationProperties = props(streamOneProperties);
+
+            // Configuration of the second instance; differs in state dir, client id and port.
+            final Properties streamTwoProperties = new Properties();
+            streamTwoProperties.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath() + "-ks2");
+            streamTwoProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + 
"-ks2");
+            streamTwoProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, 
"localhost:3030");
+            streamTwoProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocolConfig);
+            if (usingStandbyReplicas) {
+                
streamTwoProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
numStandbyReplicas);
+            }
+            streamsSecondApplicationProperties = props(streamTwoProperties);
+
+            final Topology topology = complexTopology();
+            try (final KafkaStreams streamsOne = new KafkaStreams(topology, 
streamsApplicationProperties)) {
+                
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
+                waitForCondition(() -> 
!streamsOne.metadataForAllStreamsClients().isEmpty(),
+                        IntegrationTestUtils.DEFAULT_TIMEOUT,
+                        () -> "Kafka Streams didn't get metadata about the 
client.");
+                waitForCondition(() -> 
streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size()
 == 4,
+                        IntegrationTestUtils.DEFAULT_TIMEOUT,
+                        () -> "Kafka Streams one didn't get 4 tasks");
+                // Phase 1: only the first instance is running, so it must own everything.
+                final List<StreamsMetadata> streamsMetadataAllClients = new 
ArrayList<>(streamsOne.metadataForAllStreamsClients());
+                assertEquals(1, streamsMetadataAllClients.size());
+                final StreamsMetadata streamsOneInitialMetadata = 
streamsMetadataAllClients.get(0);
+                assertEquals(2020, 
streamsOneInitialMetadata.hostInfo().port());
+                final Set<TopicPartition> topicPartitions = 
streamsOneInitialMetadata.topicPartitions();
+                assertEquals(4, topicPartitions.size());
+                assertEquals(0, 
streamsOneInitialMetadata.standbyTopicPartitions().size());
+
+                final long repartitionTopicTaskCount = 
topicPartitions.stream().filter(tp -> 
tp.topic().contains("-repartition")).count();
+                final long sourceTopicTaskCount = 
topicPartitions.stream().filter(tp -> 
tp.topic().contains("-input-two")).count();
+                assertEquals(2, repartitionTopicTaskCount);
+                assertEquals(2, sourceTopicTaskCount);
+                // Hard-coded to 1 rather than numStandbyReplicas; the provided parameters only use 1.
+                final int expectedStandbyCount = usingStandbyReplicas ? 1 : 0;
+
+                // Phase 2: a second instance joins and the tasks are expected to be split 2/2.
+                try (final KafkaStreams streamsTwo = new 
KafkaStreams(topology, streamsSecondApplicationProperties)) {
+                    streamsTwo.start();
+                    waitForCondition(() -> KafkaStreams.State.RUNNING == 
streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
+                            IntegrationTestUtils.DEFAULT_TIMEOUT,
+                            () -> "Kafka Streams one or two never transitioned 
to a RUNNING state.");
+
+                    waitForCondition(() ->  {
+                        final ThreadMetadata threadMetadata = 
streamsOne.metadataForLocalThreads().iterator().next();
+                        return threadMetadata.activeTasks().size() == 2 && 
threadMetadata.standbyTasks().size() == expectedStandbyCount;
+                    }, TestUtils.DEFAULT_MAX_WAIT_MS,
+                            "KafkaStreams one never released active tasks and 
received standby task");
+
+                    waitForCondition(() -> {
+                        final ThreadMetadata threadMetadata = 
streamsTwo.metadataForLocalThreads().iterator().next();
+                        return threadMetadata.activeTasks().size() == 2 && 
threadMetadata.standbyTasks().size() == expectedStandbyCount;
+                    }, TestUtils.DEFAULT_MAX_WAIT_MS,
+                            "KafkaStreams two never received active tasks and 
standby");
+
+                    waitForCondition(() -> {
+                        final List<StreamsMetadata> metadata = new 
ArrayList<>(streamsTwo.metadataForAllStreamsClients());
+                        return metadata.size() == 2 &&
+                               metadata.get(0).standbyTopicPartitions().size() 
== expectedStandbyCount &&
+                               metadata.get(1).standbyTopicPartitions().size() 
== expectedStandbyCount;
+                    }, TestUtils.DEFAULT_MAX_WAIT_MS,
+                            "Kafka Streams clients 1 and 2 never got metadata 
about standby tasks");
+
+                    waitForCondition(() -> 
streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size()
 == 2,
+                            IntegrationTestUtils.DEFAULT_TIMEOUT,
+                            () -> "Kafka Streams one didn't give up active 
tasks");
+
+                    // NOTE(review): the assertions below assume index 0 is the client on port 2020
+                    // and index 1 the client on port 3030 - confirm that metadataForAllStreamsClients()
+                    // guarantees this ordering, otherwise look the entries up by port.
+                    final List<StreamsMetadata> allClientMetadataUpdated = new 
ArrayList<>(streamsTwo.metadataForAllStreamsClients());
+
+                    final StreamsMetadata streamsOneMetadata = 
allClientMetadataUpdated.get(0);
+                    final Set<TopicPartition> streamsOneActiveTopicPartitions 
= streamsOneMetadata.topicPartitions();
+                    final Set<TopicPartition> streamsOneStandbyTopicPartitions 
= streamsOneMetadata.standbyTopicPartitions();
+                    final Set<String> streamsOneStoreNames = 
streamsOneMetadata.stateStoreNames();
+                    final Set<String> streamsOneStandbyStoreNames = 
streamsOneMetadata.standbyStateStoreNames();
+
+                    assertEquals(2020, streamsOneMetadata.hostInfo().port());
+                    assertEquals(2, streamsOneActiveTopicPartitions.size());
+                    assertEquals(expectedStandbyCount, 
streamsOneStandbyTopicPartitions.size());
+                    assertEquals(1, streamsOneStoreNames.size());
+                    assertEquals(expectedStandbyCount, 
streamsOneStandbyStoreNames.size());
+                    assertEquals(EXPECTED_STORE_NAME, 
streamsOneStoreNames.iterator().next());
+                    if (usingStandbyReplicas) {
+                        assertEquals(EXPECTED_STORE_NAME, 
streamsOneStandbyStoreNames.iterator().next());
+                    }
+
+                    final long streamsOneRepartitionTopicCount = 
streamsOneActiveTopicPartitions.stream().filter(tp -> 
tp.topic().contains("-repartition")).count();
+                    final long streamsOneSourceTopicCount = 
streamsOneActiveTopicPartitions.stream().filter(tp -> 
tp.topic().contains("-input-two")).count();
+                    assertEquals(1, streamsOneRepartitionTopicCount);
+                    assertEquals(1, streamsOneSourceTopicCount);
+
+                    final StreamsMetadata streamsTwoMetadata = 
allClientMetadataUpdated.get(1);
+                    final Set<TopicPartition> streamsTwoActiveTopicPartitions 
= streamsTwoMetadata.topicPartitions();
+                    final Set<TopicPartition> streamsTwoStandbyTopicPartitions 
= streamsTwoMetadata.standbyTopicPartitions();
+                    final Set<String> streamsTwoStateStoreNames = 
streamsTwoMetadata.stateStoreNames();
+                    final Set<String> streamsTwoStandbyStateStoreNames = 
streamsTwoMetadata.standbyStateStoreNames();
+
+                    assertEquals(3030, streamsTwoMetadata.hostInfo().port());
+                    assertEquals(2, streamsTwoActiveTopicPartitions.size());
+                    assertEquals(expectedStandbyCount, 
streamsTwoStandbyTopicPartitions.size());
+                    assertEquals(1, streamsTwoStateStoreNames.size());
+                    assertEquals(expectedStandbyCount, 
streamsTwoStandbyStateStoreNames.size());
+                    assertEquals(EXPECTED_STORE_NAME, 
streamsTwoStateStoreNames.iterator().next());
+                    if (usingStandbyReplicas) {
+                        assertEquals(EXPECTED_STORE_NAME, 
streamsTwoStandbyStateStoreNames.iterator().next());
+                    }
+
+                    final long streamsTwoRepartitionTopicCount = 
streamsTwoActiveTopicPartitions.stream().filter(tp -> 
tp.topic().contains("-repartition")).count();
+                    final long streamsTwoSourceTopicCount = 
streamsTwoActiveTopicPartitions.stream().filter(tp -> 
tp.topic().contains("-input-two")).count();
+                    assertEquals(1, streamsTwoRepartitionTopicCount);
+                    assertEquals(1, streamsTwoSourceTopicCount);
+
+                    // The two standbys must be on the same topic but on different partitions.
+                    if (usingStandbyReplicas) {
+                        final TopicPartition streamsOneStandbyTopicPartition = 
streamsOneStandbyTopicPartitions.iterator().next();
+                        final TopicPartition streamsTwoStandbyTopicPartition = 
streamsTwoStandbyTopicPartitions.iterator().next();
+                        final String streamsOneStandbyTopicName = 
streamsOneStandbyTopicPartition.topic();
+                        final String streamsTwoStandbyTopicName = 
streamsTwoStandbyTopicPartition.topic();
+                        assertEquals(streamsOneStandbyTopicName, 
streamsTwoStandbyTopicName);
+                        
assertNotEquals(streamsOneStandbyTopicPartition.partition(), 
streamsTwoStandbyTopicPartition.partition());
+                    }
+                }
+            }
+        } finally {
+            closeCluster();
+        }
+    }
+
+    /**
+     * Argument source for {@code shouldGetCorrectHostPartitionInformation}.
+     *
+     * Each entry is (group protocol, using standby replicas, number of standby replicas,
+     * display name) and covers the "streams" and "classic" protocols, each with and
+     * without one standby replica.
+     *
+     * @return the four parameter combinations
+     */
+    private static Stream<Arguments> groupProtocolParameters() {
+        return Stream.of(Arguments.of("streams", false, 0, "STREAMS protocol 
No standby"),
+                Arguments.of("classic", false, 0, "CLASSIC protocol No 
standby"),
+                Arguments.of("streams", true, 1, "STREAMS protocol With 
standby"),
+                Arguments.of("classic", true, 1, "CLASSIC protocol With 
standby"));
+    }
+
+    /**
+     * Builds the Streams configuration for one instance.
+     *
+     * Starts from shared defaults (application id, bootstrap servers of the running
+     * cluster, zero state-store cache, a fresh temp state directory, String serdes and
+     * "earliest" offset reset) and then copies {@code extraProperties} on top, so any
+     * key present there overrides the default.
+     *
+     * @param extraProperties instance-specific settings that take precedence over the defaults
+     * @return a new {@code Properties} object holding the merged configuration
+     */
+    private Properties props(final Properties extraProperties) {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory(appId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        // Applied last so caller-supplied values win over the defaults above.
+        streamsConfiguration.putAll(extraProperties);
+        return streamsConfiguration;
+    }
+
+    /**
+     * Builds the word-count style topology used by the test.
+     *
+     * Values read from the input topic are lower-cased and split on non-word
+     * characters, re-keyed by word (the {@code groupBy} is what the test's
+     * "-repartition" partitions come from), counted into the store named
+     * {@code EXPECTED_STORE_NAME}, and the counts are written to the output topic.
+     *
+     * @return the built topology
+     */
+    private Topology complexTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String()))
+                // Locale.ROOT keeps tokenisation independent of the JVM's default locale
+                // (e.g. Turkish dotless-i case mapping), so keys are stable across machines.
+                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
+                .groupBy((key, value) -> value, Grouped.as("IQTest"))
+                .count(Materialized.as(EXPECTED_STORE_NAME))
+                .toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long()));
+        return builder.build();
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index d5f67ff4fad..dcfca3ec2f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1494,6 +1494,21 @@ public class StreamThread extends Thread implements ProcessingThread {
                     shutdownErrorHook.run();
                 }
             }
+
+            final Map<StreamsRebalanceData.HostInfo, 
StreamsRebalanceData.EndpointPartitions> partitionsByEndpoint =
+                    streamsRebalanceData.get().partitionsByHost();
+            final Map<HostInfo, Set<TopicPartition>> activeHostInfoMap = new 
HashMap<>();
+            final Map<HostInfo, Set<TopicPartition>> standbyHostInfoMap = new 
HashMap<>();
+
+            partitionsByEndpoint.forEach((hostInfo, endpointPartitions) -> {
+                activeHostInfoMap.put(new HostInfo(hostInfo.host(), 
hostInfo.port()), new HashSet<>(endpointPartitions.activePartitions()));
+                standbyHostInfoMap.put(new HostInfo(hostInfo.host(), 
hostInfo.port()), new HashSet<>(endpointPartitions.standbyPartitions()));
+            });
+            streamsMetadataState.onChange(
+                    activeHostInfoMap,
+                    standbyHostInfoMap,
+                    getTopicPartitionInfo(activeHostInfoMap)
+            );
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8cb2fc8cdfe..96090aa32fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -3805,6 +3805,11 @@ public class StreamThreadTest {
         final Runnable shutdownErrorHook = mock(Runnable.class);
 
         final Properties props = configProps(false, false, false);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(
+                new TopologyMetadata(internalTopologyBuilder, new 
StreamsConfig(props)),
+                StreamsMetadataState.UNKNOWN_HOST,
+                new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
         final StreamsConfig config = new StreamsConfig(props);
         thread = new StreamThread(
             new MockTime(1),
@@ -3828,7 +3833,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
-            null
+            streamsMetadataState
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
@@ -3860,6 +3865,11 @@ public class StreamThreadTest {
         final Properties props = configProps(false, false, false);
         final Runnable shutdownErrorHook = mock(Runnable.class);
         final StreamsConfig config = new StreamsConfig(props);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(
+                new TopologyMetadata(internalTopologyBuilder, config),
+                StreamsMetadataState.UNKNOWN_HOST,
+                new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
         thread = new StreamThread(
             new MockTime(1),
             config,
@@ -3882,7 +3892,7 @@ public class StreamThreadTest {
             HANDLER,
             null,
             Optional.of(streamsRebalanceData),
-            null
+            streamsMetadataState
         ).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         thread.setState(State.STARTING);
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
index 96d19dbb47e..2e13e3e6d9b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java
@@ -90,7 +90,7 @@ public class TestUtils {
         return safeUniqueTestName(methodName);
     }
 
+    /**
+     * Returns a unique name derived from {@code testName}: a random {@code Uuid} string
+     * is appended and the result is passed through {@code sanitize}.
+     *
+     * @param testName base name to make unique
+     * @return the sanitized, uniquified name
+     */
-    private static String safeUniqueTestName(final String testName) {
+    public static String safeUniqueTestName(final String testName) {
         return sanitize(testName + Uuid.randomUuid().toString());
     }
 

Reply via email to