This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7a9105b93fc6bfc829e52dfd24eb5ea1302c99b6
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon Sep 2 11:18:19 2024 +0200

    Improve the Streams group initialization handler
    
    This commit:
    - schedules a timeout for the initialization call
    - requests the initialization from only one member
    - initializations of unknown topologies or already initialized topologies
    are silently dropped
---
 .../internals/StreamsAssignmentInterface.java      |   3 +
 .../StreamsGroupHeartbeatRequestManager.java       |   4 +
 .../StreamsGroupInitializeRequestManager.java      |   3 +-
 .../message/StreamsGroupInitializeRequest.json     |   4 +-
 .../StreamsGroupInitializeRequestManagerTest.java  |   2 +-
 .../common/runtime/CoordinatorRuntime.java         |   5 +
 .../common/runtime/CoordinatorTimer.java           |   9 +
 .../common/runtime/MockCoordinatorTimer.java       |   3 +-
 .../coordinator/group/GroupMetadataManager.java    | 141 ++++-
 .../streams/CoordinatorStreamsRecordHelpers.java   |  14 +-
 .../coordinator/group/streams/StreamsGroup.java    |  54 +-
 .../coordinator/group/streams/StreamsTopology.java |  38 +-
 .../group/streams/TargetAssignmentBuilder.java     |   6 +-
 .../group/GroupCoordinatorShardTest.java           |  10 +-
 .../group/GroupMetadataManagerTest.java            | 590 ++++++++++++++++-----
 .../group/GroupMetadataManagerTestContext.java     |   9 +
 .../CoordinatorStreamsRecordHelpersTest.java       |   2 +-
 .../group/streams/StreamsGroupBuilder.java         |   6 +-
 .../group/streams/StreamsGroupTest.java            |  31 +-
 .../group/streams/TargetAssignmentBuilderTest.java |  46 +-
 20 files changed, 743 insertions(+), 237 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 5793ffec2aa..567fcb5f776 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -73,6 +73,9 @@ public class StreamsAssignmentInterface {
         return assignmentConfiguration;
     }
 
+    // TODO: As long as we do not compute the topology ID, let's use a 
constant one
+    public final String topologyId = "topology-id";
+
     // TODO: This needs to be used somewhere
     public Map<TaskId, Long> taskLags() {
         return taskLags;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index ba0a289f187..e93b0e51c10 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -516,6 +516,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             final StreamsAssignmentInterface streamsInterface,
             final ConsumerMembershipManager membershipManager,
             final int rebalanceTimeoutMs) {
+
             this.membershipManager = membershipManager;
             this.rebalanceTimeoutMs = rebalanceTimeoutMs;
             this.sentFields = new 
StreamsGroupHeartbeatRequestManager.HeartbeatState.SentFields();
@@ -532,6 +533,9 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
             // GroupId - always sent
             data.setGroupId(membershipManager.groupId());
 
+            // TopologyId - always sent
+            data.setTopologyId(streamsInterface.topologyId);
+
             // MemberId - always sent, empty until it has been received from 
the coordinator
             data.setMemberId(membershipManager.memberId());
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
index e7f763bd969..0a6a67594c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java
@@ -68,6 +68,7 @@ public class StreamsGroupInitializeRequestManager implements 
RequestManager {
     private NetworkClientDelegate.UnsentRequest makeRequest() {
         final StreamsGroupInitializeRequestData 
streamsGroupInitializeRequestData = new StreamsGroupInitializeRequestData();
         streamsGroupInitializeRequestData.setGroupId(groupId);
+        
streamsGroupInitializeRequestData.setTopologyId(streamsAssignmentInterface.topologyId);
         final List<StreamsGroupInitializeRequestData.Subtopology> topology = 
getTopologyFromStreams();
         streamsGroupInitializeRequestData.setTopology(topology);
         final StreamsGroupInitializeRequest.Builder 
streamsGroupInitializeRequestBuilder = new 
StreamsGroupInitializeRequest.Builder(
@@ -91,7 +92,7 @@ public class StreamsGroupInitializeRequestManager implements 
RequestManager {
     private static StreamsGroupInitializeRequestData.Subtopology 
getSubtopologyFromStreams(final String subtopologyName,
                                                                                
            final StreamsAssignmentInterface.Subtopology subtopology) {
         final StreamsGroupInitializeRequestData.Subtopology subtopologyData = 
new StreamsGroupInitializeRequestData.Subtopology();
-        subtopologyData.setSubtopology(subtopologyName);
+        subtopologyData.setSubtopologyId(subtopologyName);
         subtopologyData.setSourceTopics(new 
ArrayList<>(subtopology.sourceTopics));
         subtopologyData.setRepartitionSinkTopics(new 
ArrayList<>(subtopology.sinkTopics));
         
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
index 602b557e3e9..3223d1e8d9e 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json
@@ -23,10 +23,12 @@
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The group identifier." },
+    { "name": "TopologyId", "type": "string", "versions": "0+",
+      "about": "The ID of the topology." },
     { "name":  "Topology", "type": "[]Subtopology", "versions": "0+",
       "about": "The sub-topologies of the streams application.",
       "fields": [
-        { "name": "Subtopology", "type": "string", "versions": "0+",
+        { "name": "SubtopologyId", "type": "string", "versions": "0+",
           "about": "String to uniquely identify the sub-topology. 
Deterministically generated from the topology" },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
index 48108204474..ebba584b0db 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManagerTest.java
@@ -114,7 +114,7 @@ class StreamsGroupInitializeRequestManagerTest {
         final List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies = streamsGroupInitializeRequestData.topology();
         assertEquals(1, subtopologies.size());
         final StreamsGroupInitializeRequestData.Subtopology subtopology = 
subtopologies.get(0);
-        assertEquals(subtopologyName1, subtopology.subtopology());
+        assertEquals(subtopologyName1, subtopology.subtopologyId());
         assertEquals(new ArrayList<>(sourceTopics), 
subtopology.sourceTopics());
         assertEquals(new ArrayList<>(sinkTopics), 
subtopology.repartitionSinkTopics());
         assertEquals(repartitionTopics.size(), 
subtopology.repartitionSourceTopics().size());
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index bef30a113df..d47bd967368 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -453,6 +453,11 @@ public class CoordinatorRuntime<S extends 
CoordinatorShard<U>, U> implements Aut
         public int size() {
             return tasks.size();
         }
+
+        @Override
+        public boolean isScheduled(String key) {
+            return tasks.containsKey(key);
+        }
     }
 
     /**
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
index d10e38a7d82..c6e8456bf2b 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java
@@ -84,4 +84,13 @@ public interface CoordinatorTimer<T, U> {
      * @param key The key.
      */
     void cancel(String key);
+
+    /**
+     * Verifies whether an operation corresponding to a given key is scheduled.
+     *
+     * @param key The key.
+     *
+     * @return {@code true} if the operation is scheduled, {@code false} 
otherwise.
+     */
+    boolean isScheduled(String key);
 }
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
index 5c55f59d608..6a17254abe9 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java
@@ -155,7 +155,8 @@ public class MockCoordinatorTimer<T, U> implements 
CoordinatorTimer<T, U> {
     /**
      * @return True if a timeout with the key exists; false otherwise.
      */
-    public boolean contains(String key) {
+    @Override
+    public boolean isScheduled(String key) {
         return timeoutMap.containsKey(key);
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 40c9e652a1b..bc551a1d89f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -216,14 +216,14 @@ import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
-import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord;
-import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupMemberTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord;
-import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedActiveTasksChanged;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedStandbyTasksChanged;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedWarmupTasksChanged;
@@ -1158,7 +1158,7 @@ public class GroupMetadataManager {
         }
 
         if (group == null) {
-            log.info("Reading persisted streams group {}", groupId);
+            log.info("Creating persisted streams group {}", groupId);
             StreamsGroup streamsGroup = new StreamsGroup(snapshotRegistry, 
groupId, metrics);
             groups.put(groupId, streamsGroup);
             metrics.onStreamsGroupStateTransition(null, streamsGroup.state());
@@ -2402,7 +2402,10 @@ public class GroupMetadataManager {
 
         scheduleStreamsGroupSessionTimeout(groupId, memberId);
 
-        if (group.topology() == null) {
+        boolean requestTopologyInitialization = false;
+        if (group.topology() == null && 
!isTopologyInitializationScheduled(groupId, topologyId)) {
+            requestTopologyInitialization = true;
+            scheduleStreamsGroupTopologyInitializationTimeout(groupId, 
topologyId, memberId, rebalanceTimeoutMs);
             log.info("Asking member {} at {} to initialize the topology", 
memberId, clientHost);
         }
 
@@ -2410,7 +2413,7 @@ public class GroupMetadataManager {
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setShouldInitializeTopology(group.topology() == null)
+            .setShouldInitializeTopology(requestTopologyInitialization)
             .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs);
 
         // The assignment is only provided in the following cases:
@@ -2654,19 +2657,28 @@ public class GroupMetadataManager {
      * Handles the initialization of the topology information on the broker 
side, that will be reused by all members of the group.
      *
      * @param groupId       The group id from the request.
-     * @param subtopologies The list of subtopologies
+     * @param topologyId    The topology ID.
+     * @param subtopologies The list of subtopologies.
      * @return A Result containing the StreamsGroupInitialize response and a 
list of records to update the state machine.
      */
     private CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> streamsGroupInitialize(String groupId,
+                                                                               
                        String topologyId,
                                                                                
                             
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies)
         throws ApiException {
+
         final List<CoordinatorRecord> records = new ArrayList<>();
 
-        log.info("Initializing topology for group {} to {}", groupId, 
subtopologies);
+        log.info("Initializing topology {} for group {} to {}", topologyId, 
groupId, subtopologies);
 
         final StreamsGroup group = getOrMaybeCreateStreamsGroup(groupId, 
false, records);
         throwIfNull(group, "group does not exist");
 
+        if (!isTopologyInitializationScheduled(groupId, topologyId)) {
+            log.warn("No topology to initialize for group ID {} and topology 
ID {} found.", groupId, topologyId);
+            StreamsGroupInitializeResponseData response = new 
StreamsGroupInitializeResponseData();
+            return new CoordinatorResult<>(records, response);
+        }
+
         // TODO: For the POC, only check if internal topics exist
         Set<String> missingTopics = new HashSet<>();
         for (StreamsGroupInitializeRequestData.Subtopology subtopology : 
subtopologies) {
@@ -2681,6 +2693,9 @@ public class GroupMetadataManager {
                 }
             }
         }
+
+        cancelStreamsGroupTopologyInitializationTimeout(groupId, topologyId);
+
         if (!missingTopics.isEmpty()) {
             StreamsGroupInitializeResponseData response =
                 new StreamsGroupInitializeResponseData()
@@ -2691,6 +2706,10 @@ public class GroupMetadataManager {
         } else {
             records.add(newStreamsGroupTopologyRecord(groupId, subtopologies));
 
+            final StreamsTopology topology = new StreamsTopology(topologyId, 
subtopologies);
+
+            computeFirstTargetAssignmentAfterTopologyInitialization(group, 
records, topology);
+
             StreamsGroupInitializeResponseData response = new 
StreamsGroupInitializeResponseData();
 
             return new CoordinatorResult<>(records, response);
@@ -2698,6 +2717,56 @@ public class GroupMetadataManager {
 
     }
 
+    // TODO: verify how much code we can share with the Streams heartbeat handler
+    private void 
computeFirstTargetAssignmentAfterTopologyInitialization(StreamsGroup group,
+                                                                         
List<CoordinatorRecord> records,
+                                                                         
StreamsTopology topology) {
+        String groupId = group.groupId();
+        final long currentTimeMs = time.milliseconds();
+        int groupEpoch = group.groupEpoch();
+
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
subscriptionMetadata =
+            group.computeSubscriptionMetadata(
+                metadataImage.topics(),
+                metadataImage.cluster(),
+                topology
+            );
+        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+            log.info("[GroupId {}] Computed new partition metadata: {}.", 
groupId, subscriptionMetadata);
+            records.add(newStreamsGroupPartitionMetadataRecord(groupId, 
subscriptionMetadata));
+        }
+        groupEpoch += 1;
+        records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
+        log.info("[GroupId {}] Bumped streams group epoch to {}.", groupId, 
groupEpoch);
+        metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+        group.setMetadataRefreshDeadline(currentTimeMs + 
streamsGroupMetadataRefreshIntervalMs, groupEpoch);
+
+        // TODO: Read the preferred server assignor from the group 
configuration
+        String preferredServerAssignor = defaultTaskAssignor.name();
+        try {
+            org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder 
assignmentResultBuilder =
+                new 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(group.groupId(),
 groupEpoch, taskAssignors.get(preferredServerAssignor))
+                    .withMembers(group.members())
+                    .withTopology(topology)
+                    .withStaticMembers(group.staticMembers())
+                    .withSubscriptionMetadata(subscriptionMetadata)
+                    .withTargetAssignment(group.targetAssignment());
+            
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 assignmentResult =
+                assignmentResultBuilder
+                    .build();
+
+            log.info("[GroupId {}] Computed a new target assignment for epoch 
{} with '{}' assignor: {}.",
+                group.groupId(), groupEpoch, preferredServerAssignor, 
assignmentResult.targetAssignment());
+
+            records.addAll(assignmentResult.records());
+        } catch (PartitionAssignorException ex) {
+            String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
+                groupEpoch, ex.getMessage());
+            log.error("[GroupId {}] {}.", group.groupId(), msg);
+            throw new UnknownServerException(msg, ex);
+        }
+    }
+
     /**
      * Handle a JoinGroupRequest to a ConsumerGroup.
      *
@@ -3692,7 +3761,7 @@ public class GroupMetadataManager {
             .build();
 
         if (!updatedMember.equals(member)) {
-            records.add(newStreamsCurrentAssignmentRecord(groupId, 
updatedMember));
+            records.add(newStreamsGroupCurrentAssignmentRecord(groupId, 
updatedMember));
 
             log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
                     + "assignedActiveTasks={}, assignedStandbyTasks={}, 
assignedWarmupTasks={} and revokedPartitions={}.",
@@ -4062,7 +4131,7 @@ public class GroupMetadataManager {
             .build();
 
         return new CoordinatorResult<>(
-            
Collections.singletonList(newStreamsCurrentAssignmentRecord(group.groupId(), 
leavingStaticMember)),
+            
Collections.singletonList(newStreamsGroupCurrentAssignmentRecord(group.groupId(),
 leavingStaticMember)),
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(member.memberId())
                 .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
@@ -4224,8 +4293,8 @@ public class GroupMetadataManager {
      * @param memberId      The member id.
      */
     private void removeStreamsMember(List<CoordinatorRecord> records, String 
groupId, String memberId) {
-        records.add(newStreamsCurrentAssignmentTombstoneRecord(groupId, 
memberId));
-        records.add(newStreamsTargetAssignmentTombstoneRecord(groupId, 
memberId));
+        records.add(newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, 
memberId));
+        records.add(newStreamsGroupTargetAssignmentTombstoneRecord(groupId, 
memberId));
         records.add(newStreamsGroupMemberTombstoneRecord(groupId, memberId));
     }
 
@@ -4459,6 +4528,49 @@ public class GroupMetadataManager {
     }
     
     /**
+     * Schedules (or reschedules) the topology initialization timeout for the 
member.
+     *
+     * @param groupId                      The group id.
+     * @param topologyId                   The topology id.
+     * @param memberId                     The member id.
+     * @param topologyIntializationTimeout The topology initialization timeout.
+     */
+    private void scheduleStreamsGroupTopologyInitializationTimeout(
+        String groupId,
+        String topologyId,
+        String memberId,
+        int topologyIntializationTimeout
+    ) {
+        timer.schedule(
+            streamsTopologyInitializationTimeoutKey(groupId, topologyId),
+            topologyIntializationTimeout,
+            TimeUnit.MILLISECONDS,
+            true,
+            () -> streamsGroupFenceMemberOperation(groupId, memberId, "the 
timeout for the topology intialization expired.")
+        );
+    }
+
+    /**
+     * Verifies if the topology initialization timeout is scheduled.
+     *
+     * @param groupId       The group id.
+     * @param topologyId    The topology id.
+     * @return              {@code true} if the topology initialization 
timeout is scheduled, {@code false} otherwise.
+     */
+    private boolean isTopologyInitializationScheduled(String groupId, String 
topologyId) {
+        return 
timer.isScheduled(streamsTopologyInitializationTimeoutKey(groupId, topologyId));
+    }
+
+    /**
+     * Cancels the topology initialization timeout.
+     *
+     * @param groupId       The group id.
+     * @param topologyId     The topology id.
+     */
+    private void cancelStreamsGroupTopologyInitializationTimeout(String 
groupId, String topologyId) {
+        timer.cancel(streamsTopologyInitializationTimeoutKey(groupId, 
topologyId));
+    }
+
     /**
      * Cancels the session timeout of the member.
      *
@@ -4793,6 +4905,7 @@ public class GroupMetadataManager {
 
         return streamsGroupInitialize(
             request.groupId(),
+            request.topologyId(),
             request.topology()
         );
     }
@@ -5824,6 +5937,10 @@ public class GroupMetadataManager {
         return "rebalance-timeout-" + groupId + "-" + memberId;
     }
 
+    public static String streamsTopologyInitializationTimeoutKey(String 
groupId, String topologyId) {
+        return "topology-initialization-timeout-" + groupId + "-" + topologyId;
+    }
+
     /**
      * Replays GroupMetadataKey/Value to update the soft state of
      * the classic group.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
index 2f48151e137..0e615db1c14 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpers.java
@@ -202,7 +202,7 @@ public class CoordinatorStreamsRecordHelpers {
         );
     }
 
-    public static CoordinatorRecord newStreamsTargetAssignmentRecord(
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentRecord(
         String groupId,
         String memberId,
         Map<String, Set<Integer>> activeTasks,
@@ -258,7 +258,7 @@ public class CoordinatorStreamsRecordHelpers {
      * @param memberId The streams group member id.
      * @return The record.
      */
-    public static CoordinatorRecord newStreamsTargetAssignmentTombstoneRecord(
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentTombstoneRecord(
         String groupId,
         String memberId
     ) {
@@ -274,7 +274,7 @@ public class CoordinatorStreamsRecordHelpers {
     }
 
 
-    public static CoordinatorRecord newStreamsTargetAssignmentEpochRecord(
+    public static CoordinatorRecord newStreamsGroupTargetAssignmentEpochRecord(
         String groupId,
         int assignmentEpoch
     ) {
@@ -298,7 +298,7 @@ public class CoordinatorStreamsRecordHelpers {
      * @param groupId The streams group id.
      * @return The record.
      */
-    public static CoordinatorRecord 
newStreamsTargetAssignmentEpochTombstoneRecord(
+    public static CoordinatorRecord 
newStreamsGroupTargetAssignmentEpochTombstoneRecord(
         String groupId
     ) {
         return new CoordinatorRecord(
@@ -311,7 +311,7 @@ public class CoordinatorStreamsRecordHelpers {
         );
     }
 
-    public static CoordinatorRecord newStreamsCurrentAssignmentRecord(
+    public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
         String groupId,
         StreamsGroupMember member
     ) {
@@ -343,7 +343,7 @@ public class CoordinatorStreamsRecordHelpers {
      * @param memberId The streams group member id.
      * @return The record.
      */
-    public static CoordinatorRecord newStreamsCurrentAssignmentTombstoneRecord(
+    public static CoordinatorRecord 
newStreamsGroupCurrentAssignmentTombstoneRecord(
         String groupId,
         String memberId
     ) {
@@ -397,7 +397,7 @@ public class CoordinatorStreamsRecordHelpers {
                 return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
             }).collect(Collectors.toList());
 
-            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopology())
+            value.topology().add(new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
                 
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
                 
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
         });
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 1d4c2ace5e1..4003b9a1125 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -29,8 +29,6 @@ import org.apache.kafka.coordinator.group.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
-import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.image.ClusterImage;
 import org.apache.kafka.image.TopicImage;
@@ -49,11 +47,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.INITIALIZING;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
 
@@ -64,6 +62,7 @@ public class StreamsGroup implements Group {
 
     public enum StreamsGroupState {
         EMPTY("Empty"),
+        INITIALIZING("Initializing"),
         ASSIGNING("Assigning"),
         RECONCILING("Reconciling"),
         STABLE("Stable"),
@@ -189,7 +188,7 @@ public class StreamsGroup implements Group {
     ) {
         this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
         this.groupId = Objects.requireNonNull(groupId);
-        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.state = new TimelineObject<>(snapshotRegistry, INITIALIZING);
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -246,6 +245,7 @@ public class StreamsGroup implements Group {
 
     public void setTopology(StreamsTopology topology) {
         this.topology.set(Optional.of(topology));
+        maybeUpdateGroupState();
     }
 
     /**
@@ -648,9 +648,10 @@ public class StreamsGroup implements Group {
      */
     public Map<String, TopicMetadata> computeSubscriptionMetadata(
         TopicsImage topicsImage,
-        ClusterImage clusterImage
+        ClusterImage clusterImage,
+        StreamsTopology topology
     ) {
-        Set<String> subscribedTopicNames = 
topology.get().map(StreamsTopology::topicSubscription).orElse(Collections.emptySet());
+        Set<String> subscribedTopicNames = topology.topicSubscription();
 
         // Create the topic metadata for each subscribed topic.
         Map<String, TopicMetadata> newSubscriptionMetadata = new 
HashMap<>(subscribedTopicNames.size());
@@ -685,6 +686,24 @@ public class StreamsGroup implements Group {
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Computes the subscription metadata based on the current subscription 
info.
+     *
+     * @param topicsImage          The current metadata for all available 
topics.
+     * @param clusterImage         The current metadata for the Kafka cluster.
+     * @return An immutable map of subscription metadata for each topic that 
the streams group is subscribed to.
+     */
+    public Map<String, TopicMetadata> computeSubscriptionMetadata(
+        TopicsImage topicsImage,
+        ClusterImage clusterImage
+    ) {
+        Optional<StreamsTopology> topology = this.topology.get();
+        if (topology.isPresent()) {
+            return computeSubscriptionMetadata(topicsImage, clusterImage, 
topology.get());
+        }
+        return Collections.emptyMap();
+    }
+
     /**
      * Updates the metadata refresh deadline.
      *
@@ -791,7 +810,7 @@ public class StreamsGroup implements Group {
      */
     @Override
     public void validateDeleteGroup() throws ApiException {
-        if (state() != StreamsGroupState.EMPTY) {
+        if (state() != StreamsGroupState.EMPTY && state() != 
StreamsGroupState.INITIALIZING) {
             throw Errors.NON_EMPTY_GROUP.exception();
         }
     }
@@ -809,13 +828,13 @@ public class StreamsGroup implements Group {
     @Override
     public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
         members().forEach((memberId, member) ->
-            
records.add(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentTombstoneRecord(groupId(),
 memberId))
+            
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(),
 memberId))
         );
 
         members().forEach((memberId, member) ->
-            
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentTombstoneRecord(groupId(),
 memberId))
+            
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(),
 memberId))
         );
-        
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochTombstoneRecord(groupId()));
+        
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId()));
 
         members().forEach((memberId, member) ->
             
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(),
 memberId))
@@ -865,7 +884,9 @@ public class StreamsGroup implements Group {
     private void maybeUpdateGroupState() {
         StreamsGroupState previousState = state.get();
         StreamsGroupState newState = STABLE;
-        if (members.isEmpty()) {
+        if (topology() == null) {
+            newState = INITIALIZING;
+        } else if (members.isEmpty()) {
             newState = EMPTY;
         } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
             newState = ASSIGNING;
@@ -1058,7 +1079,7 @@ public class StreamsGroup implements Group {
         
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId(),
 groupEpoch()));
 
         members().forEach((streamsGroupMemberId, streamsGroupMember) ->
-            
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(
+            
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
                 groupId(),
                 streamsGroupMemberId,
                 targetAssignment(streamsGroupMemberId).activeTasks(),
@@ -1067,11 +1088,11 @@ public class StreamsGroup implements Group {
             ))
         );
 
-        
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId(),
 groupEpoch()));
+        
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId(),
 groupEpoch()));
         
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId(),
 subscriptionMetadata()));
 
         members().forEach((__, streamsGroupMember) ->
-            
records.add(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId(),
 streamsGroupMember))
+            
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId(),
 streamsGroupMember))
         );
     }
 
@@ -1115,9 +1136,4 @@ public class StreamsGroup implements Group {
         }
         return false;
     }
-
-    public void setTopology(final StreamsGroupTopologyValue topology) {
-        this.topology.set(Optional.of(new 
StreamsTopology(topology.topologyId(), 
topology.topology().stream().collect(Collectors.toMap(
-            Subtopology::subtopology, x -> x)))));
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
index 5f09941fae9..ca2fd9a6da0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsTopology.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.coordinator.group.streams;
 
+import java.util.HashMap;
 import java.util.List;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
@@ -37,11 +39,41 @@ public class StreamsTopology {
 
     private final Map<String, Subtopology> subtopologies;
 
-    public StreamsTopology(final String topologyId, final Map<String, 
Subtopology> subtopologies) {
+    public StreamsTopology(final String topologyId,
+                           final Map<String, Subtopology> subtopologies) {
         this.topologyId = topologyId;
         this.subtopologies = subtopologies;
     }
 
+    public StreamsTopology(final String topologyId,
+                           final 
List<StreamsGroupInitializeRequestData.Subtopology> subtopologies) {
+        this.topologyId = topologyId;
+        this.subtopologies = new HashMap<>();
+        subtopologies.forEach(subtopology -> {
+            List<StreamsGroupTopologyValue.TopicInfo> repartitionSourceTopics 
= subtopology.repartitionSourceTopics().stream()
+                .map(topicInfo -> {
+                    List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
 topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
+                        .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                        .collect(Collectors.toList()) : null;
+                    return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs)
+                        .setPartitions(topicInfo.partitions());
+                }).collect(Collectors.toList());
+
+            List<StreamsGroupTopologyValue.TopicInfo> stateChangelogTopics = 
subtopology.stateChangelogTopics().stream().map(topicInfo -> {
+                List<StreamsGroupTopologyValue.TopicConfig> topicConfigs = 
topicInfo.topicConfigs() != null ? topicInfo.topicConfigs().stream()
+                    .map(config -> new 
StreamsGroupTopologyValue.TopicConfig().setKey(config.key()).setValue(config.value()))
+                    .collect(Collectors.toList()) : null;
+                return new 
StreamsGroupTopologyValue.TopicInfo().setName(topicInfo.name()).setTopicConfigs(topicConfigs);
+            }).collect(Collectors.toList());
+
+            this.subtopologies.put(
+                subtopology.subtopologyId(),
+                new 
StreamsGroupTopologyValue.Subtopology().setSubtopology(subtopology.subtopologyId())
+                    
.setSourceTopics(subtopology.sourceTopics()).setRepartitionSinkTopics(subtopology.repartitionSinkTopics())
+                    
.setRepartitionSourceTopics(repartitionSourceTopics).setStateChangelogTopics(stateChangelogTopics));
+        });
+    }
+
     public String topologyId() {
         return topologyId;
     }
@@ -57,9 +89,7 @@ public class StreamsTopology {
                 Collectors.toSet());
     }
 
-    public static StreamsTopology fromRecord(
-        StreamsGroupTopologyValue record
-    ) {
+    public static StreamsTopology fromRecord(StreamsGroupTopologyValue record) 
{
         return new StreamsTopology(
             record.topologyId(),
             
record.topology().stream().collect(Collectors.toMap(Subtopology::subtopology, x 
-> x))
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
index a09919e2157..d4cb0b85275 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -306,7 +306,7 @@ public class TargetAssignmentBuilder {
 
             if (oldMemberAssignment == null) {
                 // If the member had no assignment, we always create a record 
for it.
-                
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(
+                
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
                     groupId,
                     memberId,
                     newMemberAssignment.activeTasks(),
@@ -317,7 +317,7 @@ public class TargetAssignmentBuilder {
                 // If the member had an assignment, we only create a record if 
the
                 // new assignment is different.
                 if (!newMemberAssignment.equals(oldMemberAssignment)) {
-                    
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(
+                    
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
                         groupId,
                         memberId,
                         newMemberAssignment.activeTasks(),
@@ -329,7 +329,7 @@ public class TargetAssignmentBuilder {
         });
 
         // Bump the target assignment epoch.
-        
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId,
 groupEpoch));
+        
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 groupEpoch));
 
         return new TargetAssignmentResult(records, newTargetAssignment);
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 263b2ab76ea..03966c07ea8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1009,16 +1009,16 @@ public class GroupCoordinatorShardTest {
 
         // Confirm the cleanup is scheduled when the coordinator is initially 
loaded.
         coordinator.onLoaded(image);
-        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+        assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY));
 
         // Confirm that it is rescheduled after completion.
         mockTime.sleep(1000L);
         List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> 
tasks = timer.poll();
         assertEquals(1, tasks.size());
-        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+        assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY));
 
         coordinator.onUnloaded();
-        assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
+        assertFalse(timer.isScheduled(GROUP_EXPIRATION_KEY));
     }
 
     @Test
@@ -1058,9 +1058,9 @@ public class GroupCoordinatorShardTest {
             return null;
         }).when(groupMetadataManager).maybeDeleteGroup(eq("group-id"), 
recordsCapture.capture());
 
-        assertFalse(timer.contains(GROUP_EXPIRATION_KEY));
+        assertFalse(timer.isScheduled(GROUP_EXPIRATION_KEY));
         CoordinatorResult<Void, CoordinatorRecord> result = 
coordinator.cleanupGroupMetadata();
-        assertTrue(timer.contains(GROUP_EXPIRATION_KEY));
+        assertTrue(timer.isScheduled(GROUP_EXPIRATION_KEY));
 
         List<CoordinatorRecord> expectedRecords = 
Arrays.asList(offsetCommitTombstone, groupMetadataTombstone);
         assertEquals(expectedRecords, result.records());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 05a94ea49a6..58d3a4e5ecc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -56,6 +56,9 @@ import 
org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupInitializeRequestData;
+import org.apache.kafka.common.message.StreamsGroupInitializeResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -95,6 +98,7 @@ import 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelper
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -125,6 +129,8 @@ import static 
org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertResponseEquals;
@@ -410,6 +416,440 @@ public class GroupMetadataManagerTest {
         assertEquals(2, result.response().memberEpoch());
     }
 
+    @Test
+    public void testJoiningNonExistingStreamsGroup() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeat = 
buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, processId, 
rebalanceTimeoutMs);
+
+        CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.streamsGroupHeartbeat(heartbeat);
+
+        assertNotNull(result.response());
+        StreamsGroupHeartbeatResponseData response = result.response();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertFalse(response.memberId().isEmpty());
+        assertEquals(1, response.memberEpoch());
+        assertTrue(response.shouldInitializeTopology());
+        assertTrue(response.activeTasks().isEmpty());
+        assertTrue(response.standbyTasks().isEmpty());
+        assertTrue(response.warmupTasks().isEmpty());
+        List<CoordinatorRecord> coordinatorRecords = result.records();
+        assertEquals(5, coordinatorRecords.size());
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 1)));
+        StreamsGroupMember member = new 
StreamsGroupMember.Builder(response.memberId())
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
+            .setTopologyId(topologyId)
+            .setProcessId(processId)
+            .build();
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member)));
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1)));
+        assertTrue(coordinatorRecords.contains(
+            
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                groupId,
+                member.memberId(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap()
+            )
+        ));
+        StreamsGroupMember updatedMember = new 
org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member)
+            .withTargetAssignment(
+                1,
+                new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap())
+            )
+            .withCurrentActiveTaskEpoch((a, b) -> 1)
+            .withOwnedActiveTasks(Collections.emptyList())
+            .withOwnedStandbyTasks(Collections.emptyList())
+            .withOwnedWarmupTasks(Collections.emptyList())
+            .build();
+        
CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
updatedMember);
+        assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, 
context.streamsGroupState("group-id"));
+        
assertTrue(context.timer.isScheduled("topology-initialization-timeout-group-id-topology-id"));
+        assertEquals(rebalanceTimeoutMs, 
context.timer.timeout("topology-initialization-timeout-group-id-topology-id").deadlineMs
 - context.time.milliseconds());
+    }
+
+    @Test
+    public void testJoiningExistingInitializingStreamsGroup() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        StreamsGroupHeartbeatRequestData heartbeat =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+
+        CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.streamsGroupHeartbeat(heartbeat);
+
+        assertNotNull(result.response());
+        StreamsGroupHeartbeatResponseData response = result.response();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertFalse(response.memberId().isEmpty());
+        assertEquals(2, response.memberEpoch());
+        assertFalse(response.shouldInitializeTopology());
+        assertTrue(response.activeTasks().isEmpty());
+        assertTrue(response.standbyTasks().isEmpty());
+        assertTrue(response.warmupTasks().isEmpty());
+        List<CoordinatorRecord> coordinatorRecords = result.records();
+        assertEquals(5, coordinatorRecords.size());
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 2)));
+        StreamsGroupMember member = new 
StreamsGroupMember.Builder(response.memberId())
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
+            .setTopologyId(topologyId)
+            .setProcessId(processId)
+            .build();
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member)));
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2)));
+        assertTrue(coordinatorRecords.contains(
+            
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                groupId,
+                member.memberId(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap()
+            )
+        ));
+        StreamsGroupMember updatedMember = new 
org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member)
+            .withTargetAssignment(
+                1,
+                new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap())
+            )
+            .withCurrentActiveTaskEpoch((a, b) -> 1)
+            .withOwnedActiveTasks(Collections.emptyList())
+            .withOwnedStandbyTasks(Collections.emptyList())
+            .withOwnedWarmupTasks(Collections.emptyList())
+            .build();
+        
CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
updatedMember);
+        assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, 
context.streamsGroupState("group-id"));
+    }
+
+    @Test
+    public void testInitTopologyExistingInitializingStreamsGroup() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        String inputTopicName = "input-topic";
+        String subtopologyId = "subtopology-id";
+        Uuid inputTopicId = Uuid.randomUuid();
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(inputTopicId, inputTopicName, 3)
+            .addRacks()
+            .build();
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        final CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> heartbeatResult =
+            
context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        final List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies = new ArrayList<>();
+        subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology()
+            .setSubtopologyId(subtopologyId)
+            .setRepartitionSinkTopics(Collections.emptyList())
+            .setRepartitionSourceTopics(Collections.emptyList())
+            .setSourceTopics(Collections.singletonList("input-topic"))
+        );
+        prepareStreamsGroupAssignment(assignor, 
heartbeatResult.response().memberId(), subtopologyId);
+        StreamsGroupInitializeRequestData initialize = new 
StreamsGroupInitializeRequestData()
+            .setGroupId(groupId)
+            .setTopologyId(topologyId)
+            .setTopology(subtopologies);
+
+        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+
+        assertNotNull(result.response());
+        StreamsGroupInitializeResponseData response = result.response();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        List<CoordinatorRecord> coordinatorRecords = result.records();
+        assertEquals(5, coordinatorRecords.size());
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
 2)));
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 subtopologies)));
+        org.apache.kafka.coordinator.group.streams.TopicMetadata topicMetadata 
= new org.apache.kafka.coordinator.group.streams.TopicMetadata(
+            inputTopicId,
+            inputTopicName,
+            3,
+            mkMap(
+                mkEntry(0, mkSet("rack0", "rack1")),
+                mkEntry(1, mkSet("rack1", "rack2")),
+                mkEntry(2, mkSet("rack2", "rack3"))
+            )
+        );
+        assertTrue(coordinatorRecords.contains(
+            
CoordinatorStreamsRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
mkMap(mkEntry(inputTopicName, topicMetadata)))
+        ));
+        
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 2)));
+        assertTrue(coordinatorRecords.contains(
+            
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                groupId,
+                heartbeatResult.response().memberId(),
+                
TaskAssignmentTestUtil.mkAssignment(TaskAssignmentTestUtil.mkTaskAssignment(subtopologyId,
 0, 1, 2)),
+                Collections.emptyMap(),
+                Collections.emptyMap()
+            )
+        ));
+        
assertFalse(context.timer.isScheduled("topology-initialization-timeout-group-id-topology-id"));
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
context.streamsGroupState("group-id"));
+    }
+
+    @Test
+    public void testInitTopologyWithWrongGroupId() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        String inputTopicName = "input-topic";
+        String subtopologyId = "subtopology-id";
+        Uuid inputTopicId = Uuid.randomUuid();
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(inputTopicId, inputTopicName, 3)
+            .addRacks()
+            .build();
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        final CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> heartbeatResult =
+            
context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        final List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies = new ArrayList<>();
+        subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology()
+            .setSubtopologyId(subtopologyId)
+            .setRepartitionSinkTopics(Collections.emptyList())
+            .setRepartitionSourceTopics(Collections.emptyList())
+            .setSourceTopics(Collections.singletonList("input-topic"))
+        );
+        prepareStreamsGroupAssignment(assignor, 
heartbeatResult.response().memberId(), subtopologyId);
+        final String wrongGroupId = "wrong-" + groupId;
+        StreamsGroupInitializeRequestData initialize = new 
StreamsGroupInitializeRequestData()
+            .setGroupId(wrongGroupId)
+            .setTopologyId(topologyId)
+            .setTopology(subtopologies);
+
+        final GroupIdNotFoundException groupIdNotFoundException =
+            assertThrows(GroupIdNotFoundException.class, () -> 
context.streamsGroupInitialize(initialize));
+
+        assertEquals(
+            "Streams group " + wrongGroupId + " not found.",
+            groupIdNotFoundException.getMessage()
+        );
+    }
+
+    @Test
+    public void testInitTopologyWithWrongTopologyId() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        String inputTopicName = "input-topic";
+        String subtopologyId = "subtopology-id";
+        Uuid inputTopicId = Uuid.randomUuid();
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(inputTopicId, inputTopicName, 3)
+            .addRacks()
+            .build();
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        final CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> heartbeatResult =
+            
context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        final List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies = new ArrayList<>();
+        subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology()
+            .setSubtopologyId(subtopologyId)
+            .setRepartitionSinkTopics(Collections.emptyList())
+            .setRepartitionSourceTopics(Collections.emptyList())
+            .setSourceTopics(Collections.singletonList("input-topic"))
+        );
+        prepareStreamsGroupAssignment(assignor, 
heartbeatResult.response().memberId(), subtopologyId);
+        final String wrongTopologyId = "wrong-" + topologyId;
+        StreamsGroupInitializeRequestData initialize = new 
StreamsGroupInitializeRequestData()
+            .setGroupId(groupId)
+            .setTopologyId(wrongTopologyId)
+            .setTopology(subtopologies);
+
+        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+
+        assertNotNull(result.response());
+        StreamsGroupInitializeResponseData response = result.response();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertTrue(result.records().isEmpty());
+    }
+
+    @Test
+    public void testInitTopologyForInitializedStreamsGroup() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        String inputTopicName = "input-topic";
+        String subtopologyId = "subtopology-id";
+        Uuid inputTopicId = Uuid.randomUuid();
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(inputTopicId, inputTopicName, 3)
+            .addRacks()
+            .build();
+        MockTaskAssignor assignor = new MockTaskAssignor("mock");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(metadataImage)
+            .withTaskAssignors(Collections.singletonList(assignor))
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        final CoordinatorResult<StreamsGroupHeartbeatResponseData, 
CoordinatorRecord> heartbeatResult =
+            
context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        final List<StreamsGroupInitializeRequestData.Subtopology> 
subtopologies = new ArrayList<>();
+        subtopologies.add(new StreamsGroupInitializeRequestData.Subtopology()
+            .setSubtopologyId(subtopologyId)
+            .setRepartitionSinkTopics(Collections.emptyList())
+            .setRepartitionSourceTopics(Collections.emptyList())
+            .setSourceTopics(Collections.singletonList("input-topic"))
+        );
+        prepareStreamsGroupAssignment(assignor, 
heartbeatResult.response().memberId(), subtopologyId);
+        StreamsGroupInitializeRequestData initialize = new 
StreamsGroupInitializeRequestData()
+            .setGroupId(groupId)
+            .setTopologyId(topologyId)
+            .setTopology(subtopologies);
+
+        context.streamsGroupInitialize(initialize);
+        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result = context.streamsGroupInitialize(initialize);
+
+        assertNotNull(result.response());
+        StreamsGroupInitializeResponseData response = result.response();
+        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertTrue(result.records().isEmpty());
+    }
+
+    @Test
+    public void testTopologyInitializationMissingInternalTopics() {
+        String groupId = "group-id";
+        int rebalanceTimeoutMs = 300000;
+        String topologyId = "topology-id";
+        String processId = "process-id";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "repartition";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+        StreamsGroupHeartbeatRequestData heartbeatToCreateAndRequestInitGroup =
+            buildFirstStreamsGroupHeartbeatRequest(groupId, topologyId, 
processId, rebalanceTimeoutMs);
+        context.streamsGroupHeartbeat(heartbeatToCreateAndRequestInitGroup);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.consumerGroup(groupId));
+        final List<StreamsGroupInitializeRequestData.Subtopology> topology = 
Collections.singletonList(
+            new StreamsGroupInitializeRequestData.Subtopology()
+                .setSubtopologyId("subtopology-id")
+                .setSourceTopics(Collections.singletonList("bar"))
+                .setRepartitionSourceTopics(
+                    Collections.singletonList(
+                        new StreamsGroupInitializeRequestData.TopicInfo()
+                            .setName("repartition")
+                            .setPartitions(4)
+                            .setTopicConfigs(Collections.singletonList(
+                                new 
StreamsGroupInitializeRequestData.TopicConfig()
+                                    .setKey("config-name1")
+                                    .setValue("config-value1")
+                            ))
+                    )
+                )
+                .setStateChangelogTopics(
+                    Collections.singletonList(
+                        new StreamsGroupInitializeRequestData.TopicInfo()
+                            .setName("changelog")
+                            .setTopicConfigs(Collections.singletonList(
+                                new 
StreamsGroupInitializeRequestData.TopicConfig()
+                                    .setKey("config-name2")
+                                    .setValue("config-value2")
+                            ))
+                    )
+                )
+        );
+        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
+            context.streamsGroupInitialize(
+                new StreamsGroupInitializeRequestData()
+                    .setGroupId(groupId)
+                    .setTopologyId(topologyId)
+                    .setTopology(topology)
+            );
+
+        assertEquals(
+            new StreamsGroupInitializeResponseData()
+                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
+                .setErrorMessage("Internal topics changelog do not exist."),
+            result.response()
+        );
+
+        assertTrue(result.records().isEmpty());
+    }
+
+    private StreamsGroupHeartbeatRequestData 
buildFirstStreamsGroupHeartbeatRequest(
+        final String groupId,
+        final String topologyId,
+        final String processId,
+        final int rebalanceTimeoutMs) {
+
+        return new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId("")
+            .setMemberEpoch(0)
+            .setInstanceId(null)
+            .setRackId(null)
+            .setRebalanceTimeoutMs(rebalanceTimeoutMs)
+            .setTopologyId(topologyId)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+            .setProcessId(processId)
+            .setUserEndpoint(null)
+            .setClientTags(null)
+            .setTaskOffsets(null)
+            .setTaskEndOffsets(null)
+            .setShutdownApplication(false);
+    }
+
+    private void prepareStreamsGroupAssignment(final MockTaskAssignor assignor,
+                                               final String memberId,
+                                               final String subtopologyId) {
+        assignor.prepareGroupAssignment(new 
org.apache.kafka.coordinator.group.taskassignor.GroupAssignment(
+            mkMap(
+                mkEntry(
+                    memberId,
+                    new 
org.apache.kafka.coordinator.group.taskassignor.MemberAssignment(
+                        mkMap(
+                            mkEntry(
+                                subtopologyId,
+                                mkSet(0, 1, 2)
+                            )
+                        ),
+                        Collections.emptyMap(),
+                        Collections.emptyMap()
+                    )
+                )
+            )
+        ));
+    }
+
     @Test
     public void testMemberIdGeneration() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@@ -8865,7 +9305,7 @@ public class GroupMetadataManagerTest {
             new StreamsGroupDescribeResponseData.DescribedGroup()
                 .setGroupEpoch(epoch)
                 .setGroupId(streamsGroupIds.get(0))
-                .setGroupState(StreamsGroup.StreamsGroupState.EMPTY.toString())
+                
.setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString())
                 .setAssignmentEpoch(0),
             new StreamsGroupDescribeResponseData.DescribedGroup()
                 .setGroupEpoch(epoch)
@@ -8875,7 +9315,7 @@ public class GroupMetadataManagerTest {
                         new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())
                     )
                 ))
-                
.setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
+                
.setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString())
         );
         List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(streamsGroupIds);
 
@@ -8924,8 +9364,8 @@ public class GroupMetadataManagerTest {
 
         StreamsGroupMember.Builder memberBuilder2 = new 
StreamsGroupMember.Builder(memberId2);
         
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder2.build()));
-        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(streamsGroupId,
 memberId2, assignmentMap, assignmentMap, assignmentMap));
-        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
 memberId2, assignmentMap, assignmentMap, assignmentMap));
+        
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
         
context.replay(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
 epoch + 2));
 
         List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(Collections.singletonList(streamsGroupId),
 context.lastCommittedOffset);
@@ -8945,7 +9385,7 @@ public class GroupMetadataManagerTest {
                 memberBuilder1.build().asStreamsGroupDescribeMember(new 
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap())),
                 memberBuilder2.build().asStreamsGroupDescribeMember(new 
org.apache.kafka.coordinator.group.streams.Assignment(assignmentMap, 
assignmentMap, assignmentMap))
             ))
-            .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
+            
.setGroupState(StreamsGroup.StreamsGroupState.INITIALIZING.toString())
             .setGroupEpoch(epoch + 2);
         assertEquals(1, actual.size());
         assertEquals(describedGroup, actual.get(0));
@@ -15963,144 +16403,4 @@ public class GroupMetadataManagerTest {
         assertEquals(expectedSuccessCount, successCount);
         return memberIds;
     }
-
-    // TODO: bring back topology initialization unit tests
-//    @Test
-//    public void testTopologyInitialization() {
-//        String groupId = "fooup";
-//
-//        Uuid fooTopicId = Uuid.randomUuid();
-//        String fooTopicName = "repartition";
-//        Uuid barTopicId = Uuid.randomUuid();
-//        String barTopicName = "changelog";
-//
-//        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-//            .withMetadataImage(new MetadataImageBuilder()
-//                .addTopic(fooTopicId, fooTopicName, 6)
-//                .addTopic(barTopicId, barTopicName, 3)
-//                .addRacks()
-//                .build())
-//            .build();
-//
-//        context.createStreamsGroup(groupId);
-//
-//        assertThrows(GroupIdNotFoundException.class, () ->
-//            context.groupMetadataManager.consumerGroup(groupId));
-//
-//        final List<Subtopology> topology = Collections.singletonList(
-//            new Subtopology()
-//                .setSubtopology("subtopology-id")
-//                .setSinkTopics(Collections.singletonList("foo"))
-//                .setSourceTopics(Collections.singletonList("bar"))
-//                .setRepartitionSourceTopics(
-//                    Collections.singletonList(
-//                        new TopicInfo()
-//                            .setName("repartition")
-//                            .setPartitions(4)
-//                            .setTopicConfigs(Collections.singletonList(
-//                                new TopicConfig()
-//                                    .setKey("config-name1")
-//                                    .setValue("config-value1")
-//                            ))
-//                    )
-//                )
-//                .setStateChangelogTopics(
-//                    Collections.singletonList(
-//                        new TopicInfo()
-//                            .setName("changelog")
-//                            .setTopicConfigs(Collections.singletonList(
-//                                new TopicConfig()
-//                                    .setKey("config-name2")
-//                                    .setValue("config-value2")
-//                            ))
-//                    )
-//                )
-//        );
-//        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
-//            context.streamsGroupInitialize(
-//                new streamsGroupInitializeRequestData()
-//                    .setGroupId(groupId)
-//                    .setTopology(topology)
-//            );
-//
-//        assertEquals(
-//            new StreamsGroupInitializeResponseData(),
-//            result.response()
-//        );
-//
-//        List<CoordinatorRecord> expectedRecords = Arrays.asList(
-//            CoordinatorRecordHelpers.newStreamsGroupTopologyRecord(
-//                groupId,
-//                topology
-//            )
-//        );
-//
-//        assertRecordsEquals(expectedRecords, result.records());
-//    }
-//
-//    @Test
-//    public void testTopologyInitializationMissingInternalTopics() {
-//        String groupId = "fooup";
-//
-//        Uuid fooTopicId = Uuid.randomUuid();
-//        String fooTopicName = "repartition";
-//
-//        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
-//            .withMetadataImage(new MetadataImageBuilder()
-//                .addTopic(fooTopicId, fooTopicName, 6)
-//                .addRacks()
-//                .build())
-//            .build();
-//
-//        context.createStreamsGroup(groupId);
-//
-//        assertThrows(GroupIdNotFoundException.class, () ->
-//            context.groupMetadataManager.consumerGroup(groupId));
-//
-//        final List<Subtopology> topology = Collections.singletonList(
-//            new Subtopology()
-//                .setSubtopology("subtopology-id")
-//                .setSinkTopics(Collections.singletonList("foo"))
-//                .setSourceTopics(Collections.singletonList("bar"))
-//                .setRepartitionSourceTopics(
-//                    Collections.singletonList(
-//                        new TopicInfo()
-//                            .setName("repartition")
-//                            .setPartitions(4)
-//                            .setTopicConfigs(Collections.singletonList(
-//                                new TopicConfig()
-//                                    .setKey("config-name1")
-//                                    .setValue("config-value1")
-//                            ))
-//                    )
-//                )
-//                .setStateChangelogTopics(
-//                    Collections.singletonList(
-//                        new TopicInfo()
-//                            .setName("changelog")
-//                            .setTopicConfigs(Collections.singletonList(
-//                                new TopicConfig()
-//                                    .setKey("config-name2")
-//                                    .setValue("config-value2")
-//                            ))
-//                    )
-//                )
-//        );
-//        CoordinatorResult<StreamsGroupInitializeResponseData, 
CoordinatorRecord> result =
-//            context.streamsGroupInitialize(
-//                new streamsGroupInitializeRequestData()
-//                    .setGroupId(groupId)
-//                    .setTopology(topology)
-//            );
-//
-//        assertEquals(
-//            new StreamsGroupInitializeResponseData()
-//                .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
-//                .setErrorMessage("Internal topics changelog do not exist."),
-//            result.response()
-//        );
-//
-//        assertTrue(result.records().isEmpty());
-//
-//    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index d77f5414ee9..96fd18344c0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -109,6 +109,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupBuilder;
 import org.apache.kafka.coordinator.group.taskassignor.TaskAssignor;
 import org.apache.kafka.image.MetadataImage;
@@ -651,6 +652,14 @@ public class GroupMetadataManagerTestContext {
             .state();
     }
 
+    public StreamsGroup.StreamsGroupState streamsGroupState(
+        String groupId
+    ) {
+        return groupMetadataManager
+            .streamsGroup(groupId)
+            .state();
+    }
+
     public MemberState consumerGroupMemberState(
         String groupId,
         String memberId
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
index cf1a003cd84..b9298dab3f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CoordinatorStreamsRecordHelpersTest.java
@@ -34,7 +34,7 @@ class CoordinatorStreamsRecordHelpersTest {
     public void testNewStreamsGroupTopologyRecord() {
         List<StreamsGroupInitializeRequestData.Subtopology> topology =
             Collections.singletonList(new 
StreamsGroupInitializeRequestData.Subtopology()
-                .setSubtopology("subtopology-id")
+                .setSubtopologyId("subtopology-id")
                 .setRepartitionSinkTopics(Collections.singletonList("foo"))
                 .setSourceTopics(Collections.singletonList("bar"))
                 .setRepartitionSourceTopics(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index b07dabc05e1..ed5c93d4b86 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -82,18 +82,18 @@ public class StreamsGroupBuilder {
         // Add target assignment records.
         assignments.forEach((memberId, assignment) ->
             records.add(
-                
CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord(groupId, 
memberId,
+                
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                     assignment.activeTasks(), assignment.standbyTasks(), 
assignment.warmupTasks()))
         );
 
         // Add target assignment epoch.
-        
records.add(CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId,
+        
records.add(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
             assignmentEpoch));
 
         // Add current assignment records for members.
         members.forEach((memberId, member) ->
             records.add(
-                
CoordinatorStreamsRecordHelpers.newStreamsCurrentAssignmentRecord(groupId, 
member))
+                
CoordinatorStreamsRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
member))
         );
 
         return records;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 2fd5d461c99..8d2a0bfa418 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -396,7 +396,7 @@ public class StreamsGroupTest {
     @Test
     public void testGroupState() {
         StreamsGroup streamsGroup = createStreamsGroup("foo");
-        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, 
streamsGroup.state());
 
         StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
             .setState(MemberState.STABLE)
@@ -407,6 +407,11 @@ public class StreamsGroupTest {
         streamsGroup.updateMember(member1);
         streamsGroup.setGroupEpoch(1);
 
+        assertEquals(MemberState.STABLE, member1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, 
streamsGroup.state());
+
+        streamsGroup.setTopology(new StreamsTopology("topology-id", 
Collections.emptyMap()));
+
         assertEquals(MemberState.STABLE, member1.state());
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
 
@@ -668,7 +673,7 @@ public class StreamsGroupTest {
     public void testValidateDeleteGroup() {
         StreamsGroup streamsGroup = createStreamsGroup("foo");
 
-        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        assertEquals(StreamsGroup.StreamsGroupState.INITIALIZING, 
streamsGroup.state());
         assertDoesNotThrow(streamsGroup::validateDeleteGroup);
 
         StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
@@ -676,6 +681,7 @@ public class StreamsGroupTest {
             .setPreviousMemberEpoch(0)
             .build();
         streamsGroup.updateMember(member1);
+        streamsGroup.setTopology(new StreamsTopology("topology-id", 
Collections.emptyMap()));
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
@@ -689,6 +695,10 @@ public class StreamsGroupTest {
 
         assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
+
+        streamsGroup.removeMember("member1");
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        assertDoesNotThrow(streamsGroup::validateDeleteGroup);
     }
 
     @Test
@@ -717,14 +727,13 @@ public class StreamsGroupTest {
         );
         StreamsGroup group = new StreamsGroup(snapshotRegistry, "group-foo", 
metricsShard);
         snapshotRegistry.idempotentCreateSnapshot(0);
-        assertTrue(group.isInStates(Collections.singleton("empty"), 0));
-        assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
-
-        group.updateMember(new StreamsGroupMember.Builder("member1")
-            .build());
-        snapshotRegistry.idempotentCreateSnapshot((1));
-        assertTrue(group.isInStates(Collections.singleton("empty"), 0));
-        assertTrue(group.isInStates(Collections.singleton("stable"), 1));
-        assertFalse(group.isInStates(Collections.singleton("empty"), 1));
+        assertTrue(group.isInStates(Collections.singleton("initializing"), 0));
+        assertFalse(group.isInStates(Collections.singleton("Initializing"), 
0));
+
+        group.setTopology(new StreamsTopology("topology-id", 
Collections.emptyMap()));
+        snapshotRegistry.idempotentCreateSnapshot(1);
+        assertTrue(group.isInStates(Collections.singleton("initializing"), 0));
+        assertTrue(group.isInStates(Collections.singleton("empty"), 1));
+        assertFalse(group.isInStates(Collections.singleton("initializing"), 
1));
     }
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index c6b135eeebf..230e63b9058 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -40,8 +40,8 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
 import static 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
-import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentEpochRecord;
-import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.streams.CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTaskAssignment;
@@ -299,7 +299,7 @@ public class TargetAssignmentBuilderTest {
         );
 
         
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
-        
assertEquals(Collections.singletonList(newStreamsTargetAssignmentEpochRecord(
+        
assertEquals(Collections.singletonList(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         )), result.records());
@@ -338,7 +338,7 @@ public class TargetAssignmentBuilderTest {
 
         
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
 
-        
assertEquals(Collections.singletonList(newStreamsTargetAssignmentEpochRecord(
+        
assertEquals(Collections.singletonList(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         )), result.records());
@@ -391,17 +391,17 @@ public class TargetAssignmentBuilderTest {
         assertEquals(3, result.records().size());
 
         assertUnorderedListEquals(Arrays.asList(
-            newStreamsTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 4, 5, 6),
                 mkTaskAssignment(barSubtopologyId, 4, 5, 6)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 1, 2, 3),
                 mkTaskAssignment(barSubtopologyId, 1, 2, 3)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 2));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(2));
@@ -461,21 +461,21 @@ public class TargetAssignmentBuilderTest {
         assertEquals(4, result.records().size());
 
         assertUnorderedListEquals(Arrays.asList(
-            newStreamsTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 1, 2),
                 mkTaskAssignment(barSubtopologyId, 1, 2)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 3, 4),
                 mkTaskAssignment(barSubtopologyId, 3, 4)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 5, 6),
                 mkTaskAssignment(barSubtopologyId, 5, 6)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 3));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(3));
@@ -547,21 +547,21 @@ public class TargetAssignmentBuilderTest {
         assertEquals(4, result.records().size());
 
         assertUnorderedListEquals(Arrays.asList(
-            newStreamsTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 1, 2),
                 mkTaskAssignment(barSubtopologyId, 1, 2)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 3, 4),
                 mkTaskAssignment(barSubtopologyId, 3, 4)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 5, 6),
                 mkTaskAssignment(barSubtopologyId, 5, 6)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 3));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(3));
@@ -629,17 +629,17 @@ public class TargetAssignmentBuilderTest {
 
         // Member 1 has no record because its assignment did not change.
         assertUnorderedListEquals(Arrays.asList(
-            newStreamsTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 3, 4, 5),
                 mkTaskAssignment(barSubtopologyId, 3, 4, 5)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 6),
                 mkTaskAssignment(barSubtopologyId, 6)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 2));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(2));
@@ -703,17 +703,17 @@ public class TargetAssignmentBuilderTest {
         assertEquals(3, result.records().size());
 
         assertUnorderedListEquals(Arrays.asList(
-            newStreamsTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 1, 2, 3),
                 mkTaskAssignment(barSubtopologyId, 1, 2, 3)
             ), Collections.emptyMap(), Collections.emptyMap()),
-            newStreamsTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 4, 5, 6),
                 mkTaskAssignment(barSubtopologyId, 4, 5, 6)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 2));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(2));
@@ -783,13 +783,13 @@ public class TargetAssignmentBuilderTest {
         assertEquals(2, result.records().size());
 
         assertUnorderedListEquals(Collections.singletonList(
-            newStreamsTargetAssignmentRecord("my-group", "member-3-a", 
mkAssignment(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a", 
mkAssignment(
                 mkTaskAssignment(fooSubtopologyId, 5, 6),
                 mkTaskAssignment(barSubtopologyId, 5, 6)
             ), Collections.emptyMap(), Collections.emptyMap())
         ), result.records().subList(0, 1));
 
-        assertEquals(newStreamsTargetAssignmentEpochRecord(
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
             "my-group",
             20
         ), result.records().get(1));

Reply via email to