This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b2e6b80df6e78287cf0dab58b16ee7f77bf836a3
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon May 27 23:01:53 2024 +0200

    Resolve conflict from 11/25 trunk rebase - Create and inject StreamsHeartbeatRequestManager
    
    Find a way to inject the StreamsHeartbeatRequestManager into AsyncKafkaConsumer from Kafka Streams
    
    Also trigger assignment logic in Kafka Streams
    
    See https://github.com/lucasbru/kafka/pull/12
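    
    A rough sketch of the wiring this adds to RequestManagers (argument list
    taken from the first hunk below; the isPresent() guard is assumed from
    context, not shown in the hunk):
    
        if (streamsInstanceMetadata.isPresent()) {
            // Streams member: wire up the streams-specific heartbeat path.
            streamsHeartbeatRequestManager = new StreamsHeartbeatRequestManager(
                logContext, time, config, coordinator,
                streamsInitializeRequestManager,
                streamsPrepareAssignmentRequestManager,
                membershipManager, backgroundEventHandler, metrics,
                streamsInstanceMetadata.get(), metadata);
        } else {
            // Plain consumer: keep the regular HeartbeatRequestManager.
            heartbeatRequestManager = new HeartbeatRequestManager(logContext, ...);
        }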
---
 .../consumer/internals/RequestManagers.java        |  11 +-
 .../internals/StreamsAssignmentInterface.java      | 138 +++--
 .../internals/StreamsHeartbeatRequestManager.java  | 579 ++++++++++++++++++++-
 .../StreamsInstallAssignmentRequestManager.java    |   2 +-
 .../StreamsPrepareAssignmentRequestManager.java    |   2 +-
 ...ava => StreamsGroupUninitializedException.java} |   4 +-
 ...java => StreamsInvalidAssignmentException.java} |   4 +-
 .../org/apache/kafka/common/protocol/Errors.java   |  10 +-
 .../common/requests/StreamsHeartbeatResponse.java  |  14 +-
 .../common/requests/StreamsInitializeRequest.java  |  17 +-
 .../common/requests/StreamsInitializeResponse.java |  16 +
 .../requests/StreamsInstallAssignmentRequest.java  |  18 +-
 .../requests/StreamsInstallAssignmentResponse.java |  19 +-
 .../requests/StreamsPrepareAssignmentRequest.java  |  18 +-
 .../requests/StreamsPrepareAssignmentResponse.java |  19 +-
 .../common/message/StreamsHeartbeatRequest.json    |   6 +-
 .../common/message/StreamsHeartbeatResponse.json   |  14 +-
 .../StreamsHeartbeatRequestManagerTest.java        | 408 +++++++++++++++
 .../internals/DefaultKafkaClientSupplier.java      |   6 +-
 .../streams/processor/internals/StreamThread.java  | 150 +++++-
 .../internals/StreamsRebalanceListener.java        |   3 +
 .../processor/internals/StreamThreadTest.java      |  50 +-
 22 files changed, 1390 insertions(+), 118 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 65dec12140b..f370cf15c67 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -262,9 +262,18 @@ public class RequestManagers implements Closeable {
                         streamsInstallAssignmentRequestManager = new StreamsInstallAssignmentRequestManager(
                             streamsInstanceMetadata.get());
                         streamsHeartbeatRequestManager = new StreamsHeartbeatRequestManager(
+                            logContext,
+                            time,
+                            config,
+                            coordinator,
                             streamsInitializeRequestManager,
                             streamsPrepareAssignmentRequestManager,
-                            streamsInstanceMetadata.get());
+                            membershipManager,
+                            backgroundEventHandler,
+                            metrics,
+                            streamsInstanceMetadata.get(),
+                            metadata
+                            );
                     } else {
                         heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index acc7e314362..d09ffbda4f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -16,28 +16,33 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Defines a self-contained object to exchange assignment-related metadata with the Kafka Streams instance.
- *
+ * <p>
  * It's used to exchange information between the streams module and the clients module, and should be mostly self-contained
  */
 public class StreamsAssignmentInterface {
 
     private UUID processID;
 
-    private String userEndPointHost;
+    private HostInfo endPoint;
 
-    private int userEndPointPort;
+    private String assignor;
 
-    private Map<String, SubTopology> subtopologyMap;
+    private Map<String, Subtopology> subtopologyMap;
 
     private Map<String, Object> assignmentConfiguration;
 
@@ -51,15 +56,15 @@ public class StreamsAssignmentInterface {
         return processID;
     }
 
-    public String userEndPointHost() {
-        return userEndPointHost;
+    public HostInfo endPoint() {
+        return endPoint;
     }
 
-    public int userEndPointPort() {
-        return userEndPointPort;
+    public String assignor() {
+        return assignor;
     }
 
-    public Map<String, SubTopology> subtopologyMap() {
+    public Map<String, Subtopology> subtopologyMap() {
         return subtopologyMap;
     }
 
@@ -67,6 +72,7 @@ public class StreamsAssignmentInterface {
         return assignmentConfiguration;
     }
 
+    // TODO: This needs to be used somewhere
     public Map<TaskId, Long> taskLags() {
         return taskLags;
     }
@@ -84,19 +90,45 @@ public class StreamsAssignmentInterface {
         shutdownRequested.set(true);
     }
 
+    // TODO: This needs to be checked somewhere.
     public boolean shutdownRequested() {
         return shutdownRequested.get();
     }
 
+    // TODO: This needs to be called somewhere
     public void setTaskLags(Map<TaskId, Long> taskLags) {
         this.taskLags = taskLags;
     }
 
-    // TODO: Reconciled assignment updated by the stream thread
-    public final Assignment reconciledAssignment = new Assignment();
+    public final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>();
+
+    public final AtomicReference<Assignment> targetAssignment = new AtomicReference<>();
+
+    /**
+     * List of partitions available on each host. Updated by the streams protocol client.
+     */
+    public final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>();
+
+    public static class HostInfo {
+
+        public final String host;
+
+        public final int port;
+
+        public HostInfo(final String host, final int port) {
+            this.host = host;
+            this.port = port;
+        }
+
+        @Override
+        public String toString() {
+            return "HostInfo{" +
+                "host='" + host + '\'' +
+                ", port=" + port +
+                '}';
+        }
 
-    // TODO: Target assignment read by the stream thread
-    public final Assignment targetAssignment = new Assignment();
+    }
 
     public static class Assignment {
 
@@ -106,6 +138,37 @@ public class StreamsAssignmentInterface {
 
         public final Set<TaskId> warmupTasks = new HashSet<>();
 
+        public Assignment() {
+        }
+
+        public Assignment(final Set<TaskId> activeTasks, final Set<TaskId> standbyTasks, final Set<TaskId> warmupTasks) {
+            this.activeTasks.addAll(activeTasks);
+            this.standbyTasks.addAll(standbyTasks);
+            this.warmupTasks.addAll(warmupTasks);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final Assignment that = (Assignment) o;
+            return Objects.equals(activeTasks, that.activeTasks)
+                && Objects.equals(standbyTasks, that.standbyTasks)
+                && Objects.equals(warmupTasks, that.warmupTasks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(activeTasks, standbyTasks, warmupTasks);
+        }
+
+        public Assignment copy() {
+            return new Assignment(activeTasks, standbyTasks, warmupTasks);
+        }
     }
 
     public static class TopicInfo {
@@ -130,53 +193,54 @@ public class StreamsAssignmentInterface {
     }
 
     public static class TaskId {
+
         public final String subtopologyId;
-        public final int taskId;
+        public final int partitionId;
 
-        public int taskId() {
-            return taskId;
+        public int partitionId() {
+            return partitionId;
         }
 
         public String subtopologyId() {
             return subtopologyId;
         }
 
-        public TaskId(final String subtopologyId, final int taskId) {
+        public TaskId(final String subtopologyId, final int partitionId) {
             this.subtopologyId = subtopologyId;
-            this.taskId = taskId;
+            this.partitionId = partitionId;
         }
 
         @Override
         public String toString() {
             return "TaskId{" +
                 "subtopologyId=" + subtopologyId +
-                ", taskId=" + taskId +
+                ", partitionId=" + partitionId +
                 '}';
         }
     }
 
-    public static class SubTopology {
+    public static class Subtopology {
 
-        public final Set<String> sinkTopics;
         public final Set<String> sourceTopics;
+        public final Set<String> sinkTopics;
         public final Map<String, TopicInfo> stateChangelogTopics;
         public final Map<String, TopicInfo> repartitionSourceTopics;
 
-       public SubTopology(final Set<String> sinkTopics,
-                          final Set<String> sourceTopics,
-                          final Map<String, TopicInfo> repartitionSourceTopics,
-                          final Map<String, TopicInfo> stateChangelogTopics) {
-            this.sinkTopics = sinkTopics;
+        public Subtopology(final Set<String> sourceTopics,
+                           final Set<String> sinkTopics,
+                           final Map<String, TopicInfo> repartitionSourceTopics,
+                           final Map<String, TopicInfo> stateChangelogTopics) {
             this.sourceTopics = sourceTopics;
+            this.sinkTopics = sinkTopics;
             this.stateChangelogTopics = stateChangelogTopics;
             this.repartitionSourceTopics = repartitionSourceTopics;
         }
 
         @Override
         public String toString() {
-            return "SubTopology{" +
-                "sinkTopics=" + sinkTopics +
-                ", sourceTopics=" + sourceTopics +
+            return "Subtopology{" +
+                "sourceTopics=" + sourceTopics +
+                ", sinkTopics=" + sinkTopics +
                 ", stateChangelogTopics=" + stateChangelogTopics +
                 ", repartitionSourceTopics=" + repartitionSourceTopics +
                 '}';
@@ -184,15 +248,15 @@ public class StreamsAssignmentInterface {
     }
 
     public StreamsAssignmentInterface(UUID processID,
-                                      String userEndPointHost,
-                                      int userEndPointPort,
-                                      Map<String, SubTopology> subtopologyMap,
+                                      HostInfo endPoint,
+                                      String assignor,
+                                      Map<String, Subtopology> subtopologyMap,
                                      Map<String, Object> assignmentConfiguration,
                                       Map<String, String> clientTags
-                                      ) {
+    ) {
         this.processID = processID;
-        this.userEndPointHost = userEndPointHost;
-        this.userEndPointPort = userEndPointPort;
+        this.endPoint = endPoint;
+        this.assignor = assignor;
         this.subtopologyMap = subtopologyMap;
         this.assignmentConfiguration = assignmentConfiguration;
         this.taskLags = new HashMap<>();
@@ -204,8 +268,8 @@ public class StreamsAssignmentInterface {
     public String toString() {
         return "StreamsAssignmentMetadata{" +
             "processID=" + processID +
-            ", userEndPointHost='" + userEndPointHost + '\'' +
-            ", userEndPointPort=" + userEndPointPort +
+            ", endPoint='" + endPoint + '\'' +
+            ", assignor='" + assignor + '\'' +
             ", subtopologyMap=" + subtopologyMap +
             ", assignmentConfiguration=" + assignmentConfiguration +
             ", taskLags=" + taskLags +
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
index 685ab28da69..ffa6bb2657f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java
@@ -18,15 +18,16 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
 import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
 import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
 import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId;
 import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo;
@@ -42,27 +43,599 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class StreamsHeartbeatRequestManager implements RequestManager {
 
+    private final Logger logger;
+
+    private final CoordinatorRequestManager coordinatorRequestManager;
+
+    private final StreamsHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
+
+    private final StreamsHeartbeatRequestManager.HeartbeatState heartbeatState;
+
+    private final MembershipManager membershipManager;
+
+    private final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager;
+
+    private final StreamsInitializeRequestManager streamsInitializeRequestManager;
+
+    private final BackgroundEventHandler backgroundEventHandler;
+
+    private final Timer pollTimer;
+
+    private final HeartbeatMetricsManager metricsManager;
+
     private StreamsAssignmentInterface streamsInterface;
 
+    private final Map<String, Uuid> assignedTopicIdCache;
+
+    private final ConsumerMetadata metadata;
+
     public StreamsHeartbeatRequestManager(
+        final LogContext logContext,
+        final Time time,
+        final ConsumerConfig config,
+        final CoordinatorRequestManager coordinatorRequestManager,
         final StreamsInitializeRequestManager streamsInitializeRequestManager,
        final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager,
-        final StreamsAssignmentInterface streamsAssignmentInterface
+        final MembershipManager membershipManager,
+        final BackgroundEventHandler backgroundEventHandler,
+        final Metrics metrics,
+        final StreamsAssignmentInterface streamsAssignmentInterface,
+        final ConsumerMetadata metadata
     ) {
+        this.coordinatorRequestManager = coordinatorRequestManager;
+        this.logger = logContext.logger(getClass());
+        this.membershipManager = membershipManager;
+        this.streamsInitializeRequestManager = streamsInitializeRequestManager;
+        this.streamsPrepareAssignmentRequestManager = streamsPrepareAssignmentRequestManager;
+        this.backgroundEventHandler = backgroundEventHandler;
+        int maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+        long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+        this.heartbeatState = new StreamsHeartbeatRequestManager.HeartbeatState(streamsAssignmentInterface, membershipManager,
+            maxPollIntervalMs);
+        this.heartbeatRequestState = new StreamsHeartbeatRequestManager.HeartbeatRequestState(logContext, time, 0, retryBackoffMs,
+            retryBackoffMaxMs, maxPollIntervalMs);
+        this.pollTimer = time.timer(maxPollIntervalMs);
+        this.metricsManager = new HeartbeatMetricsManager(metrics);
         this.streamsInterface = streamsAssignmentInterface;
+        this.assignedTopicIdCache = new HashMap<>();
+        this.metadata = metadata;
     }
 
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        return null;
+        if (!coordinatorRequestManager.coordinator().isPresent() ||
+            membershipManager.shouldSkipHeartbeat()) {
+            membershipManager.onHeartbeatRequestSkipped();
+            return NetworkClientDelegate.PollResult.EMPTY;
+        }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+            membershipManager.transitionToSendingLeaveGroup(true);
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true);
+
+            // We can ignore the leave response because we can join before or after receiving the response.
+            heartbeatRequestState.reset();
+            heartbeatState.reset();
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs,
+                Collections.singletonList(leaveHeartbeat));
+        }
+
+        boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+        if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+        }
+
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+        return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
+    }
+
+    @Override
+    public long maximumTimeToWait(long currentTimeMs) {
+        pollTimer.update(currentTimeMs);
+        if (
+            pollTimer.isExpired() ||
+                (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight())
+        ) {
+            return 0L;
+        }
+        return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs,
+                                                                     final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
+        heartbeatRequestState.onSendAttempt(currentTimeMs);
+        membershipManager.onHeartbeatRequestSent();
+        metricsManager.recordHeartbeatSentMs(currentTimeMs);
+        return request;
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) {
+        NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
+            new StreamsHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            coordinatorRequestManager.coordinator());
+        if (ignoreResponse) {
+            return logResponse(request);
+        } else {
+            return request.whenComplete((response, exception) -> {
+                long completionTimeMs = request.handler().completionTimeMs();
+                if (response != null) {
+                    metricsManager.recordRequestLatency(response.requestLatencyMs());
+                    onResponse((StreamsHeartbeatResponse) response.responseBody(), completionTimeMs);
+                } else {
+                    onFailure(exception, completionTimeMs);
+                }
+            });
+        }
+    }
+
+    private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) {
+        return request.whenComplete((response, exception) -> {
+            if (response != null) {
+                metricsManager.recordRequestLatency(response.requestLatencyMs());
+                Errors error =
+                    Errors.forCode(((StreamsHeartbeatResponse) response.responseBody()).data().errorCode());
+                if (error == Errors.NONE) {
+                    logger.debug("StreamsHeartbeat responded successfully: {}", response);
+                } else {
+                    logger.error("StreamsHeartbeat failed because of {}: {}", error, response);
+                }
+            } else {
+                logger.error("StreamsHeartbeat failed because of unexpected exception.", exception);
+            }
+        });
+    }
+
+    private void onFailure(final Throwable exception, final long responseTimeMs) {
+        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
+        this.heartbeatState.reset();
+        if (exception instanceof RetriableException) {
+            String message = String.format("StreamsHeartbeatRequest failed because of the retriable exception. " +
+                    "Will retry in %s ms: %s",
+                heartbeatRequestState.remainingBackoffMs(responseTimeMs),
+                exception.getMessage());
+            logger.debug(message);
+        } else {
+            logger.error("StreamsHeartbeatRequest failed due to fatal error: {}", exception.getMessage());
+            handleFatalFailure(exception);
+        }
+    }
+
+    private void onResponse(final StreamsHeartbeatResponse response, long currentTimeMs) {
+        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
+            onSuccessResponse(response, currentTimeMs);
+        } else {
+            onErrorResponse(response, currentTimeMs);
+        }
+    }
+
+    private void onSuccessResponse(final StreamsHeartbeatResponse response, final long currentTimeMs) {
+        final StreamsHeartbeatResponseData data = response.data();
+
+        heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs());
+        heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
+        heartbeatRequestState.resetTimer();
+
+        if (data.shouldInitializeTopology()) {
+            streamsInitializeRequestManager.initialize();
+        }
+        if (data.shouldComputeAssignment()) {
+            streamsPrepareAssignmentRequestManager.prepareAssignment();
+        }
+        if (data.partitionsByHost() != null) {
+            streamsInterface.partitionsByHost.set(convertHostInfoMap(data));
+        }
+
+        ConsumerGroupHeartbeatResponseData cgData = new ConsumerGroupHeartbeatResponseData();
+        cgData.setMemberId(data.memberId());
+        cgData.setMemberEpoch(data.memberEpoch());
+        cgData.setErrorCode(data.errorCode());
+        cgData.setErrorMessage(data.errorMessage());
+        cgData.setThrottleTimeMs(data.throttleTimeMs());
+        cgData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
+
+        Errors assignmentError = response.assignmentError();
+        String assignmentErrorMessage = response.data().errorMessage();
+        if (assignmentError != Errors.NONE) {
+            logger.warn("Assignment incomplete. {}. {}", assignmentError.message(), assignmentErrorMessage);
+            switch (assignmentError) {
+                case STREAMS_SHUTDOWN_APPLICATION:
+                    streamsInterface.requestShutdown();
+                    break;
+                case STREAMS_INCONSISTENT_TOPOLOGY:
+                case STREAMS_MISSING_SOURCE_TOPICS:
+                case STREAMS_GROUP_UNINITIALIZED:
+                    break;
+            }
+        }
+
+        if (data.activeTasks() != null && data.standbyTasks() != null && data.warmupTasks() != null) {
+
+            setTargetAssignment(data);
+            setTargetAssignmentForConsumerGroup(data, cgData);
+
+        } else {
+            if (data.activeTasks() != null || data.standbyTasks() != null || data.warmupTasks() != null) {
+                throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: " + data);
+            }
+        }
+
+        membershipManager.onHeartbeatSuccess(cgData);
+    }
+
+    private void setTargetAssignmentForConsumerGroup(final StreamsHeartbeatResponseData data, final ConsumerGroupHeartbeatResponseData cgData) {
+        Map<String, TopicPartitions> tps = new HashMap<>();
+        data.activeTasks().forEach(taskId -> Stream.concat(
+                streamsInterface.subtopologyMap().get(taskId.subtopology()).sourceTopics.stream(),
+                streamsInterface.subtopologyMap().get(taskId.subtopology()).repartitionSourceTopics.keySet().stream()
+            )
+            .forEach(topic -> {
+                final TopicPartitions toInsert = tps.computeIfAbsent(topic, k -> {
+                    final Optional<Uuid> uuid = findTopicIdInGlobalOrLocalCache(topic);
+                    if (uuid.isPresent()) {
+                        TopicPartitions t =
+                            new TopicPartitions();
+                        t.setTopicId(uuid.get());
+                        return t;
+                    } else {
+                        return null;
+                    }
+                });
+                if (toInsert != null) {
+                    toInsert.partitions().addAll(taskId.partitions());
+                }
+            }));
+        ConsumerGroupHeartbeatResponseData.Assignment cgAssignment = new ConsumerGroupHeartbeatResponseData.Assignment();
+        cgAssignment.setTopicPartitions(new ArrayList<>(tps.values()));
+        cgData.setAssignment(cgAssignment);
+    }
+
+    private void setTargetAssignment(final StreamsHeartbeatResponseData data) {
+        Assignment targetAssignment = new Assignment();
+        updateTaskIdCollection(data.activeTasks(), targetAssignment.activeTasks);
+        updateTaskIdCollection(data.standbyTasks(), targetAssignment.standbyTasks);
+        updateTaskIdCollection(data.warmupTasks(), targetAssignment.warmupTasks);
+        streamsInterface.targetAssignment.set(targetAssignment);
+    }
+
+    private static Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> convertHostInfoMap(
+        final StreamsHeartbeatResponseData data) {
+        Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
+        data.partitionsByHost().forEach(hostInfo -> {
+            List<TopicPartition> topicPartitions = hostInfo.partitions().stream()
+                .flatMap(partition ->
+                    partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId)))
+                .collect(Collectors.toList());
+            partitionsByHost.put(new StreamsAssignmentInterface.HostInfo(hostInfo.host(), hostInfo.port()), topicPartitions);
+        });
+        return partitionsByHost;
+    }
+
+    private void updateTaskIdCollection(
+        final List<StreamsHeartbeatResponseData.TaskId> source,
+        final Set<StreamsAssignmentInterface.TaskId> target
+    ) {
+        target.clear();
+        source.forEach(taskId -> {
+            taskId.partitions().forEach(partition -> {
+                target.add(new StreamsAssignmentInterface.TaskId(taskId.subtopology(), partition));
+            });
+        });
+    }
+
+    private void onErrorResponse(final StreamsHeartbeatResponse response,
+                                 final long currentTimeMs) {
+        Errors error = Errors.forCode(response.data().errorCode());
+        String errorMessage = response.data().errorMessage();
+        String message;
+
+        this.heartbeatState.reset();
+        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
+        membershipManager.onHeartbeatFailure();
+
+        switch (error) {
+            case NOT_COORDINATOR:
+                // the manager should retry immediately when the coordinator node becomes available again
+                message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is incorrect. " +
+                        "Will attempt to find the coordinator again and retry",
+                    coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
+                heartbeatRequestState.reset();
+                break;
+
+            case COORDINATOR_NOT_AVAILABLE:
+                message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is not available. " +
+                        "Will attempt to find the coordinator again and retry",
+                    coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
+                // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered
+                heartbeatRequestState.reset();
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // the manager will backoff and retry
+                message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is still loading. " +
+                        "Will retry",
+                    coordinatorRequestManager.coordinator());
+                logInfo(message, response, currentTimeMs);
+                break;
+
+            case GROUP_AUTHORIZATION_FAILED:
+                GroupAuthorizationException exception =
+                    GroupAuthorizationException.forGroupId(membershipManager.groupId());
+                logger.error("StreamsHeartbeatRequest failed due to group authorization failure: {}", exception.getMessage());
+                handleFatalFailure(error.exception(exception.getMessage()));
+                break;
+
+            case UNRELEASED_INSTANCE_ID:
+                logger.error("StreamsHeartbeatRequest failed because instance id {} was not released: {}",
+                    membershipManager.groupInstanceId().orElse("null"), errorMessage);
+                handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
+                break;
+
+            case INVALID_REQUEST:
+            case GROUP_MAX_SIZE_REACHED:
+            case UNSUPPORTED_ASSIGNOR:
+            case UNSUPPORTED_VERSION:
+                logger.error("StreamsHeartbeatRequest failed due to error: {}", error);
+                handleFatalFailure(error.exception(errorMessage));
+                break;
+
+            case FENCED_MEMBER_EPOCH:
+                message = String.format("StreamsHeartbeatRequest failed for member %s because epoch %s is fenced.",
+                    membershipManager.memberId(), membershipManager.memberEpoch());
+                logInfo(message, response, currentTimeMs);
+                membershipManager.transitionToFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
+                heartbeatRequestState.reset();
+                break;
+
+            case UNKNOWN_MEMBER_ID:
+                message = String.format("StreamsHeartbeatRequest failed because member %s is unknown.",
+                    membershipManager.memberId());
+                logInfo(message, response, currentTimeMs);
+                membershipManager.transitionToFenced();
+                // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment
+                heartbeatRequestState.reset();
+                break;
+
+            default:
+                // If the manager receives an unknown error - there could be a bug in the code or a new error code
+                logger.error("StreamsHeartbeatRequest failed due to unexpected error: {}", error);
+                handleFatalFailure(error.exception(errorMessage));
+                break;
+        }
+    }
+
+    private void logInfo(final String message,
+                         final StreamsHeartbeatResponse response,
+                         final long currentTimeMs) {
+        logger.info("{} in {}ms: {}",
+            message,
+            heartbeatRequestState.remainingBackoffMs(currentTimeMs),
+            response.data().errorMessage());
+    }
+
+    private void handleFatalFailure(Throwable error) {
+        backgroundEventHandler.add(new ErrorEvent(error));
+        membershipManager.transitionToFatal();
+    }
+
+    /**
+     * Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends
+     * {@link RequestState} to enable exponential backoff and duplicated request handling. The two fields that it holds are:
+     */
+    static class HeartbeatRequestState extends RequestState {
+
+        /**
+         * heartbeatTimer tracks the time since the last heartbeat was sent
+         */
+        private final Timer heartbeatTimer;
+
+        /**
+         * The heartbeat interval which is acquired/updated through the heartbeat request
+         */
+        private long heartbeatIntervalMs;
+
+        public HeartbeatRequestState(
+            final LogContext logContext,
+            final Time time,
+            final long heartbeatIntervalMs,
+            final long retryBackoffMs,
+            final long retryBackoffMaxMs,
+            final double jitter) {
+            super(logContext, StreamsHeartbeatRequestManager.HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs,
+                jitter);
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
+        }
+
+        private void update(final long currentTimeMs) {
+            this.heartbeatTimer.update(currentTimeMs);
+        }
+
+        public void resetTimer() {
+            this.heartbeatTimer.reset(heartbeatIntervalMs);
+        }
+
+        @Override
+        public boolean canSendRequest(final long currentTimeMs) {
+            update(currentTimeMs);
+            return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
+        }
+
+        public long nextHeartbeatMs(final long currentTimeMs) {
+            if (heartbeatTimer.remainingMs() == 0) {
+                return this.remainingBackoffMs(currentTimeMs);
+            }
+            return heartbeatTimer.remainingMs();
+        }
+
+        private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) {
+            if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
+                // no need to update the timer if the interval hasn't changed
+                return;
+            }
+            this.heartbeatIntervalMs = heartbeatIntervalMs;
+            this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
+        }
+    }
+
+    private Optional<Uuid> findTopicIdInGlobalOrLocalCache(String topicName) {
+        Uuid idFromMetadataCache = metadata.topicIds().getOrDefault(topicName, null);
+        if (idFromMetadataCache != null) {
+            // Add topic name to local cache, so it can be reused if included in a next target
+            // assignment if metadata cache not available.
+            assignedTopicIdCache.put(topicName, idFromMetadataCache);
+            return Optional.of(idFromMetadataCache);
+        } else {
+            Uuid idFromLocalCache = assignedTopicIdCache.getOrDefault(topicName, null);
+            return Optional.ofNullable(idFromLocalCache);
+        }
+    }
+
+    static class HeartbeatState {
+
+        private final MembershipManager membershipManager;
+        private final int rebalanceTimeoutMs;
+        private final StreamsHeartbeatRequestManager.HeartbeatState.SentFields sentFields;
+
+        /*
+         * The streams assignment interface holds the metadata for the streams group
+         */
+        private final StreamsAssignmentInterface streamsInterface;
+
+        public HeartbeatState(
+            final StreamsAssignmentInterface streamsInterface,
+            final MembershipManager membershipManager,
+            final int rebalanceTimeoutMs) {
+            this.membershipManager = membershipManager;
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            this.sentFields = new StreamsHeartbeatRequestManager.HeartbeatState.SentFields();
+            this.streamsInterface = streamsInterface;
+        }
+
+        public void reset() {
+            sentFields.reset();
+        }
+
+        public StreamsHeartbeatRequestData buildRequestData() {
+            StreamsHeartbeatRequestData data = new StreamsHeartbeatRequestData();
+
+            // GroupId - always sent
+            data.setGroupId(membershipManager.groupId());
+
+            // MemberId - always sent, empty until it has been received from the coordinator
+            data.setMemberId(membershipManager.memberId());
+
+            // MemberEpoch - always sent
+            data.setMemberEpoch(membershipManager.memberEpoch());
+
+            // InstanceId - set if present
+            membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
+
+            boolean joining = membershipManager.state() == MemberState.JOINING;
+
+            // RebalanceTimeoutMs - only sent when joining or if it has changed since the last heartbeat
+            if (joining || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
+                data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
+                sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            }
+
+            // Immutable -- only sent when joining
+            if (joining) {
+                data.setProcessId(streamsInterface.processID().toString());
+                HostInfo hostInfo = new HostInfo();
+                hostInfo.setHost(streamsInterface.endPoint().host);
+                hostInfo.setPort(streamsInterface.endPoint().port);
+                data.setHostInfo(hostInfo);
+                data.setAssignor(streamsInterface.assignor());
+                data.setClientTags(streamsInterface.clientTags().entrySet().stream().map(entry -> {
+                    StreamsHeartbeatRequestData.KeyValue tag = new StreamsHeartbeatRequestData.KeyValue();
+                    tag.setKey(entry.getKey());
+                    tag.setValue(entry.getValue());
+                    return tag;
+                }).collect(Collectors.toList()));
+                data.setAssignmentConfigs(streamsInterface.assignmentConfiguration().entrySet().stream().map(entry -> {
+                    StreamsHeartbeatRequestData.KeyValue config = new StreamsHeartbeatRequestData.KeyValue();
+                    config.setKey(entry.getKey());
+                    config.setValue(entry.getValue().toString());
+                    return config;
+                }).collect(Collectors.toList()));
+            } else {
+                // FIXME: This seems to be a bug in the RPC serialization code. The default should be null, but it's empty.
+                data.setAssignor(null);
+            }
+
+            if (streamsInterface.shutdownRequested()) {
+                data.setShutdownApplication(true);
+            }
+
+            Assignment reconciledAssignment = streamsInterface.reconciledAssignment.get();
+
+            if (reconciledAssignment != null) {
+                if (!reconciledAssignment.equals(sentFields.assignment)) {
+                    data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks));
+                    data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks));
+                    data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks));
+                    sentFields.assignment = reconciledAssignment;
+                }
+            }
+
+            return data;
+        }
+
+        private List<TaskId> convertTaskIdCollection(final Set<StreamsAssignmentInterface.TaskId> tasks) {
+            return tasks.stream()
+                .collect(
+                    Collectors.groupingBy(StreamsAssignmentInterface.TaskId::subtopologyId,
+                        Collectors.mapping(StreamsAssignmentInterface.TaskId::partitionId, Collectors.toList()))
+                )
+                .entrySet()
+                .stream()
+                .map(entry -> {
+                    TaskId id = new TaskId();
+                    id.setSubtopology(entry.getKey());
+                    id.setPartitions(entry.getValue());
+                    return id;
+                })
+                .collect(Collectors.toList());
+        }
+
+        // Fields of StreamsHeartbeatRequest sent in the most recent request
+        static class SentFields {
+
+            private int rebalanceTimeoutMs = -1;
+            private Assignment assignment = null;
+
+            SentFields() {
+            }
+
+            void reset() {
+                rebalanceTimeoutMs = -1;
+                assignment = null;
+            }
+        }
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java
index 7936e885b56..b47572637bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java
@@ -28,7 +28,7 @@ public class StreamsInstallAssignmentRequestManager implements RequestManager {
 
     @Override
     public PollResult poll(final long currentTimeMs) {
-        return null;
+        return PollResult.EMPTY;
     }
 
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java
index 069661f4893..fb4f59cdf96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java
@@ -28,7 +28,7 @@ public class StreamsPrepareAssignmentRequestManager implements RequestManager {
 
     @Override
     public PollResult poll(final long currentTimeMs) {
-        return null;
+        return PollResult.EMPTY;
     }
 
     public void prepareAssignment() {
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java
similarity index 86%
rename from clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java
rename to clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java
index ffd0b63d8e3..204ba1e2186 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.common.errors;
 
-public class StreamsMissingInternalTopicsException extends ApiException {
-    public StreamsMissingInternalTopicsException(String message) {
+public class StreamsGroupUninitializedException extends ApiException {
+    public StreamsGroupUninitializedException(String message) {
         super(message);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java
similarity index 86%
rename from clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java
rename to clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java
index dd6509f90f0..8fae67b6b4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.common.errors;
 
-public class StreamsInvalidAssignment extends ApiException {
-    public StreamsInvalidAssignment(String message) {
+public class StreamsInvalidAssignmentException extends ApiException {
+    public StreamsInvalidAssignmentException(String message) {
         super(message);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 9790d87863a..f9f063051b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.StreamsInconsistentTopologyException;
-import org.apache.kafka.common.errors.StreamsInvalidAssignment;
+import org.apache.kafka.common.errors.StreamsInvalidAssignmentException;
 import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
-import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsGroupUninitializedException;
 import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
 import org.apache.kafka.common.errors.StreamsShutdownApplicationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -428,8 +428,10 @@ public enum Errors {
         StreamsMissingSourceTopicsException::new),
     STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are missing.",
         StreamsMissingInternalTopicsException::new),
-    STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the whole application.",
-        StreamsShutdownApplicationException::new);
+    STREAMS_GROUP_UNINITIALIZED(134, "The group is not (fully) initialized, broker-side topology information or internal topics are missing.",
+            StreamsGroupUninitializedException::new),
+    STREAMS_SHUTDOWN_APPLICATION(135, "A client requested the shutdown of the whole application.",
+            StreamsShutdownApplicationException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
index 1e8c7a5b7e9..835fec5c8ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
@@ -41,7 +41,7 @@ import java.util.Map;
  * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION}
  * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY}
  * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS}
- * - {@link Errors#STREAMS_MISSING_INTERNAL_TOPICS}
+ * - {@link Errors#STREAMS_GROUP_UNINITIALIZED}
  */
 public class StreamsHeartbeatResponse extends AbstractResponse {
     private final StreamsHeartbeatResponseData data;
@@ -71,6 +71,18 @@ public class StreamsHeartbeatResponse extends AbstractResponse {
         data.setThrottleTimeMs(throttleTimeMs);
     }
 
+    /**
+     * Possible error codes.
+     *
+     * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION}
+     * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY}
+     * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS}
+     * - {@link Errors#STREAMS_GROUP_UNINITIALIZED}
+     */
+    public Errors assignmentError() {
+        return Errors.forCode(data.assignmentErrorCode());
+    }
+
     public static StreamsHeartbeatResponse parse(ByteBuffer buffer, short version) {
         return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData(
             new ByteBufferAccessor(buffer), version));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
index 04072dd8565..83761d805de 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
@@ -1,9 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.StreamsInitializeRequestData;
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
index 7b7d19961de..81c281aba98 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.StreamsInitializeResponseData;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
index d793101c537..7063ec61676 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java
@@ -1,10 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
 import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
-import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
index 3a09420f796..cc389760577 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
-import org.apache.kafka.common.message.StreamsInitializeResponseData;
 import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
index 3c83273bf4f..adeef79393b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java
@@ -1,10 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
 import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
-import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
index 99997022783..f0c478ec81b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java
@@ -1,10 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
-import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
 import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
index c97d0b912ff..cd8df8c477e 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -34,11 +34,11 @@
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
       "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
 
-    // If the topology hash does not match the server's topology hash, return 
INVALID_TOPOLOGY and send ShouldInitializeTopology to one consumer
     { "name": "TopologyHash", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": null,
       "about": "The hash of the topology. Only sent when memberEpoch = 0. Null 
otherwise. Used to check if topology corresponds to server-topology. " },
+    { "name": "Assignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": null,
+      "about": "The desired assignor. Picked by majority vote among clients. 
If null, the client votes for the broker to pick the assignor." },
 
-    // The coordinator knows which partitions to fetch from given this 
information
     { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
     { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
@@ -80,8 +80,6 @@
   ],
   "commonStructs": [
     { "name": "TaskId", "versions": "0+", "fields": [
-      // We are using strings here, to avoid fixing a structure on task IDs which will
-      // allow introducing stable naming for subtopologies in the future.
       { "name": "Subtopology", "type": "string", "versions": "0+",
         "about": "subtopology ID" },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
index 272a5a4cd51..513e1d78911 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
@@ -30,10 +30,6 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
-  // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+)
-  // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
-  // - STREAMS_MISSING_INTERNAL_TOPICS (version 0+)
-  // - STREAMS_SHUTDOWN_APPLICATION (version 0+)
   "fields": [
     // Same as consumer group heart beat
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -55,6 +51,16 @@
     { "name": "ShouldInitializeTopology", "type":  "bool", "versions":  "0+", 
"default": false,
       "about": "true if this streams application should initialize the 
topology on the broker" },
 
+    // Possible errors:
+    // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+)
+    // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
+    // - STREAMS_SHUTDOWN_APPLICATION (version 0+)
+    // - STREAMS_GROUP_UNITIALIZED (version 0+)
+    { "name": "AssignmentErrorCode", "type":  "int16", "versions":  "0+", 
"default": 0,
+      "about": "An error code relating to the current state of the assignment 
to this member." },
+    { "name": "AssignmentErrorMessage", "type":  "string", "versions":  "0+", 
"default": "",
+      "about": "An error message relating to the current state of the 
assignment to this member." },
+
     // The streams app knows which partitions to fetch from given this 
information
     { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Local assignment for this consumer. Null if unchanged since 
last heartbeat." },
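[Editor's note] Moving these four errors out of the top-level ErrorCode into a dedicated AssignmentErrorCode lets the coordinator report assignment-level problems without failing the heartbeat as a whole. A hedged sketch of the client-side check (the response and log variables are hypothetical locals; Errors.forCode is the existing lookup in org.apache.kafka.common.protocol.Errors):

    // Sketch only: the top-level error is NONE, but the assignment for this
    // member may still carry an error such as STREAMS_MISSING_SOURCE_TOPICS.
    final short assignmentError = response.data().assignmentErrorCode();
    if (assignmentError != Errors.NONE.code()) {
        log.warn("Assignment error {} for this member: {}",
            Errors.forCode(assignmentError),
            response.data().assignmentErrorMessage());
    }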
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
new file mode 100644
index 00000000000..17b99c9c72f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.HostInfo;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Subtopology;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.TaskId;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.TopicInfo;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.StreamsHeartbeatRequest;
+import org.apache.kafka.common.requests.StreamsHeartbeatResponse;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class StreamsHeartbeatRequestManagerTest {
+
+    public static final String TEST_GROUP_ID = "testGroupId";
+    public static final String TEST_MEMBER_ID = "testMemberId";
+    public static final int TEST_MEMBER_EPOCH = 5;
+    public static final String TEST_INSTANCE_ID = "instanceId";
+    public static final int TEST_THROTTLE_TIME_MS = 5;
+    private StreamsHeartbeatRequestManager heartbeatRequestManager;
+
+    private Time time;
+
+    private StreamsAssignmentInterface streamsAssignmentInterface;
+
+    private ConsumerConfig config;
+
+    @Mock
+    private CoordinatorRequestManager coordinatorRequestManager;
+
+    @Mock
+    private StreamsInitializeRequestManager streamsInitializeRequestManager;
+
+    @Mock
+    private StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager;
+
+    @Mock
+    private MembershipManager membershipManager;
+
+    @Mock
+    private BackgroundEventHandler backgroundEventHandler;
+
+    @Mock
+    private Metrics metrics;
+
+    @Mock
+    private ConsumerMetadata metadata;
+
+    // Static data for testing
+    private final UUID processID = new UUID(1, 1);
+
+    private final HostInfo endPoint = new HostInfo("localhost", 9092);
+
+    private final String assignor = "test";
+
+    private final Map<String, Subtopology> subtopologyMap = new HashMap<>();
+
+    private final Map<String, Object> assignmentConfiguration = new HashMap<>();
+
+    private final Map<String, String> clientTags = new HashMap<>();
+
+    private final Node coordinatorNode = new Node(1, "localhost", 9092);
+
+    @BeforeEach
+    void setUp() {
+        config = config();
+
+        subtopologyMap.clear();
+        assignmentConfiguration.clear();
+        clientTags.clear();
+        streamsAssignmentInterface =
+            new StreamsAssignmentInterface(
+                processID,
+                endPoint,
+                assignor,
+                subtopologyMap,
+                assignmentConfiguration,
+                clientTags
+            );
+        LogContext logContext = new LogContext("test");
+        time = new MockTime();
+
+        MockitoAnnotations.openMocks(this);
+        when(metrics.sensor(anyString())).thenReturn(mock(Sensor.class));
+        heartbeatRequestManager = new StreamsHeartbeatRequestManager(
+            logContext,
+            time,
+            config,
+            coordinatorRequestManager,
+            streamsInitializeRequestManager,
+            streamsPrepareAssignmentRequestManager,
+            membershipManager,
+            backgroundEventHandler,
+            metrics,
+            streamsAssignmentInterface,
+            metadata
+        );
+
+        when(membershipManager.groupId()).thenReturn(TEST_GROUP_ID);
+        when(membershipManager.memberId()).thenReturn(TEST_MEMBER_ID);
+        when(membershipManager.memberEpoch()).thenReturn(TEST_MEMBER_EPOCH);
+        when(membershipManager.groupInstanceId()).thenReturn(Optional.of(TEST_INSTANCE_ID));
+    }
+
+    @Test
+    void testNoHeartbeatIfCoordinatorUnknown() {
+        when(membershipManager.shouldHeartbeatNow()).thenReturn(true);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.unsentRequests.size());
+        verify(membershipManager).onHeartbeatRequestSkipped();
+    }
+
+    @Test
+    void testNoHeartbeatIfHeartbeatSkipped() {
+        when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(0, result.unsentRequests.size());
+        verify(membershipManager).onHeartbeatRequestSkipped();
+    }
+
+    @Test
+    void testHeartbeatWhenCoordinatorKnown() {
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
+
+        StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build();
+
+        assertEquals(TEST_GROUP_ID, request.data().groupId());
+        assertEquals(TEST_MEMBER_ID, request.data().memberId());
+        assertEquals(TEST_MEMBER_EPOCH, request.data().memberEpoch());
+        assertEquals(TEST_INSTANCE_ID, request.data().instanceId());
+
+        // Static information is null
+        assertNull(request.data().processId());
+        assertNull(request.data().hostInfo());
+        assertNull(request.data().assignor());
+        assertNull(request.data().assignmentConfigs());
+        assertNull(request.data().clientTags());
+    }
+
+    @Test
+    void testFullStaticInformationWhenJoining() {
+        mockJoiningState();
+        assignmentConfiguration.put("config1", "value1");
+        clientTags.put("clientTag1", "value2");
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
+
+        StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build();
+
+        assertEquals(processID.toString(), request.data().processId());
+        assertEquals(endPoint.host, request.data().hostInfo().host());
+        assertEquals(endPoint.port, request.data().hostInfo().port());
+        assertEquals(assignor, request.data().assignor());
+        assertEquals(1, request.data().assignmentConfigs().size());
+        assertEquals("config1", 
request.data().assignmentConfigs().get(0).key());
+        assertEquals("value1", 
request.data().assignmentConfigs().get(0).value());
+        assertEquals(1, request.data().clientTags().size());
+        assertEquals("clientTag1", request.data().clientTags().get(0).key());
+        assertEquals("value2", request.data().clientTags().get(0).value());
+    }
+
+    @Test
+    void testShutdownRequested() {
+        mockJoiningState();
+        streamsAssignmentInterface.requestShutdown();
+
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node());
+
+        StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build();
+
+        assertTrue(request.data().shutdownApplication());
+    }
+
+    @Test
+    void testSuccessfulResponse() {
+        mockJoiningState();
+
+        final Uuid uuid0 = Uuid.randomUuid();
+        final Uuid uuid1 = Uuid.randomUuid();
+
+        final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), Collections.emptyMap());
+
+        when(metadata.topicIds()).thenReturn(
+            mkMap(
+                mkEntry("source0", uuid0),
+                mkEntry("repartition0", uuid1)
+            ));
+
+        streamsAssignmentInterface.subtopologyMap().put("0",
+            new Subtopology(
+                Collections.singleton("source0"),
+                Collections.singleton("sink0"),
+                Collections.singletonMap("repartition0", emptyTopicInfo),
+                Collections.singletonMap("changelog0", emptyTopicInfo)
+            ));
+        streamsAssignmentInterface.subtopologyMap().put("1",
+            new Subtopology(
+                Collections.singleton("source1"),
+                Collections.singleton("sink1"),
+                Collections.singletonMap("repartition1", emptyTopicInfo),
+                Collections.singletonMap("changelog1", emptyTopicInfo)
+            ));
+        streamsAssignmentInterface.subtopologyMap().put("2",
+            new Subtopology(
+                Collections.singleton("source2"),
+                Collections.singleton("sink2"),
+                Collections.singletonMap("repartition2", emptyTopicInfo),
+                Collections.singletonMap("changelog2", emptyTopicInfo)
+            ));
+
+        StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setMemberId(TEST_MEMBER_ID)
+            .setMemberEpoch(TEST_MEMBER_EPOCH)
+            .setThrottleTimeMs(TEST_THROTTLE_TIME_MS)
+            .setHeartbeatIntervalMs(1000)
+            .setActiveTasks(Collections.singletonList(
+                new StreamsHeartbeatResponseData.TaskId().setSubtopology("0").setPartitions(Collections.singletonList(0))))
+            .setStandbyTasks(Collections.singletonList(
+                new StreamsHeartbeatResponseData.TaskId().setSubtopology("1").setPartitions(Collections.singletonList(1))))
+            .setWarmupTasks(Collections.singletonList(
+                new StreamsHeartbeatResponseData.TaskId().setSubtopology("2").setPartitions(Collections.singletonList(2))));
+
+        mockResponse(data);
+
+        ArgumentCaptor<ConsumerGroupHeartbeatResponseData> captor = ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponseData.class);
+        verify(membershipManager, times(1)).onHeartbeatSuccess(captor.capture());
+        ConsumerGroupHeartbeatResponseData response = captor.getValue();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(TEST_MEMBER_ID, response.memberId());
+        assertEquals(TEST_MEMBER_EPOCH, response.memberEpoch());
+        assertEquals(TEST_THROTTLE_TIME_MS, response.throttleTimeMs());
+        assertEquals(1000, response.heartbeatIntervalMs());
+        final List<TopicPartitions> tps = response.assignment().topicPartitions();
+        assertEquals(2, tps.size());
+        assertEquals(mkSet(uuid0, uuid1), tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet()));
+        assertEquals(Collections.singletonList(0), tps.get(0).partitions());
+        assertEquals(Collections.singletonList(0), tps.get(1).partitions());
+
+        final Assignment targetAssignment = streamsAssignmentInterface.targetAssignment.get();
+        assertEquals(1, targetAssignment.activeTasks.size());
+        final TaskId activeTaskId = targetAssignment.activeTasks.stream().findFirst().get();
+        assertEquals("0", activeTaskId.subtopologyId);
+        assertEquals(0, activeTaskId.partitionId);
+
+        assertEquals(1, targetAssignment.standbyTasks.size());
+        final TaskId standbyTaskId = targetAssignment.standbyTasks.stream().findFirst().get();
+        assertEquals("1", standbyTaskId.subtopologyId);
+        assertEquals(1, standbyTaskId.partitionId);
+
+        assertEquals(1, targetAssignment.warmupTasks.size());
+        final TaskId warmupTaskId = targetAssignment.warmupTasks.stream().findFirst().get();
+        assertEquals("2", warmupTaskId.subtopologyId);
+        assertEquals(2, warmupTaskId.partitionId);
+    }
+
+    @Test
+    void testPrepareAssignment() {
+        mockJoiningState();
+
+        StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setThrottleTimeMs(0)
+            .setMemberEpoch(TEST_MEMBER_EPOCH)
+            .setShouldComputeAssignment(true);
+
+        mockResponse(data);
+
+        verify(streamsPrepareAssignmentRequestManager).prepareAssignment();
+    }
+
+    @Test
+    void testInitializeTopology() {
+        mockJoiningState();
+
+        StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setThrottleTimeMs(0)
+            .setMemberEpoch(TEST_MEMBER_EPOCH)
+            .setShouldInitializeTopology(true);
+
+        mockResponse(data);
+
+        verify(streamsInitializeRequestManager).initialize();
+    }
+
+    private void mockResponse(final StreamsHeartbeatResponseData data) {
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+
+        assertEquals(1, result.unsentRequests.size());
+        final UnsentRequest unsentRequest = result.unsentRequests.get(0);
+        assertEquals(Optional.of(coordinatorNode), unsentRequest.node());
+
+        ClientResponse response = createHeartbeatResponse(unsentRequest, data);
+
+        unsentRequest.handler().onComplete(response);
+    }
+
+    private void mockJoiningState() {
+        when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+    }
+
+    private ClientResponse createHeartbeatResponse(
+        final NetworkClientDelegate.UnsentRequest request,
+        final StreamsHeartbeatResponseData data
+    ) {
+        StreamsHeartbeatResponse response = new StreamsHeartbeatResponse(data);
+        return new ClientResponse(
+            new RequestHeader(ApiKeys.STREAMS_HEARTBEAT, ApiKeys.STREAMS_HEARTBEAT.latestVersion(), "client-id", 1),
+            request.handler(),
+            "0",
+            time.milliseconds(),
+            time.milliseconds(),
+            false,
+            null,
+            null,
+            response);
+    }
+
+    private ConsumerConfig config() {
+        Properties prop = new Properties();
+        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
+        return new ConsumerConfig(prop);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 71ac9982d67..1ec2053b3ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -49,9 +49,9 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
     }
 
     @Override
-    public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) {
-        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
-        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
+    public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, final StreamsAssignmentInterface assignmentInterface) {
+        final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
+        final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
         return new AsyncKafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(config, keyDeserializer, valueDeserializer)),
             keyDeserializer, valueDeserializer, Optional.of(assignmentInterface));
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7433896fab8..f0a27b913f7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -27,11 +27,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface;
-import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.SubTopology;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment;
+import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Subtopology;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -57,7 +60,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
-import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
 import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -67,6 +69,7 @@ import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.Def
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
+import java.util.List;
 import org.slf4j.Logger;
 
 import java.time.Duration;
@@ -85,12 +88,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId;
 
+@SuppressWarnings("ClassDataAbstractionCoupling")
 public class StreamThread extends Thread implements ProcessingThread {
 
     private static final String THREAD_ID_SUBSTRING = "-StreamThread-";
@@ -347,6 +352,10 @@ public class StreamThread extends Thread implements ProcessingThread {
     // handler for, eg MissingSourceTopicException with named topologies
     private final Queue<StreamsException> nonFatalExceptionsToHandle;
 
+    // These are used only with the Streams Rebalance Protocol client
+    private final StreamsAssignmentInterface streamsAssignmentInterface;
+    private final StreamsMetadataState streamsMetadataState;
+
     // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
@@ -483,6 +492,7 @@ public class StreamThread extends Thread implements ProcessingThread {
         }
 
         final Consumer<byte[], byte[]> mainConsumer;
+        final StreamsAssignmentInterface streamsAssignmentInterface;
         if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) &&
             consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) {
             if (topologyMetadata.hasNamedTopologies()) {
@@ -490,7 +500,7 @@ public class StreamThread extends Thread implements ProcessingThread {
             }
             log.info("Streams rebalance protocol enabled");
 
-            StreamsAssignmentInterface streamsAssignmentInterface =
+            streamsAssignmentInterface =
                 initAssignmentInterface(processId,
                 config,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
@@ -499,6 +509,7 @@ public class StreamThread extends Thread implements ProcessingThread {
             mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, streamsAssignmentInterface);
         } else {
             mainConsumer = clientSupplier.getConsumer(consumerConfigs);
+            streamsAssignmentInterface = null;
         }
 
         taskManager.setMainConsumer(mainConsumer);
@@ -526,8 +537,9 @@ public class StreamThread extends Thread implements ProcessingThread {
             referenceContainer.nonFatalExceptionsToHandle,
             shutdownErrorHook,
             streamsUncaughtExceptionHandler,
-            cache::resize
-        );
+            cache::resize,
+            streamsAssignmentInterface,
+            referenceContainer.streamsMetadataState);
 
         return streamThread.updateThreadMetadata(adminClientId(clientId));
     }
@@ -541,26 +553,27 @@ public class StreamThread extends Thread implements ProcessingThread {
         }
     }
 
-    private static StreamsAssignmentInterface initAssignmentInterface(UUID processId,
-                                                                      StreamsConfig config,
-                                                                      HostInfo hostInfo,
+    private static StreamsAssignmentInterface initAssignmentInterface(final UUID processId,
+                                                                      final StreamsConfig config,
+                                                                      final HostInfo hostInfo,
                                                                       final TopologyMetadata topologyMetadata) {
         final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null);
 
-        Map<String, SubTopology> subtopologyMap = new HashMap<>();
-        for (Map.Entry<Subtopology, TopicsInfo> topicsInfoEntry: internalTopologyBuilder.subtopologyToTopicsInfo().entrySet()) {
+        final Map<String, Subtopology> subtopologyMap = new HashMap<>();
+        for (final Map.Entry<TopologyMetadata.Subtopology, TopicsInfo> topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo()
+            .entrySet()) {
             subtopologyMap.put(
                 String.valueOf(topicsInfoEntry.getKey().nodeGroupId),
-                new SubTopology(
+                new Subtopology(
                     topicsInfoEntry.getValue().sourceTopics,
                     topicsInfoEntry.getValue().sinkTopics,
                     topicsInfoEntry.getValue().repartitionSourceTopics.entrySet()
                         .stream()
-                        .collect(Collectors.toMap(Map.Entry::getKey , e->
+                        .collect(Collectors.toMap(Map.Entry::getKey, e ->
                             new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))),
                     topicsInfoEntry.getValue().stateChangelogTopics.entrySet()
                         .stream()
-                        .collect(Collectors.toMap(Map.Entry::getKey , e->
+                        .collect(Collectors.toMap(Map.Entry::getKey, e ->
                             new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs)))
 
                 )
@@ -569,22 +582,28 @@ public class StreamThread extends Thread implements ProcessingThread {
 
         // TODO: Which of these are actually needed?
         // TODO: Maybe we want to split this into assignment properties and internal topic configuration properties
-        HashMap<String, Object> assignmentProperties = new HashMap<>();
+        final HashMap<String, Object> assignmentProperties = new HashMap<>();
         assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG));
         assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG));
         assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
-        assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG,
+            config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG));
         assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG));
-        assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
-        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
+            config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+            config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+            config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+            config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
+        assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+            config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
 
         return new StreamsAssignmentInterface(
             processId,
-            hostInfo.host(),
-            hostInfo.port(),
+            new StreamsAssignmentInterface.HostInfo(hostInfo.host(), hostInfo.port()),
+            null,
             subtopologyMap,
             assignmentProperties,
             config.getClientTags()
@@ -662,8 +681,9 @@ public class StreamThread extends Thread implements ProcessingThread {
                         final Queue<StreamsException> nonFatalExceptionsToHandle,
                         final Runnable shutdownErrorHook,
                         final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler,
-                        final java.util.function.Consumer<Long> cacheResizer
-                        ) {
+                        final java.util.function.Consumer<Long> cacheResizer,
+                        final StreamsAssignmentInterface streamsAssignmentInterface,
+                        final StreamsMetadataState streamsMetadataState) {
         super(threadId);
         this.stateLock = new Object();
         this.adminClient = adminClient;
@@ -684,6 +704,8 @@ public class StreamThread extends Thread implements ProcessingThread {
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
+        this.streamsAssignmentInterface = streamsAssignmentInterface;
+        this.streamsMetadataState = streamsMetadataState;
 
         // The following sensors are created here but their references are not stored in this object, since within
         // this object they are not recorded. The sensors are created here so that the stream threads starts with all
@@ -1350,6 +1372,86 @@ public class StreamThread extends Thread implements ProcessingThread {
         return pollLatency;
     }
 
+    public void maybeHandleAssignmentFromStreamsRebalanceProtocol() {
+        if (streamsAssignmentInterface != null) {
+
+            if (streamsAssignmentInterface.shutdownRequested()) {
+                assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
+            }
+
+            // Process metadata from Streams Rebalance Protocol
+            final Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> hostInfoListMap = streamsAssignmentInterface.partitionsByHost.get();
+            final Map<HostInfo, Set<TopicPartition>> convertedHostInfoMap = new HashMap<>();
+            hostInfoListMap.forEach((hostInfo, topicPartitions) ->
+                convertedHostInfoMap.put(new HostInfo(hostInfo.host, hostInfo.port), new HashSet<>(topicPartitions)));
+            streamsMetadataState.onChange(
+                convertedHostInfoMap,
+                Collections.emptyMap(), // TODO: We cannot differentiate between standby and active tasks here?!
+                getTopicPartitionInfo(convertedHostInfoMap)
+            );
+
+            // Process assignment from Streams Rebalance Protocol
+            final Assignment newAssignment = streamsAssignmentInterface.targetAssignment.get();
+            if (!newAssignment.equals(streamsAssignmentInterface.reconciledAssignment.get())) {
+
+                final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions =
+                    pairWithTopicPartitions(newAssignment.activeTasks.stream());
+                final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions =
+                    pairWithTopicPartitions(Stream.concat(newAssignment.standbyTasks.stream(), newAssignment.warmupTasks.stream()));
+
+                log.info("Processing new assignment {} from Streams Rebalance Protocol", newAssignment);
+
+                taskManager.handleAssignment(
+                    activeTasksWithPartitions,
+                    standbyTasksWithPartitions
+                );
+            }
+        }
+    }
+
+    static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
+        final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+        for (final Set<TopicPartition> value : partitionsByHost.values()) {
+            for (final TopicPartition topicPartition : value) {
+                topicToPartitionInfo.put(
+                    topicPartition,
+                    new PartitionInfo(
+                        topicPartition.topic(),
+                        topicPartition.partition(),
+                        null,
+                        new Node[0],
+                        new Node[0]
+                    )
+                );
+            }
+        }
+        return topicToPartitionInfo;
+    }
+
+    private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(final Stream<StreamsAssignmentInterface.TaskId> taskIdStream) {
+        return taskIdStream
+            .collect(Collectors.toMap(
+                this::toTaskId,
+                task -> toTopicPartitions(task, streamsAssignmentInterface.subtopologyMap().get(task.subtopologyId))
+            ));
+    }
+
+    private TaskId toTaskId(final StreamsAssignmentInterface.TaskId task) {
+        return new TaskId(Integer.parseInt(task.subtopologyId), task.partitionId);
+    }
+
+    private Set<TopicPartition> toTopicPartitions(final StreamsAssignmentInterface.TaskId task,
+                                                  final Subtopology subTopology) {
+        return
+            Stream.concat(
+                subTopology.sourceTopics.stream(),
+                subTopology.repartitionSourceTopics.keySet().stream()
+            )
+            .map(t -> new TopicPartition(t, task.partitionId))
+            .collect(Collectors.toSet());
+    }
+
     /**
      * Get the next batch of records by polling.
      *
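[Editor's note] To make the task translation above concrete: toTopicPartitions expands a task over the union of its subtopology's source topics and repartition source topics, all at the task's partition id, and warmup tasks are folded into the standby set before taskManager.handleAssignment is invoked. A minimal sketch with hypothetical topic names:

    // Sketch only: task 2 of a subtopology reading "source0" plus the
    // repartition topic "rep0" resolves to {source0-2, rep0-2}.
    final Set<TopicPartition> partitions = Stream.of("source0", "rep0")
        .map(topic -> new TopicPartition(topic, 2))
        .collect(Collectors.toSet());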
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
index 9ee34d8398b..49428724b1a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
@@ -51,6 +51,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+
+        streamThread.maybeHandleAssignmentFromStreamsRebalanceProtocol();
+
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8a3e6c17d8a..9efb5780732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1447,8 +1447,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+            null,
+            null,
+            null).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         final StreamsException thrown = assertThrows(StreamsException.class, thread::run);
 
@@ -2673,8 +2674,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(State.PENDING_SHUTDOWN);
@@ -2731,8 +2733,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(State.PENDING_SHUTDOWN);
@@ -2798,8 +2801,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(State.PENDING_SHUTDOWN);
@@ -2861,8 +2865,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(State.PENDING_SHUTDOWN);
@@ -2921,8 +2926,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(State.PENDING_SHUTDOWN);
@@ -3154,8 +3160,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        );
+            null,
+            null,
+            null);
         final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
         final Metric testMetric = new KafkaMetric(
             new Object(),
@@ -3210,8 +3217,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             (e, b) -> { },
-            null
-        ) {
+            null,
+            null,
+            null) {
             @Override
             void runOnceWithProcessingThreads() {
                 setState(StreamThread.State.PENDING_SHUTDOWN);
@@ -3589,8 +3597,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             null,
-            null
-        );
+            null,
+            null,
+            null);
     }
 
     private TaskManager mockTaskManager(final Task runningTask) {
@@ -3710,8 +3719,9 @@ public class StreamThreadTest {
             new LinkedList<>(),
             null,
             HANDLER,
-            null
-        );
+            null,
+            null,
+            null);
     }
     
     private void runOnce(final boolean processingThreadsEnabled) {
