This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c70b7c4b9e6 KAFKA-18323: Add StreamsGroup class (#18729)
c70b7c4b9e6 is described below

commit c70b7c4b9e6fbd805d9a229b4ac13c1e10016d76
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 12 11:01:53 2025 +0100

    KAFKA-18323: Add StreamsGroup class (#18729)
    
    Implements a memory model for representing streams groups in the group 
coordinator, as well as group count and rebalance metrics.
    
    Reviewers: Bill Bejeck <[email protected]>, Bruno Cadonna 
<[email protected]>
---
 checkstyle/suppressions.xml                        |    2 +-
 .../org/apache/kafka/coordinator/group/Group.java  |    1 +
 .../group/metrics/GroupCoordinatorMetrics.java     |  128 ++-
 .../metrics/GroupCoordinatorMetricsShard.java      |  142 +++
 .../streams/StreamsCoordinatorRecordHelpers.java   |   12 -
 .../coordinator/group/streams/StreamsGroup.java    | 1012 ++++++++++++++++++
 .../coordinator/group/streams/TopicMetadata.java   |   23 +-
 .../StreamsGroupPartitionMetadataValue.json        |    9 +-
 .../group/metrics/GroupCoordinatorMetricsTest.java |   60 +-
 .../StreamsCoordinatorRecordHelpersTest.java       |   14 +-
 .../group/streams/StreamsGroupTest.java            | 1099 ++++++++++++++++++++
 .../group/streams/TargetAssignmentBuilderTest.java |   35 +-
 .../group/streams/TopicMetadataTest.java           |   45 +-
 .../streams/topics/InternalTopicManagerTest.java   |    8 +-
 14 files changed, 2472 insertions(+), 118 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4b5fad5d6e1..8d753be0e99 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -335,7 +335,7 @@
     <suppress checks="ParameterNumber"
               
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorConfig).java"/>
     <suppress checks="ClassDataAbstractionCouplingCheck"
-              
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde).java"/>
+              
files="(RecordHelpersTest|GroupCoordinatorRecordHelpers|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest|GroupCoordinatorRecordSerde|StreamsGroupTest).java"/>
     <suppress checks="JavaNCSS"
               files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index b5d63499751..79c1b72237b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -37,6 +37,7 @@ public interface Group {
         CONSUMER("consumer"),
         CLASSIC("classic"),
         SHARE("share"),
+        STREAMS("streams"),
         UNKNOWN("unknown");
 
         private final String name;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index 23606593a6b..1cd9e4b0bd9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
@@ -73,6 +74,8 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
     public static final String SHARE_GROUP_COUNT_METRIC_NAME = "group-count";
     public static final String CONSUMER_GROUP_COUNT_STATE_TAG = "state";
     public static final String SHARE_GROUP_COUNT_STATE_TAG = 
CONSUMER_GROUP_COUNT_STATE_TAG;
+    public static final String STREAMS_GROUP_COUNT_METRIC_NAME = 
"streams-group-count";
+    public static final String STREAMS_GROUP_COUNT_STATE_TAG = "state";
 
     public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits";
     public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired";
@@ -80,6 +83,7 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
     public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME 
= "CompletedRebalances";
     public static final String CONSUMER_GROUP_REBALANCES_SENSOR_NAME = 
"ConsumerGroupRebalances";
     public static final String SHARE_GROUP_REBALANCES_SENSOR_NAME = 
"ShareGroupRebalances";
+    public static final String STREAMS_GROUP_REBALANCES_SENSOR_NAME = 
"StreamsGroupRebalances";
 
     private final MetricName classicGroupCountMetricName;
     private final MetricName consumerGroupCountMetricName;
@@ -92,6 +96,13 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
     private final MetricName shareGroupCountEmptyMetricName;
     private final MetricName shareGroupCountStableMetricName;
     private final MetricName shareGroupCountDeadMetricName;
+    private final MetricName streamsGroupCountMetricName;
+    private final MetricName streamsGroupCountEmptyMetricName;
+    private final MetricName streamsGroupCountAssigningMetricName;
+    private final MetricName streamsGroupCountReconcilingMetricName;
+    private final MetricName streamsGroupCountStableMetricName;
+    private final MetricName streamsGroupCountDeadMetricName;
+    private final MetricName streamsGroupCountNotReadyMetricName;
 
     private final MetricsRegistry registry;
     private final Metrics metrics;
@@ -106,6 +117,7 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
         this(KafkaYammerMetrics.defaultRegistry(), new Metrics());
     }
 
+    @SuppressWarnings("MethodLength")
     public GroupCoordinatorMetrics(MetricsRegistry registry, Metrics metrics) {
         this.registry = Objects.requireNonNull(registry);
         this.metrics = Objects.requireNonNull(metrics);
@@ -190,6 +202,55 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             SHARE_GROUP_COUNT_STATE_TAG, 
ShareGroup.ShareGroupState.DEAD.toString()
         );
 
+        streamsGroupCountMetricName = metrics.metricName(
+            GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The total number of groups using the streams rebalance protocol.",
+            Collections.singletonMap(GROUP_COUNT_PROTOCOL_TAG, 
Group.GroupType.STREAMS.toString())
+        );
+
+        streamsGroupCountEmptyMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in empty state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.EMPTY.toString())
+        );
+
+        streamsGroupCountAssigningMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in assigning state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.ASSIGNING.toString())
+        );
+
+        streamsGroupCountReconcilingMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in reconciling state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.RECONCILING.toString())
+        );
+
+        streamsGroupCountStableMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in stable state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.STABLE.toString())
+        );
+
+        streamsGroupCountDeadMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in dead state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.DEAD.toString())
+        );
+
+        streamsGroupCountNotReadyMetricName = metrics.metricName(
+            STREAMS_GROUP_COUNT_METRIC_NAME,
+            METRICS_GROUP,
+            "The number of streams groups in not ready state.",
+            Collections.singletonMap(STREAMS_GROUP_COUNT_STATE_TAG, 
StreamsGroupState.NOT_READY.toString())
+        );
+
         registerGauges();
 
         Sensor offsetCommitsSensor = 
metrics.sensor(OFFSET_COMMITS_SENSOR_NAME);
@@ -247,6 +308,15 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
                 METRICS_GROUP,
                 "The total number of share group rebalances",
                 SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
+        
+        Sensor streamsGroupRebalanceSensor = 
metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
+        streamsGroupRebalanceSensor.add(new Meter(
+            metrics.metricName("streams-group-rebalance-rate",
+                METRICS_GROUP,
+                "The rate of streams group rebalances"),
+            metrics.metricName("streams-group-rebalance-count",
+                METRICS_GROUP,
+                "The total number of streams group rebalances")));
 
         globalSensors = Collections.unmodifiableMap(Utils.mkMap(
             Utils.mkEntry(OFFSET_COMMITS_SENSOR_NAME, offsetCommitsSensor),
@@ -254,7 +324,8 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             Utils.mkEntry(OFFSET_DELETIONS_SENSOR_NAME, offsetDeletionsSensor),
             Utils.mkEntry(CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME, 
classicGroupCompletedRebalancesSensor),
             Utils.mkEntry(CONSUMER_GROUP_REBALANCES_SENSOR_NAME, 
consumerGroupRebalanceSensor),
-            Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, 
shareGroupRebalanceSensor)
+            Utils.mkEntry(SHARE_GROUP_REBALANCES_SENSOR_NAME, 
shareGroupRebalanceSensor),
+            Utils.mkEntry(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 
streamsGroupRebalanceSensor)
         ));
     }
 
@@ -278,6 +349,14 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
         return shards.values().stream().mapToLong(shard -> 
shard.numConsumerGroups(state)).sum();
     }
 
+    private long numStreamsGroups() {
+        return 
shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numStreamsGroups).sum();
+    }
+
+    private long numStreamsGroups(StreamsGroupState state) {
+        return shards.values().stream().mapToLong(shard -> 
shard.numStreamsGroups(state)).sum();
+    }
+    
     private long numShareGroups() {
         return 
shards.values().stream().mapToLong(GroupCoordinatorMetricsShard::numShareGroups).sum();
     }
@@ -309,7 +388,14 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             shareGroupCountMetricName,
             shareGroupCountEmptyMetricName,
             shareGroupCountStableMetricName,
-            shareGroupCountDeadMetricName
+            shareGroupCountDeadMetricName,
+            streamsGroupCountMetricName,
+            streamsGroupCountEmptyMetricName,
+            streamsGroupCountAssigningMetricName,
+            streamsGroupCountReconcilingMetricName,
+            streamsGroupCountStableMetricName,
+            streamsGroupCountDeadMetricName,
+            streamsGroupCountNotReadyMetricName
         ).forEach(metrics::removeMetric);
 
         Arrays.asList(
@@ -318,7 +404,8 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             OFFSET_DELETIONS_SENSOR_NAME,
             CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME,
             CONSUMER_GROUP_REBALANCES_SENSOR_NAME,
-            SHARE_GROUP_REBALANCES_SENSOR_NAME
+            SHARE_GROUP_REBALANCES_SENSOR_NAME,
+            STREAMS_GROUP_REBALANCES_SENSOR_NAME
         ).forEach(metrics::removeSensor);
     }
 
@@ -461,5 +548,40 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
             shareGroupCountDeadMetricName,
             (Gauge<Long>) (config, now) -> 
numShareGroups(ShareGroup.ShareGroupState.DEAD)
         );
+
+        metrics.addMetric(
+            streamsGroupCountMetricName,
+            (Gauge<Long>) (config, now) -> numStreamsGroups()
+        );
+
+        metrics.addMetric(
+            streamsGroupCountEmptyMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.EMPTY)
+        );
+
+        metrics.addMetric(
+            streamsGroupCountAssigningMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.ASSIGNING)
+        );
+
+        metrics.addMetric(
+            streamsGroupCountReconcilingMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.RECONCILING)
+        );
+
+        metrics.addMetric(
+            streamsGroupCountStableMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.STABLE)
+        );
+
+        metrics.addMetric(
+            streamsGroupCountDeadMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.DEAD)
+        );
+
+        metrics.addMetric(
+            streamsGroupCountNotReadyMetricName,
+            (Gauge<Long>) (config, now) -> 
numStreamsGroups(StreamsGroupState.NOT_READY)
+        );
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
index 1ed75229f58..8b814bb0b23 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineLong;
 
@@ -78,6 +79,11 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
      */
     private final Map<ShareGroup.ShareGroupState, TimelineGaugeCounter> 
shareGroupGauges;
 
+    /**
+     * Streams group size gauge counters keyed by the metric name.
+     */
+    private final Map<StreamsGroupState, TimelineGaugeCounter> 
streamsGroupGauges;
+
     /**
      * All sensors keyed by the sensor name. A Sensor object is shared across 
all metrics shards.
      */
@@ -119,6 +125,21 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
                 new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0)))
         );
 
+        this.streamsGroupGauges = Utils.mkMap(
+            Utils.mkEntry(StreamsGroupState.EMPTY,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0))),
+            Utils.mkEntry(StreamsGroupState.ASSIGNING,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0))),
+            Utils.mkEntry(StreamsGroupState.RECONCILING,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0))),
+            Utils.mkEntry(StreamsGroupState.STABLE,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0))),
+            Utils.mkEntry(StreamsGroupState.DEAD,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0))),
+            Utils.mkEntry(StreamsGroupState.NOT_READY,
+                new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), 
new AtomicLong(0)))
+        );
+
         this.globalSensors = Objects.requireNonNull(globalSensors);
         this.topicPartition = Objects.requireNonNull(topicPartition);
     }
@@ -144,6 +165,20 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         this.consumerGroupGauges = consumerGroupGauges;
     }
 
+    /**
+     * Increment the number of streams groups.
+     *
+     * @param state the streams group state.
+     */
+    public void incrementNumStreamsGroups(StreamsGroupState state) {
+        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+        if (gaugeCounter != null) {
+            synchronized (gaugeCounter.timelineLong) {
+                gaugeCounter.timelineLong.increment();
+            }
+        }
+    }
+
     /**
      * Decrement the number of offsets.
      */
@@ -153,6 +188,20 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         }
     }
 
+    /**
+     * Decrement the number of streams groups.
+     *
+     * @param state the streams group state.
+     */
+    public void decrementNumStreamsGroups(StreamsGroupState state) {
+        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+        if (gaugeCounter != null) {
+            synchronized (gaugeCounter.timelineLong) {
+                gaugeCounter.timelineLong.decrement();
+            }
+        }
+    }
+
     /**
      * @return The number of offsets.
      */
@@ -205,6 +254,29 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
         return consumerGroupGauges.values().stream()
             .mapToLong(Long::longValue).sum();
     }
+    
+    /**
+     * Obtain the number of streams groups in the specified state.
+     *
+     * @param state  the streams group state.
+     *
+     * @return   The number of streams groups in `state`.
+     */
+    public long numStreamsGroups(StreamsGroupState state) {
+        TimelineGaugeCounter gaugeCounter = streamsGroupGauges.get(state);
+        if (gaugeCounter != null) {
+            return gaugeCounter.atomicLong.get();
+        }
+        return 0L;
+    }
+
+    /**
+     * @return The total number of streams groups.
+     */
+    public long numStreamsGroups() {
+        return streamsGroupGauges.values().stream()
+            .mapToLong(timelineGaugeCounter -> 
timelineGaugeCounter.atomicLong.get()).sum();
+    }
 
     @Override
     public void record(String sensorName) {
@@ -246,6 +318,14 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
             }
             gaugeCounter.atomicLong.set(value);
         });
+
+        this.streamsGroupGauges.forEach((__, gaugeCounter) -> {
+            long value;
+            synchronized (gaugeCounter.timelineLong) {
+                value = gaugeCounter.timelineLong.get(offset);
+            }
+            gaugeCounter.atomicLong.set(value);
+        });
     }
 
     /**
@@ -330,4 +410,66 @@ public class GroupCoordinatorMetricsShard implements 
CoordinatorMetricsShard {
             }
         }
     }
+    
+    /**
+     * Called when a streams group's state has changed. Increment/decrement
+     * the counter accordingly.
+     *
+     * @param oldState The previous state. null value means that it's a new 
group.
+     * @param newState The next state. null value means that the group has 
been removed.
+     */
+    public void onStreamsGroupStateTransition(
+        StreamsGroupState oldState,
+        StreamsGroupState newState
+    ) {
+        if (newState != null) {
+            switch (newState) {
+                case EMPTY:
+                    incrementNumStreamsGroups(StreamsGroupState.EMPTY);
+                    break;
+                case NOT_READY:
+                    incrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+                    break;
+                case ASSIGNING:
+                    incrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+                    break;
+                case RECONCILING:
+                    incrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+                    break;
+                case STABLE:
+                    incrementNumStreamsGroups(StreamsGroupState.STABLE);
+                    break;
+                case DEAD:
+                    incrementNumStreamsGroups(StreamsGroupState.DEAD);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown new state for 
streams group: " + newState);
+            }
+        }
+
+        if (oldState != null) {
+            switch (oldState) {
+                case EMPTY:
+                    decrementNumStreamsGroups(StreamsGroupState.EMPTY);
+                    break;
+                case NOT_READY:
+                    decrementNumStreamsGroups(StreamsGroupState.NOT_READY);
+                    break;
+                case ASSIGNING:
+                    decrementNumStreamsGroups(StreamsGroupState.ASSIGNING);
+                    break;
+                case RECONCILING:
+                    decrementNumStreamsGroups(StreamsGroupState.RECONCILING);
+                    break;
+                case STABLE:
+                    decrementNumStreamsGroups(StreamsGroupState.STABLE);
+                    break;
+                case DEAD:
+                    decrementNumStreamsGroups(StreamsGroupState.DEAD);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown old state for 
streams group: " + newState);
+            }
+        }
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index c44e5d89713..881f930d8a5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -26,7 +26,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
-import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue.PartitionMetadata;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
@@ -116,21 +115,10 @@ public class StreamsCoordinatorRecordHelpers {
 
         StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
         newPartitionMetadata.forEach((topicName, topicMetadata) -> {
-            List<StreamsGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
-            if (!topicMetadata.partitionRacks().isEmpty()) {
-                topicMetadata.partitionRacks().forEach((partition, racks) ->
-                    partitionMetadata.add(new 
StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                        .setPartition(partition)
-                        .setRacks(racks.stream().sorted().toList())
-                    )
-                );
-            }
-            
partitionMetadata.sort(Comparator.comparingInt(PartitionMetadata::partition));
             value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
                 .setTopicId(topicMetadata.id())
                 .setTopicName(topicMetadata.name())
                 .setNumPartitions(topicMetadata.numPartitions())
-                .setPartitionMetadata(partitionMetadata)
             );
         });
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
new file mode 100644
index 00000000000..d161e64f599
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -0,0 +1,1012 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.NOT_READY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by records in 
the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The protocol type for streams groups. There is only one protocol type, 
"streams".
+     */
+    private static final String PROTOCOL_TYPE = "streams";
+
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        NOT_READY("NotReady"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        private final String name;
+
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            if (Objects.equals(name, "NotReady")) {
+                this.lowerCaseName = "not_ready";
+            } else {
+                this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    public static class DeadlineAndEpoch {
+
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        public final long deadlineMs;
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    private final LogContext logContext;
+    private final Logger log;
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group ID.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the topology, topic 
metadata or the set of members changes and it will trigger
+     * the computation of a new assignment for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The metadata associated with each subscribed topic name.
+     */
+    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch means that a new assignment is required. The assignment
+     * epoch is updated when a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member ID.
+     */
+    private final TimelineHashMap<String, TasksTuple> targetAssignment;
+
+    /**
+     * These maps map each active/standby/warmup task to the process ID(s) of 
their current owner.
+     * The mapping is of the form <code>subtopology -> partition -> 
memberId</code>.
+     * When a member revokes a partition, it removes its process ID from this 
map.
+     * When a member gets a partition, it adds its process ID to this map.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentActiveTaskToProcessId;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentStandbyTaskToProcessIds;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, 
Set<String>>> currentWarmupTaskToProcessIds;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The Streams topology.
+     */
+    private final TimelineObject<Optional<StreamsTopology>> topology;
+
+    /**
+     * The configured topology including resolved regular expressions.
+     */
+    private final TimelineObject<Optional<ConfiguredTopology>> 
configuredTopology;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with the group epoch at the time of setting it.
+     * The metadata refresh time is considered as a soft state (read that it 
is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the metadata refresh deadline is invalidated if the
+     * group epoch does not correspond to the current group epoch. This can 
happen if the metadata refresh deadline is updated after having
+     * refreshed the metadata but the write operation failed. In this case, 
the time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    public StreamsGroup(
+        LogContext logContext,
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.log = logContext.logger(StreamsGroup.class);
+        this.logContext = logContext;
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTaskToProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentStandbyTaskToProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTaskToProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.metrics = Objects.requireNonNull(metrics);
+        this.topology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+        this.configuredTopology = new TimelineObject<>(snapshotRegistry, 
Optional.empty());
+    }
+
+    /**
+     * @return The group type (Streams).
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        return state.get().toString();
+    }
+
+    /**
+     * @return The current state as a String with given committedOffset.
+     */
+    public String stateAsString(long committedOffset) {
+        return state.get(committedOffset).toString();
+    }
+
+    /**
+     * @return the group formatted as a list group response based on the 
committed offset.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long 
committedOffset) {
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(PROTOCOL_TYPE)
+            .setGroupState(state.get(committedOffset).toString())
+            .setGroupType(type().toString());
+    }
+
+    public Optional<ConfiguredTopology> configuredTopology() {
+        return configuredTopology.get();
+    }
+
+    public Optional<StreamsTopology> topology() {
+        return topology.get();
+    }
+
+    public void setTopology(StreamsTopology topology) {
+        this.topology.set(Optional.of(topology));
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The group ID.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the assignment epoch.
+     *
+     * @param targetAssignmentEpoch The new assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Get member ID of a static member that matches the given group instance 
ID.
+     *
+     * @param groupInstanceId The group instance ID.
+     * @return The member ID corresponding to the given instance ID or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Gets or creates a new member but without adding it to the group. Adding 
a member is done via the
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)} method.
+     *
+     * @param memberId          The member ID.
+     * @param createIfNotExists Booleans indicating whether the member must be 
created if it does not exist.
+     * @return A StreamsGroupMember.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember member = members.get(memberId);
+        if (member != null) {
+            return member;
+        }
+
+        if (!createIfNotExists) {
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", 
memberId, groupId)
+            );
+        }
+
+        return new StreamsGroupMember.Builder(memberId).build();
+    }
+
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance ID.
+     * @return The member corresponding to the given instance ID or null if it 
does not exist
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds or updates the member.
+     *
+     * @param newMember The new member state.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        StreamsGroupMember oldMember = members.put(newMember.memberId(), 
newMember);
+        maybeUpdateTaskProcessId(oldMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Updates the member ID stored against the instance ID if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() != null && 
newMember.instanceId().isPresent()) {
+            staticMembers.put(newMember.instanceId().get(), 
newMember.memberId());
+        }
+    }
+
+    /**
+     * Remove the member from the group.
+     *
+     * @param memberId The member ID to remove.
+     */
+    public void removeMember(String memberId) {
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskProcessId(oldMember);
+        removeStaticMember(oldMember);
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember.instanceId() != null && 
oldMember.instanceId().isPresent()) {
+            staticMembers.remove(oldMember.instanceId().get());
+        }
+    }
+
+    /**
+     * Returns true if the member exists.
+     *
+     * @param memberId The member ID.
+     * @return A boolean indicating whether the member exists or not.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * @return An immutable map containing all the members keyed by their ID.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * @return An immutable map containing all the static members keyed by 
instance ID.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @return The StreamsGroupMemberAssignment or an EMPTY one if it does not 
exist.
+     */
+    public TasksTuple targetAssignment(String memberId) {
+        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
+    }
+
+    /**
+     * Updates the target assignment of a member.
+     *
+     * @param memberId            The member ID.
+     * @param newTargetAssignment The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, TasksTuple 
newTargetAssignment) {
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+    /**
+     * @return An immutable map containing all the target assignment keyed by 
member ID.
+     */
+    public Map<String, TasksTuple> targetAssignment() {
+        return Collections.unmodifiableMap(targetAssignment);
+    }
+
+    /**
+     * Returns the current process ID of a task or null if the task does not 
have one.
+     *
+     * @param subtopologyId  The topic ID.
+     * @param taskId         The task ID.
+     * @return The process ID or null.
+     */
+    public String currentActiveTaskProcessId(
+        String subtopologyId, int taskId
+    ) {
+        Map<Integer, String> tasks = 
currentActiveTaskToProcessId.get(subtopologyId);
+        if (tasks == null) {
+            return null;
+        } else {
+            return tasks.getOrDefault(taskId, null);
+        }
+    }
+
+    /**
+     * Returns the current process IDs of a task or empty set if the task does 
not have one.
+     *
+     * @param subtopologyId The topic ID.
+     * @param taskId        The task ID.
+     * @return The process IDs or empty set.
+     */
+    public Set<String> currentStandbyTaskProcessIds(
+        String subtopologyId, int taskId
+    ) {
+        Map<Integer, Set<String>> tasks = 
currentStandbyTaskToProcessIds.get(subtopologyId);
+        if (tasks == null) {
+            return Collections.emptySet();
+        } else {
+            return tasks.getOrDefault(taskId, Collections.emptySet());
+        }
+    }
+
+    /**
+     * Returns the current process ID of a task or empty set if the task does 
not have one.
+     *
+     * @param subtopologyId The topic ID.
+     * @param taskId        The process ID.
+     * @return The member IDs or empty set.
+     */
+    public Set<String> currentWarmupTaskProcessIds(
+        String subtopologyId, int taskId
+    ) {
+        Map<Integer, Set<String>> tasks = 
currentWarmupTaskToProcessIds.get(subtopologyId);
+        if (tasks == null) {
+            return Collections.emptySet();
+        } else {
+            return tasks.getOrDefault(taskId, Collections.emptySet());
+        }
+    }
+
+    /**
+     * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+     */
+    public Map<String, TopicMetadata> partitionMetadata() {
+        return Collections.unmodifiableMap(partitionMetadata);
+    }
+
+    /**
+     * Updates the partition metadata. This replaces the previous one.
+     *
+     * @param partitionMetadata The new partition metadata.
+     */
+    public void setPartitionMetadata(
+        Map<String, TopicMetadata> partitionMetadata
+    ) {
+        this.partitionMetadata.clear();
+        this.partitionMetadata.putAll(partitionMetadata);
+        maybeUpdateConfiguredTopology();
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Computes the partition metadata based on the current topology and the 
current topics image.
+     *
+     * @param topicsImage The current metadata for all available topics.
+     * @param topology    The current metadata for the Streams topology
+     * @return An immutable map of partition metadata for each topic that the 
Streams topology is using (besides non-repartition sink topics)
+     */
+    public Map<String, TopicMetadata> computePartitionMetadata(
+        TopicsImage topicsImage,
+        StreamsTopology topology
+    ) {
+        Set<String> requiredTopicNames = topology.requiredTopics();
+
+        // Create the topic metadata for each subscribed topic.
+        Map<String, TopicMetadata> newPartitionMetadata = new 
HashMap<>(requiredTopicNames.size());
+
+        requiredTopicNames.forEach(topicName -> {
+            TopicImage topicImage = topicsImage.getTopic(topicName);
+            if (topicImage != null) {
+                newPartitionMetadata.put(topicName, new TopicMetadata(
+                    topicImage.id(),
+                    topicImage.name(),
+                    topicImage.partitions().size())
+                );
+            }
+        });
+
+        return Collections.unmodifiableMap(newPartitionMetadata);
+    }
+
+    /**
+     * Updates the metadata refresh deadline.
+     *
+     * @param deadlineMs The deadline in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setMetadataRefreshDeadline(
+        long deadlineMs,
+        int groupEpoch
+    ) {
+        this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, 
groupEpoch);
+    }
+
+    /**
+     * Requests a metadata refresh.
+     */
+    public void requestMetadataRefresh() {
+        this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two 
cases: 1) The deadline is smaller or equal to the current
+     * time; 2) The group epoch associated with the deadline is larger than 
the current group epoch. This means that the operations which
+     * updated the deadline failed.
+     *
+     * @param currentTimeMs The current time in milliseconds.
+     * @return A boolean indicating whether a refresh is required or not.
+     */
+    public boolean hasMetadataExpired(long currentTimeMs) {
+        return currentTimeMs >= metadataRefreshDeadline.deadlineMs || 
groupEpoch() < metadataRefreshDeadline.epoch;
+    }
+
+    /**
+     * @return The metadata refresh deadline.
+     */
+    public DeadlineAndEpoch metadataRefreshDeadline() {
+        return metadataRefreshDeadline;
+    }
+
+    /**
+     * Validates the OffsetCommit request.
+     *
+     * @param memberId          The member ID.
+     * @param groupInstanceId   The group instance ID.
+     * @param memberEpoch       The member epoch.
+     * @param isTransactional   Whether the offset commit is transactional or 
not.
+     * @param apiVersion        The api version.
+     * @throws UnknownMemberIdException  If the member is not found.
+     * @throws StaleMemberEpochException If the provided member epoch doesn't 
match the actual member epoch.
+     */
+    @Override
+    public void validateOffsetCommit(
+        String memberId,
+        String groupInstanceId,
+        int memberEpoch,
+        boolean isTransactional,
+        short apiVersion
+    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        // When the member epoch is -1, the request comes from either the 
admin client
+        // or a consumer which does not use the group management facility. In 
this case,
+        // the request can commit offsets if the group is empty.
+        if (memberEpoch < 0 && members().isEmpty()) return;
+
+        // The TxnOffsetCommit API does not require the member ID, the 
generation ID and the group instance ID fields.
+        // Hence, they are only validated if any of them is provided
+        if (isTransactional && memberEpoch == 
JoinGroupRequest.UNKNOWN_GENERATION_ID &&
+            memberId.equals(JoinGroupRequest.UNKNOWN_MEMBER_ID) && 
groupInstanceId == null)
+            return;
+
+        final StreamsGroupMember member = getOrMaybeCreateMember(memberId, 
false);
+
+        // If the commit is not transactional and the member uses the new 
streams protocol (KIP-1071),
+        // the member should be using the OffsetCommit API version >= 9.
+        if (!isTransactional && apiVersion < 9) {
+            throw new UnsupportedVersionException("OffsetCommit version 9 or 
above must be used " +
+                "by members using the streams group protocol");
+        }
+
+        validateMemberEpoch(memberEpoch, member.memberEpoch());
+    }
+
+    /**
+     * Validates the OffsetFetch request.
+     *
+     * @param memberId            The member ID for streams groups.
+     * @param memberEpoch         The member epoch for streams groups.
+     * @param lastCommittedOffset The last committed offsets in the timeline.
+     */
+    @Override
+    public void validateOffsetFetch(
+        String memberId,
+        int memberEpoch,
+        long lastCommittedOffset
+    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        // When the member ID is null and the member epoch is -1, the request 
either comes
+        // from the admin client or from a client which does not provide them. 
In this case,
+        // the fetch request is accepted.
+        if (memberId == null && memberEpoch < 0) {
+            return;
+        }
+
+        final StreamsGroupMember member = members.get(memberId, 
lastCommittedOffset);
+        if (member == null) {
+            throw new UnknownMemberIdException(String.format("Member %s is not 
a member of group %s.",
+                memberId, groupId));
+        }
+        validateMemberEpoch(memberEpoch, member.memberEpoch());
+    }
+
+    /**
+     * Validates the OffsetDelete request.
+     */
+    @Override
+    public void validateOffsetDelete() {
+    }
+
+    /**
+     * Validates the DeleteGroups request.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() != StreamsGroupState.EMPTY) {
+            throw Errors.NON_EMPTY_GROUP.exception();
+        }
+    }
+
+    @Override
+    public boolean isSubscribedToTopic(String topic) {
+        Optional<ConfiguredTopology> maybeConfiguredTopology = 
configuredTopology.get();
+        if (maybeConfiguredTopology.isEmpty() || 
!maybeConfiguredTopology.get().isReady()) {
+            return false;
+        }
+        for (ConfiguredSubtopology sub : 
maybeConfiguredTopology.get().subtopologies().orElse(new TreeMap<>()).values()) 
{
+            if (sub.sourceTopics().contains(topic) || 
sub.repartitionSourceTopics().containsKey(topic)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
+        members().forEach((memberId, member) ->
+            
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(),
 memberId))
+        );
+
+        members().forEach((memberId, member) ->
+            
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(),
 memberId))
+        );
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
+        members().forEach((memberId, member) ->
+            
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(),
 memberId))
+        );
+
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataTombstoneRecord(groupId()));
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return state() == StreamsGroupState.EMPTY;
+    }
+
+    /**
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata 
-> offsetAndMetadata.commitTimestampMs));
+    }
+
+    @Override
+    public boolean isInStates(Set<String> statesFilter, long committedOffset) {
+        return 
statesFilter.contains(state.get(committedOffset).toLowerCaseString());
+    }
+
+    /**
+     * Throws a StaleMemberEpochException if the received member epoch does 
not match the expected member epoch.
+     */
+    private void validateMemberEpoch(
+        int receivedMemberEpoch,
+        int expectedMemberEpoch
+    ) throws StaleMemberEpochException {
+        if (receivedMemberEpoch != expectedMemberEpoch) {
+            throw new StaleMemberEpochException(String.format("The received 
member epoch %d does not match "
+                + "the expected member epoch %d.", receivedMemberEpoch, 
expectedMemberEpoch));
+        }
+    }
+
+    /**
+     * Updates the current state of the group.
+     */
+    private void maybeUpdateGroupState() {
+        StreamsGroupState previousState = state.get();
+        StreamsGroupState newState = STABLE;
+        if (members.isEmpty()) {
+            newState = EMPTY;
+        } else if (topology() == null || configuredTopology().isEmpty() || 
!configuredTopology().get().isReady()) {
+            newState = NOT_READY;
+        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+            newState = ASSIGNING;
+        } else {
+            for (StreamsGroupMember member : members.values()) {
+                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+                    newState = RECONCILING;
+                    break;
+                }
+            }
+        }
+
+        state.set(newState);
+        metrics.onStreamsGroupStateTransition(previousState, newState);
+    }
+
+    private void maybeUpdateConfiguredTopology() {
+        if (topology.get().isPresent()) {
+            final StreamsTopology streamsTopology = topology.get().get();
+
+            log.info("[GroupId {}] Configuring the topology {}", groupId, 
streamsTopology);
+            
this.configuredTopology.set(Optional.of(InternalTopicManager.configureTopics(logContext,
 streamsTopology, partitionMetadata)));
+
+        } else {
+            configuredTopology.set(Optional.empty());
+        }
+    }
+
+    /**
+     * Updates the tasks process IDs based on the old and the new member.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateTaskProcessId(
+        StreamsGroupMember oldMember,
+        StreamsGroupMember newMember
+    ) {
+        maybeRemoveTaskProcessId(oldMember);
+        addTaskProcessId(
+            newMember.assignedTasks(),
+            newMember.processId()
+        );
+        addTaskProcessId(
+            newMember.tasksPendingRevocation(),
+            newMember.processId()
+        );
+    }
+
+    /**
+     * Removes the task process IDs for the provided member.
+     *
+     * @param oldMember The old member.
+     */
+    private void maybeRemoveTaskProcessId(
+        StreamsGroupMember oldMember
+    ) {
+        if (oldMember != null) {
+            removeTaskProcessIds(oldMember.assignedTasks(), 
oldMember.processId());
+            removeTaskProcessIds(oldMember.tasksPendingRevocation(), 
oldMember.processId());
+        }
+    }
+
+    void removeTaskProcessIds(
+        TasksTuple tasks,
+        String processId
+    ) {
+        if (tasks != null) {
+            removeTaskProcessIds(tasks.activeTasks(), 
currentActiveTaskToProcessId, processId);
+            removeTaskProcessIdsFromSet(tasks.standbyTasks(), 
currentStandbyTaskToProcessIds, processId);
+            removeTaskProcessIdsFromSet(tasks.warmupTasks(), 
currentWarmupTaskToProcessIds, processId);
+        }
+    }
+
+    /**
+     * Removes the task process IDs based on the provided assignment.
+     *
+     * @param assignment    The assignment.
+     * @param expectedProcessId The expected process ID.
+     * @throws IllegalStateException if the process ID does not match the 
expected one. package-private for testing.
+     */
+    private void removeTaskProcessIds(
+        Map<String, Set<Integer>> assignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentTasksProcessId,
+        String expectedProcessId
+    ) {
+        assignment.forEach((subtopologyId, assignedPartitions) -> {
+            currentTasksProcessId.compute(subtopologyId, (__, 
partitionsOrNull) -> {
+                if (partitionsOrNull != null) {
+                    assignedPartitions.forEach(partitionId -> {
+                        String prevValue = 
partitionsOrNull.remove(partitionId);
+                        if (!Objects.equals(prevValue, expectedProcessId)) {
+                            throw new IllegalStateException(
+                                String.format("Cannot remove the process ID %s 
from task %s_%s because the partition is " +
+                                    "still owned at a different process ID 
%s", expectedProcessId, subtopologyId, partitionId, prevValue));
+                        }
+                    });
+                    if (partitionsOrNull.isEmpty()) {
+                        return null;
+                    } else {
+                        return partitionsOrNull;
+                    }
+                } else {
+                    throw new IllegalStateException(
+                        String.format("Cannot remove the process ID %s from %s 
because it does not have any processId",
+                            expectedProcessId, subtopologyId));
+                }
+            });
+        });
+    }
+
+    /**
+     * Removes the task process IDs based on the provided assignment.
+     *
+     * @param assignment    The assignment.
+     * @param processIdToRemove The expected process ID.
+     * @throws IllegalStateException if the process ID does not match the 
expected one. package-private for testing.
+     */
+    private void removeTaskProcessIdsFromSet(
+        Map<String, Set<Integer>> assignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> 
currentTasksProcessId,
+        String processIdToRemove
+    ) {
+        assignment.forEach((subtopologyId, assignedPartitions) -> {
+            currentTasksProcessId.compute(subtopologyId, (__, 
partitionsOrNull) -> {
+                if (partitionsOrNull != null) {
+                    assignedPartitions.forEach(partitionId -> {
+                        if 
(!partitionsOrNull.get(partitionId).remove(processIdToRemove)) {
+                            throw new IllegalStateException(
+                                String.format("Cannot remove the process ID %s 
from task %s_%s because the task is " +
+                                    "not owned by this process ID", 
processIdToRemove, subtopologyId, partitionId));
+                        }
+                    });
+                    if (partitionsOrNull.isEmpty()) {
+                        return null;
+                    } else {
+                        return partitionsOrNull;
+                    }
+                } else {
+                    throw new IllegalStateException(
+                        String.format("Cannot remove the process ID %s from %s 
because it does not have any process ID",
+                            processIdToRemove, subtopologyId));
+                }
+            });
+        });
+    }
+
+    /**
+     * Adds the partitions epoch based on the provided assignment.
+     *
+     * @param tasks     The assigned tasks.
+     * @param processId The process ID.
+     * @throws IllegalStateException if the partition already has an epoch 
assigned. package-private for testing.
+     */
+    void addTaskProcessId(
+        TasksTuple tasks,
+        String processId
+    ) {
+        if (tasks != null) {
+            addTaskProcessId(tasks.activeTasks(), processId, 
currentActiveTaskToProcessId);
+            addTaskProcessIdToSet(tasks.standbyTasks(), processId, 
currentStandbyTaskToProcessIds);
+            addTaskProcessIdToSet(tasks.warmupTasks(), processId, 
currentWarmupTaskToProcessIds);
+        }
+    }
+
+    private void addTaskProcessId(
+        Map<String, Set<Integer>> tasks,
+        String processId,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> 
currentTaskProcessId
+    ) {
+        tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
+            currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) 
-> {
+                if (partitionsOrNull == null) {
+                    partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitions.size());
+                }
+                for (Integer partitionId : assignedTaskPartitions) {
+                    String prevValue = partitionsOrNull.put(partitionId, 
processId);
+                    if (prevValue != null) {
+                        throw new IllegalStateException(
+                            String.format("Cannot set the process ID of %s-%s 
to %s because the partition is " +
+                                "still owned by process ID %s", subtopologyId, 
partitionId, processId, prevValue));
+                    }
+                }
+                return partitionsOrNull;
+            });
+        });
+    }
+
+    private void addTaskProcessIdToSet(
+        Map<String, Set<Integer>> tasks,
+        String processId,
+        TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> 
currentTaskProcessId
+    ) {
+        tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
+            currentTaskProcessId.compute(subtopologyId, (__, partitionsOrNull) 
-> {
+                if (partitionsOrNull == null) {
+                    partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitions.size());
+                }
+                for (Integer partitionId : assignedTaskPartitions) {
+                    partitionsOrNull.computeIfAbsent(partitionId, ___ -> new 
HashSet<>()).add(processId);
+                }
+                return partitionsOrNull;
+            });
+        });
+    }
+
+    public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(
+        long committedOffset
+    ) {
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setGroupEpoch(groupEpoch.get(committedOffset))
+            .setGroupState(state.get(committedOffset).toString())
+            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
+            
.setTopology(configuredTopology.get(committedOffset).map(ConfiguredTopology::asStreamsGroupDescribeTopology).orElse(null));
+        members.entrySet(committedOffset).forEach(
+            entry -> describedGroup.members().add(
+                entry.getValue().asStreamsGroupDescribeMember(
+                    targetAssignment.get(entry.getValue().memberId(), 
committedOffset)
+                )
+            )
+        );
+        return describedGroup;
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
index 19a988373ef..f4fa3dc7aa7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopicMetadata.java
@@ -19,10 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
 import org.apache.kafka.common.Uuid;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataValue;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * Immutable topic metadata, representing the current state of a topic in the 
broker.
@@ -30,15 +27,12 @@ import java.util.Set;
  * @param id             The topic ID.
  * @param name           The topic name.
  * @param numPartitions  The number of partitions.
- * @param partitionRacks Map of every partition ID to a set of its rack IDs, 
if they exist. If rack information is unavailable for all
- *                       partitions, this is an empty map.
  */
-public record TopicMetadata(Uuid id, String name, int numPartitions, 
Map<Integer, Set<String>> partitionRacks) {
+public record TopicMetadata(Uuid id, String name, int numPartitions) {
 
     public TopicMetadata(Uuid id,
                          String name,
-                         int numPartitions,
-                         Map<Integer, Set<String>> partitionRacks) {
+                         int numPartitions) {
         this.id = Objects.requireNonNull(id);
         if (Uuid.ZERO_UUID.equals(id)) {
             throw new IllegalArgumentException("Topic id cannot be 
ZERO_UUID.");
@@ -51,23 +45,12 @@ public record TopicMetadata(Uuid id, String name, int 
numPartitions, Map<Integer
         if (numPartitions <= 0) {
             throw new IllegalArgumentException("Number of partitions must be 
positive.");
         }
-        this.partitionRacks = Objects.requireNonNull(partitionRacks);
     }
 
     public static TopicMetadata 
fromRecord(StreamsGroupPartitionMetadataValue.TopicMetadata record) {
-        // Converting the data type from a list stored in the record to a map 
for the topic metadata.
-        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
-        for (StreamsGroupPartitionMetadataValue.PartitionMetadata 
partitionMetadata : record.partitionMetadata()) {
-            partitionRacks.put(
-                partitionMetadata.partition(),
-                Set.copyOf(partitionMetadata.racks())
-            );
-        }
-
         return new TopicMetadata(
             record.topicId(),
             record.topicName(),
-            record.numPartitions(),
-            partitionRacks);
+            record.numPartitions());
     }
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
index 1f5eb8e8dcb..f9be55b9e42 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json
@@ -28,14 +28,7 @@
       { "name": "TopicName", "versions": "0+", "type": "string",
         "about": "The topic name." },
       { "name": "NumPartitions", "versions": "0+", "type": "int32",
-        "about": "The number of partitions of the topic." },
-      { "name": "PartitionMetadata", "versions": "0+", "type": 
"[]PartitionMetadata",
-        "about": "Partitions mapped to a set of racks. If the rack information 
is unavailable for all the partitions, an empty list is stored.", "fields": [
-          { "name": "Partition", "versions": "0+", "type": "int32",
-            "about": "The partition number." },
-          { "name": "Racks", "versions": "0+", "type": "[]string",
-            "about": "The set of racks that the partition is mapped to." }
-      ]}
+        "about": "The number of partitions of the topic." }
     ]}
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 05da88f9f7a..e20640fb129 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
+import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
 import org.apache.kafka.timeline.SnapshotRegistry;
 
 import com.yammer.metrics.core.MetricsRegistry;
@@ -47,6 +48,7 @@ import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_EXPIRED_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
+import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertGaugeValue;
 import static 
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.assertMetricsForTypeEqual;
 import static 
org.apache.kafka.coordinator.group.metrics.MetricsTestUtils.metricName;
@@ -129,7 +131,37 @@ public class GroupCoordinatorMetricsTest {
                 GroupCoordinatorMetrics.METRICS_GROUP,
                 "The number of share groups in dead state.",
                 "protocol", Group.GroupType.SHARE.toString(),
-                "state", GroupState.DEAD.toString())
+                "state", GroupState.DEAD.toString()),
+            metrics.metricName(
+                "group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("protocol", 
Group.GroupType.STREAMS.toString())),
+            metrics.metricName("streams-group-rebalance-rate", 
GroupCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName("streams-group-rebalance-count", 
GroupCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.EMPTY.toString())),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.ASSIGNING.toString())),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.RECONCILING.toString())),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.STABLE.toString())),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.DEAD.toString())),
+            metrics.metricName(
+                "streams-group-count",
+                GroupCoordinatorMetrics.METRICS_GROUP,
+                Collections.singletonMap("state", 
StreamsGroupState.NOT_READY.toString()))
         ));
 
         try {
@@ -145,7 +177,7 @@ public class GroupCoordinatorMetricsTest {
                 ));
 
                 assertMetricsForTypeEqual(registry, "kafka.coordinator.group", 
expectedRegistry);
-                expectedMetrics.forEach(metricName -> 
assertTrue(metrics.metrics().containsKey(metricName)));
+                expectedMetrics.forEach(metricName -> 
assertTrue(metrics.metrics().containsKey(metricName), metricName + " is 
missing"));
             }
             assertMetricsForTypeEqual(registry, "kafka.coordinator.group", 
Collections.emptySet());
             expectedMetrics.forEach(metricName -> 
assertFalse(metrics.metrics().containsKey(metricName)));
@@ -195,6 +227,10 @@ public class GroupCoordinatorMetricsTest {
         IntStream.range(0, 5).forEach(__ -> 
shard0.incrementNumShareGroups(ShareGroup.ShareGroupState.STABLE));
         IntStream.range(0, 5).forEach(__ -> 
shard1.incrementNumShareGroups(ShareGroup.ShareGroupState.EMPTY));
         IntStream.range(0, 3).forEach(__ -> 
shard1.decrementNumShareGroups(ShareGroup.ShareGroupState.DEAD));
+        
+        IntStream.range(0, 5).forEach(__ -> 
shard0.incrementNumStreamsGroups(StreamsGroupState.STABLE));
+        IntStream.range(0, 5).forEach(__ -> 
shard1.incrementNumStreamsGroups(StreamsGroupState.EMPTY));
+        IntStream.range(0, 3).forEach(__ -> 
shard1.decrementNumStreamsGroups(StreamsGroupState.DEAD));
 
         assertEquals(4, shard0.numClassicGroups());
         assertEquals(5, shard1.numClassicGroups());
@@ -228,6 +264,14 @@ public class GroupCoordinatorMetricsTest {
             metrics.metricName("group-count", METRICS_GROUP, 
Collections.singletonMap("protocol", "share")),
             7
         );
+        
+        assertEquals(5, shard0.numStreamsGroups());
+        assertEquals(2, shard1.numStreamsGroups());
+        assertGaugeValue(
+            metrics,
+            metrics.metricName("group-count", METRICS_GROUP, 
Collections.singletonMap("protocol", "streams")),
+            7
+        );
     }
 
     @Test
@@ -269,6 +313,18 @@ public class GroupCoordinatorMetricsTest {
             "The total number of share group rebalances",
             "protocol", "share"
         ), 50);
+
+        shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);
+        assertMetricValue(metrics, metrics.metricName(
+            "streams-group-rebalance-rate",
+            GroupCoordinatorMetrics.METRICS_GROUP,
+            "The rate of streams group rebalances"
+        ), 5.0 / 3.0);
+        assertMetricValue(metrics, metrics.metricName(
+            "streams-group-rebalance-count",
+            GroupCoordinatorMetrics.METRICS_GROUP,
+            "The total number of streams group rebalances"
+        ), 50);
     }
 
     private void assertMetricValue(Metrics metrics, MetricName metricName, 
double val) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 489d4fa0254..ab55dd3239b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -257,8 +257,8 @@ class StreamsCoordinatorRecordHelpersTest {
         Uuid uuid1 = Uuid.randomUuid();
         Uuid uuid2 = Uuid.randomUuid();
         Map<String, TopicMetadata> newPartitionMetadata = Map.of(
-            TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1, Map.of(0, 
Set.of(RACK_1, RACK_2))),
-            TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2, Map.of(1, 
Set.of(RACK_3)))
+            TOPIC_1, new TopicMetadata(uuid1, TOPIC_1, 1),
+            TOPIC_2, new TopicMetadata(uuid2, TOPIC_2, 2)
         );
 
         StreamsGroupPartitionMetadataValue value = new 
StreamsGroupPartitionMetadataValue();
@@ -266,21 +266,11 @@ class StreamsCoordinatorRecordHelpersTest {
             .setTopicId(uuid1)
             .setTopicName(TOPIC_1)
             .setNumPartitions(1)
-            .setPartitionMetadata(List.of(
-                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(0)
-                    .setRacks(List.of(RACK_1, RACK_2))
-            ))
         );
         value.topics().add(new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
             .setTopicId(uuid2)
             .setTopicName(TOPIC_2)
             .setNumPartitions(2)
-            .setPartitionMetadata(List.of(
-                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(1)
-                    .setRacks(List.of(RACK_3))
-            ))
         );
 
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
new file mode 100644
index 00000000000..47dcf552fa6
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -0,0 +1,1099 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState;
+import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class StreamsGroupTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+
+    private StreamsGroup createStreamsGroup(String groupId) {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        return new StreamsGroup(
+            LOG_CONTEXT,
+            snapshotRegistry,
+            groupId,
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+    }
+
+    @Test
+    public void testGetOrCreateMember() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        // Create a member.
+        member = streamsGroup.getOrMaybeCreateMember("member-id", true);
+        assertEquals("member-id", member.memberId());
+
+        // Add member to the group.
+        streamsGroup.updateMember(member);
+
+        // Get that member back.
+        member = streamsGroup.getOrMaybeCreateMember("member-id", false);
+        assertEquals("member-id", member.memberId());
+
+        assertThrows(UnknownMemberIdException.class, () ->
+            streamsGroup.getOrMaybeCreateMember("does-not-exist", false));
+    }
+
+    @Test
+    public void testUpdateMember() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+        member = new StreamsGroupMember.Builder(member).build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", 
false));
+    }
+
+    @Test
+    public void testNoStaticMember() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        // Create a new member which is not static
+        streamsGroup.getOrMaybeCreateMember("member", true);
+        assertNull(streamsGroup.staticMember("instance-id"));
+    }
+
+    @Test
+    public void testGetStaticMemberByInstanceId() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        member = streamsGroup.getOrMaybeCreateMember("member", true);
+
+        member = new StreamsGroupMember.Builder(member)
+            .setInstanceId("instance")
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals(member, streamsGroup.staticMember("instance"));
+        assertEquals(member, streamsGroup.getOrMaybeCreateMember("member", 
false));
+        assertEquals(member.memberId(), 
streamsGroup.staticMemberId("instance"));
+    }
+
+    @Test
+    public void testRemoveMember() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        StreamsGroupMember member = 
streamsGroup.getOrMaybeCreateMember("member", true);
+        streamsGroup.updateMember(member);
+        assertTrue(streamsGroup.hasMember("member"));
+
+        streamsGroup.removeMember("member");
+        assertFalse(streamsGroup.hasMember("member"));
+
+    }
+
+    @Test
+    public void testRemoveStaticMember() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setInstanceId("instance")
+            .build();
+
+        streamsGroup.updateMember(member);
+        assertTrue(streamsGroup.hasMember("member"));
+
+        streamsGroup.removeMember("member");
+        assertFalse(streamsGroup.hasMember("member"));
+        assertNull(streamsGroup.staticMember("instance"));
+        assertNull(streamsGroup.staticMemberId("instance"));
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesProcessId() {
+        String fooSubtopology = "foo-sub";
+        String barSubtopology = "bar-sub";
+        String zarSubtopology = "zar-sub";
+
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        member = new StreamsGroupMember.Builder("member")
+            .setProcessId("process")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+                )
+            )
+            .setTasksPendingRevocation(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+                )
+            )
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+        assertEquals(Collections.singleton("process"),
+            streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+        assertEquals(Collections.singleton("process"),
+            streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+        assertEquals(Collections.singleton("process"),
+            streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+        assertEquals(Collections.singleton("process"),
+            streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+        assertEquals(Collections.emptySet(),
+            streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+        assertEquals(Collections.emptySet(),
+            streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+        member = new StreamsGroupMember.Builder(member)
+            .setProcessId("process1")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+                )
+            )
+            .setTasksPendingRevocation(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+                )
+            )
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals("process1", 
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+        assertEquals(Collections.singleton("process1"),
+            streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+        assertEquals(Collections.singleton("process1"),
+            streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+        assertEquals("process1", 
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+        assertEquals(Collections.singleton("process1"),
+            streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+        assertEquals(Collections.singleton("process1"),
+            streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+        assertEquals(Collections.emptySet(),
+            streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+        assertEquals(Collections.emptySet(),
+            streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+    }
+
+    @Test
+    public void 
testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsReassignedBeforeBeingRevoked()
 {
+        String fooSubtopologyId = "foo-sub";
+
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        member = new StreamsGroupMember.Builder("member")
+            .setProcessId("process")
+            .setAssignedTasks(
+                new TasksTuple(
+                    emptyMap(),
+                    emptyMap(),
+                    emptyMap()
+                )
+            )
+            .setTasksPendingRevocation(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+                )
+            )
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+
+        member = new StreamsGroupMember.Builder(member)
+            .setProcessId("process1")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+                )
+            )
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals("process1", 
streamsGroup.currentActiveTaskProcessId(fooSubtopologyId, 1));
+    }
+
+    @Test
+    public void 
testUpdatingMemberUpdatesTaskProcessIdWhenPartitionIsNotReleased() {
+        String fooSubtopologyId = "foo-sub";
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+            .setProcessId("process")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                    emptyMap(),
+                    emptyMap()
+                )
+            )
+            .build();
+
+        streamsGroup.updateMember(m1);
+
+        StreamsGroupMember m2 = new StreamsGroupMember.Builder("m2")
+            .setProcessId("process")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                    emptyMap(),
+                    emptyMap()
+                )
+            )
+            .build();
+
+        // m2 should not be able to acquire foo-1 because the partition is
+        // still owned by another member.
+        assertThrows(IllegalStateException.class, () -> 
streamsGroup.updateMember(m2));
+    }
+
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testRemoveTaskProcessIds(TaskRole taskRole) {
+        String fooSubtopologyId = "foo-sub";
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        // Removing should fail because there is no epoch set.
+        assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
+            mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+            "process"
+        ));
+
+        StreamsGroupMember m1 = new StreamsGroupMember.Builder("m1")
+            .setProcessId("process")
+            .setAssignedTasks(mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 
1)))
+            .build();
+
+        streamsGroup.updateMember(m1);
+
+        // Removing should fail because the expected epoch is incorrect.
+        assertThrows(IllegalStateException.class, () -> 
streamsGroup.removeTaskProcessIds(
+            mkTasksTuple(taskRole, mkTasks(fooSubtopologyId, 1)),
+            "process1"
+        ));
+    }
+
+    @Test
+    public void testAddTaskProcessIds() {
+        String fooSubtopologyId = "foo-sub";
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        streamsGroup.addTaskProcessId(
+            new TasksTuple(
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+            ),
+            "process"
+        );
+
+        // Changing the epoch should fail because the owner of the partition
+        // should remove it first.
+        assertThrows(IllegalStateException.class, () -> 
streamsGroup.addTaskProcessId(
+            new TasksTuple(
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 1)),
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 2)),
+                mkTasksPerSubtopology(mkTasks(fooSubtopologyId, 3))
+            ),
+            "process"
+        ));
+    }
+
+    @Test
+    public void testDeletingMemberRemovesProcessId() {
+        String fooSubtopology = "foo-sub";
+        String barSubtopology = "bar-sub";
+        String zarSubtopology = "zar-sub";
+
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        StreamsGroupMember member;
+
+        member = new StreamsGroupMember.Builder("member")
+            .setProcessId("process")
+            .setAssignedTasks(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 1)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 2)),
+                    mkTasksPerSubtopology(mkTasks(fooSubtopology, 3))
+                )
+            )
+            .setTasksPendingRevocation(
+                new TasksTuple(
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 4)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 5)),
+                    mkTasksPerSubtopology(mkTasks(barSubtopology, 6))
+                )
+            )
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(fooSubtopology, 1));
+        assertEquals(Collections.singleton("process"), 
streamsGroup.currentStandbyTaskProcessIds(fooSubtopology, 2));
+        assertEquals(Collections.singleton("process"), 
streamsGroup.currentWarmupTaskProcessIds(fooSubtopology, 3));
+        assertEquals("process", 
streamsGroup.currentActiveTaskProcessId(barSubtopology, 4));
+        assertEquals(Collections.singleton("process"), 
streamsGroup.currentStandbyTaskProcessIds(barSubtopology, 5));
+        assertEquals(Collections.singleton("process"), 
streamsGroup.currentWarmupTaskProcessIds(barSubtopology, 6));
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+
+        streamsGroup.removeMember(member.memberId());
+
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 1));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 2));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 3));
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 3));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 4));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 5));
+        assertNull(streamsGroup.currentActiveTaskProcessId(zarSubtopology, 7));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentStandbyTaskProcessIds(zarSubtopology, 8));
+        assertEquals(Collections.emptySet(), 
streamsGroup.currentWarmupTaskProcessIds(zarSubtopology, 9));
+    }
+
+    @Test
+    public void testGroupState() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+        StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .build();
+
+        streamsGroup.updateMember(member1);
+        streamsGroup.setGroupEpoch(1);
+
+        assertEquals(MemberState.STABLE, member1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, 
streamsGroup.state());
+
+        streamsGroup.setTopology(new StreamsTopology(1, 
Collections.emptyMap()));
+
+        assertEquals(MemberState.STABLE, member1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
+
+        StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2")
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .build();
+
+        streamsGroup.updateMember(member2);
+        streamsGroup.setGroupEpoch(2);
+
+        assertEquals(MemberState.STABLE, member2.state());
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
+
+        streamsGroup.setTargetAssignmentEpoch(2);
+
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
+
+        member1 = new StreamsGroupMember.Builder(member1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+
+        streamsGroup.updateMember(member1);
+
+        assertEquals(MemberState.STABLE, member1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
+
+        // Member 2 is not stable so the group stays in reconciling state.
+        member2 = new StreamsGroupMember.Builder(member2)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+
+        streamsGroup.updateMember(member2);
+
+        assertEquals(MemberState.UNREVOKED_TASKS, member2.state());
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
+
+        member2 = new StreamsGroupMember.Builder(member2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+
+        streamsGroup.updateMember(member2);
+
+        assertEquals(MemberState.STABLE, member2.state());
+        assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
+
+        streamsGroup.removeMember("member1");
+        streamsGroup.removeMember("member2");
+
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+    }
+
+    @Test
+    public void testMetadataRefreshDeadline() {
+        MockTime time = new MockTime();
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        // Group epoch starts at 0.
+        assertEquals(0, group.groupEpoch());
+
+        // The refresh time deadline should be empty when the group is created 
or loaded.
+        assertTrue(group.hasMetadataExpired(time.milliseconds()));
+        assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+        assertEquals(0, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline. The metadata remains valid because the 
deadline
+        // has not past and the group epoch is correct.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, 
group.groupEpoch());
+        assertFalse(group.hasMetadataExpired(time.milliseconds()));
+        assertEquals(time.milliseconds() + 1000, 
group.metadataRefreshDeadline().deadlineMs);
+        assertEquals(group.groupEpoch(), 
group.metadataRefreshDeadline().epoch);
+
+        // Advance past the deadline. The metadata should have expired.
+        time.sleep(1001L);
+        assertTrue(group.hasMetadataExpired(time.milliseconds()));
+
+        // Set the refresh time deadline with a higher group epoch. The 
metadata is considered
+        // as expired because the group epoch attached to the deadline is 
higher than the
+        // current group epoch.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, 
group.groupEpoch() + 1);
+        assertTrue(group.hasMetadataExpired(time.milliseconds()));
+        assertEquals(time.milliseconds() + 1000, 
group.metadataRefreshDeadline().deadlineMs);
+        assertEquals(group.groupEpoch() + 1, 
group.metadataRefreshDeadline().epoch);
+
+        // Advance the group epoch.
+        group.setGroupEpoch(group.groupEpoch() + 1);
+
+        // Set the refresh deadline. The metadata remains valid because the 
deadline
+        // has not past and the group epoch is correct.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, 
group.groupEpoch());
+        assertFalse(group.hasMetadataExpired(time.milliseconds()));
+        assertEquals(time.milliseconds() + 1000, 
group.metadataRefreshDeadline().deadlineMs);
+        assertEquals(group.groupEpoch(), 
group.metadataRefreshDeadline().epoch);
+
+        // Request metadata refresh. The metadata expires immediately.
+        group.requestMetadataRefresh();
+        assertTrue(group.hasMetadataExpired(time.milliseconds()));
+        assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+        assertEquals(0, group.metadataRefreshDeadline().epoch);
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+    public void testValidateTransactionalOffsetCommit(short version) {
+        boolean isTransactional = true;
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+
+        // Simulate a call from the admin client without member ID and member 
epoch.
+        // This should pass only if the group is empty.
+        group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+        // The member does not exist.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetCommit("member-id", null, 0, isTransactional, 
version));
+
+        // Create a member.
+        group.updateMember(new 
StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+        // A call from the admin client should fail as the group is not empty.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetCommit("", "", -1, isTransactional, version));
+
+        // The member epoch is stale.
+        assertThrows(StaleMemberEpochException.class, () ->
+            group.validateOffsetCommit("member-id", "", 10, isTransactional, 
version));
+
+        // This should succeed.
+        group.validateOffsetCommit("member-id", "", 0, isTransactional, 
version);
+
+        // This should succeed.
+        group.validateOffsetCommit("", null, -1, isTransactional, version);
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
+    public void testValidateOffsetCommit(short version) {
+        boolean isTransactional = false;
+        StreamsGroup group = createStreamsGroup("group-foo");
+
+        // Simulate a call from the admin client without member ID and member 
epoch.
+        // This should pass only if the group is empty.
+        group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+        // The member does not exist.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetCommit("member-id", null, 0, isTransactional, 
version));
+
+        // Create members.
+        group.updateMember(
+            new StreamsGroupMember
+                .Builder("new-protocol-member-id").setMemberEpoch(0).build()
+        );
+
+        // A call from the admin client should fail as the group is not empty.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetCommit("", "", -1, isTransactional, version));
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetCommit("", null, -1, isTransactional, 
version));
+
+        // The member epoch is stale.
+        if (version >= 9) {
+            assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
+        }
+
+        // This should succeed.
+        if (version >= 9) {
+            group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version);
+        } else {
+            assertThrows(UnsupportedVersionException.class, () ->
+                group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version));
+        }
+    }
+
+    @Test
+    public void testAsListedGroup() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        StreamsGroup group = new StreamsGroup(
+            LOG_CONTEXT,
+            snapshotRegistry,
+            "group-foo",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        group.setGroupEpoch(1);
+        group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+        group.setTargetAssignmentEpoch(1);
+        group.updateMember(new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .build());
+        snapshotRegistry.idempotentCreateSnapshot(1);
+
+        ListGroupsResponseData.ListedGroup listedGroup = 
group.asListedGroup(1);
+
+        assertEquals("group-foo", listedGroup.groupId());
+        assertEquals("streams", listedGroup.protocolType());
+        assertEquals("Reconciling", listedGroup.groupState());
+        assertEquals("streams", listedGroup.groupType());
+    }
+
+    @Test
+    public void testValidateOffsetFetch() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        StreamsGroup group = new StreamsGroup(
+            LOG_CONTEXT,
+            snapshotRegistry,
+            "group-foo",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+
+        // Simulate a call from the admin client without member ID and member 
epoch.
+        group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
+
+        // The member does not exist.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
+
+        // Create a member.
+        snapshotRegistry.idempotentCreateSnapshot(0);
+        group.updateMember(new 
StreamsGroupMember.Builder("member-id").setMemberEpoch(0).build());
+
+        // The member does not exist at last committed offset 0.
+        assertThrows(UnknownMemberIdException.class, () ->
+            group.validateOffsetFetch("member-id", 0, 0));
+
+        // The member exists but the epoch is stale when the last committed 
offset is not considered.
+        assertThrows(StaleMemberEpochException.class, () ->
+            group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE));
+
+        // This should succeed.
+        group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
+    }
+
+    @Test
+    public void testValidateDeleteGroup() {
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+        assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+
+        StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .build();
+        streamsGroup.updateMember(member1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, 
streamsGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
+
+        streamsGroup.setTopology(new StreamsTopology(1, 
Collections.emptyMap()));
+
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
+
+        streamsGroup.setGroupEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
+
+        streamsGroup.setTargetAssignmentEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
+        assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
+
+        streamsGroup.removeMember("member1");
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        assertDoesNotThrow(streamsGroup::validateDeleteGroup);
+    }
+
+    @Test
+    public void testOffsetExpirationCondition() {
+        long currentTimestamp = 30000L;
+        long commitTimestamp = 20000L;
+        long offsetsRetentionMs = 10000L;
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, 
OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty());
+        StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new 
SnapshotRegistry(LOG_CONTEXT), "group-id", 
mock(GroupCoordinatorMetricsShard.class));
+
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
+        assertTrue(offsetExpirationCondition.isPresent());
+
+        OffsetExpirationConditionImpl condition = 
(OffsetExpirationConditionImpl) offsetExpirationCondition.get();
+        assertEquals(commitTimestamp, 
condition.baseTimestamp().apply(offsetAndMetadata));
+        assertTrue(condition.isOffsetExpired(offsetAndMetadata, 
currentTimestamp, offsetsRetentionMs));
+    }
+
+    @Test
+    public void testAsDescribedGroup() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+        StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, 
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
+        snapshotRegistry.idempotentCreateSnapshot(0);
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY.toString(), 
group.stateAsString(0));
+
+        group.setGroupEpoch(1);
+        group.setTopology(new StreamsTopology(1, Collections.emptyMap()));
+        group.setTargetAssignmentEpoch(1);
+        group.updateMember(new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setInstanceId("instance1")
+            .setRackId("rack1")
+            .setClientId("client1")
+            .setClientHost("host1")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(1)
+            .setProcessId("process1")
+            .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host1").setPort(9092))
+            .setClientTags(Collections.singletonMap("tag1", "value1"))
+            .setAssignedTasks(new TasksTuple(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap()))
+            .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap()))
+            .build());
+        group.updateMember(new StreamsGroupMember.Builder("member2")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setInstanceId("instance2")
+            .setRackId("rack2")
+            .setClientId("client2")
+            .setClientHost("host2")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(1)
+            .setProcessId("process2")
+            .setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host2").setPort(9092))
+            .setClientTags(Collections.singletonMap("tag2", "value2"))
+            .setAssignedTasks(new TasksTuple(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap()))
+            .setTasksPendingRevocation(new TasksTuple(Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptyMap()))
+            .build());
+        snapshotRegistry.idempotentCreateSnapshot(1);
+
+        StreamsGroupDescribeResponseData.DescribedGroup expected = new 
StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("group-id-1")
+            .setGroupState(StreamsGroup.StreamsGroupState.STABLE.toString())
+            .setGroupEpoch(1)
+            .setTopology(new 
StreamsGroupDescribeResponseData.Topology().setEpoch(1).setSubtopologies(Collections.emptyList()))
+            .setAssignmentEpoch(1)
+            .setMembers(Arrays.asList(
+                new StreamsGroupDescribeResponseData.Member()
+                    .setMemberId("member1")
+                    .setMemberEpoch(1)
+                    .setInstanceId("instance1")
+                    .setRackId("rack1")
+                    .setClientId("client1")
+                    .setClientHost("host1")
+                    .setTopologyEpoch(1)
+                    .setProcessId("process1")
+                    .setUserEndpoint(new 
StreamsGroupDescribeResponseData.Endpoint().setHost("host1").setPort(9092))
+                    .setClientTags(Collections.singletonList(new 
StreamsGroupDescribeResponseData.KeyValue().setKey("tag1").setValue("value1")))
+                    .setAssignment(new 
StreamsGroupDescribeResponseData.Assignment())
+                    .setTargetAssignment(new 
StreamsGroupDescribeResponseData.Assignment()),
+                new StreamsGroupDescribeResponseData.Member()
+                    .setMemberId("member2")
+                    .setMemberEpoch(1)
+                    .setInstanceId("instance2")
+                    .setRackId("rack2")
+                    .setClientId("client2")
+                    .setClientHost("host2")
+                    .setTopologyEpoch(1)
+                    .setProcessId("process2")
+                    .setUserEndpoint(new 
StreamsGroupDescribeResponseData.Endpoint().setHost("host2").setPort(9092))
+                    .setClientTags(Collections.singletonList(new 
StreamsGroupDescribeResponseData.KeyValue().setKey("tag2").setValue("value2")))
+                    .setAssignment(new 
StreamsGroupDescribeResponseData.Assignment())
+                    .setTargetAssignment(new 
StreamsGroupDescribeResponseData.Assignment())
+            ));
+        StreamsGroupDescribeResponseData.DescribedGroup actual = 
group.asDescribedGroup(1);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testStateTransitionMetrics() {
+        // Confirm metrics is not updated when a new StreamsGroup is created 
but only when the group transitions
+        // its state.
+        GroupCoordinatorMetricsShard metrics = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(
+            LOG_CONTEXT,
+            new SnapshotRegistry(new LogContext()),
+            "group-id",
+            metrics
+        );
+
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        verify(metrics, times(0)).onStreamsGroupStateTransition(null, 
StreamsGroup.StreamsGroupState.EMPTY);
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, 
streamsGroup.state());
+        verify(metrics, 
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.EMPTY, 
StreamsGroup.StreamsGroupState.NOT_READY);
+
+        streamsGroup.setTopology(new StreamsTopology(1, 
Collections.emptyMap()));
+
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
+        verify(metrics, 
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.NOT_READY,
 StreamsGroup.StreamsGroupState.RECONCILING);
+
+        streamsGroup.setGroupEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
+        verify(metrics, 
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.RECONCILING,
 StreamsGroup.StreamsGroupState.ASSIGNING);
+
+        streamsGroup.setTargetAssignmentEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
+        verify(metrics, 
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.ASSIGNING,
 StreamsGroup.StreamsGroupState.STABLE);
+
+        streamsGroup.removeMember("member");
+
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+        verify(metrics, 
times(1)).onStreamsGroupStateTransition(StreamsGroup.StreamsGroupState.STABLE, 
StreamsGroup.StreamsGroupState.EMPTY);
+    }
+
+    @Test
+    public void testIsInStatesCaseInsensitiveAndUnderscored() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        GroupCoordinatorMetricsShard metricsShard = new 
GroupCoordinatorMetricsShard(
+            snapshotRegistry,
+            emptyMap(),
+            new TopicPartition("__consumer_offsets", 0)
+        );
+        StreamsGroup group = new StreamsGroup(LOG_CONTEXT, snapshotRegistry, 
"group-foo", metricsShard);
+        snapshotRegistry.idempotentCreateSnapshot(0);
+        assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+        assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
+
+        group.updateMember(new StreamsGroupMember.Builder("member1")
+            .build());
+        snapshotRegistry.idempotentCreateSnapshot(1);
+        assertTrue(group.isInStates(Collections.singleton("empty"), 0));
+        assertTrue(group.isInStates(Collections.singleton("not_ready"), 1));
+        assertFalse(group.isInStates(Collections.singleton("empty"), 1));
+    }
+
+    @Test
+    public void testSetTopologyUpdatesStateAndConfiguredTopology() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, 
snapshotRegistry, "test-group", metricsShard);
+
+        StreamsTopology topology = new StreamsTopology(1, 
Collections.emptyMap());
+
+        ConfiguredTopology topo = mock(ConfiguredTopology.class);
+        when(topo.isReady()).thenReturn(true);
+
+        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
+            mocked.when(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(Map.of()))).thenReturn(topo);
+            streamsGroup.setTopology(topology);
+            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(Map.of())));
+        }
+
+        Optional<ConfiguredTopology> configuredTopology = 
streamsGroup.configuredTopology();
+        assertTrue(configuredTopology.isPresent(), "Configured topology should 
be present");
+        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .build());
+
+        assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+    }
+
+    @Test
+    public void testSetPartitionMetadataUpdatesStateAndConfiguredTopology() {
+        Uuid topicUuid = Uuid.randomUuid();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(LOG_CONTEXT, 
snapshotRegistry, "test-group", metricsShard);
+
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, 
streamsGroup.state());
+
+        Map<String, TopicMetadata> partitionMetadata = new HashMap<>();
+        partitionMetadata.put("topic1", new TopicMetadata(topicUuid, "topic1", 
1));
+
+        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
+            streamsGroup.setPartitionMetadata(partitionMetadata);
+            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
any(), any()), never());
+        }
+
+        assertTrue(streamsGroup.configuredTopology().isEmpty(), "Configured 
topology should not be present");
+        assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+
+        StreamsTopology topology = new StreamsTopology(1, 
Collections.emptyMap());
+        streamsGroup.setTopology(topology);
+        ConfiguredTopology topo = mock(ConfiguredTopology.class);
+        when(topo.isReady()).thenReturn(true);
+
+        try (MockedStatic<InternalTopicManager> mocked = 
mockStatic(InternalTopicManager.class)) {
+            mocked.when(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata))).thenReturn(topo);
+            streamsGroup.setPartitionMetadata(partitionMetadata);
+            mocked.verify(() -> InternalTopicManager.configureTopics(any(), 
eq(topology), eq(partitionMetadata)));
+        }
+
+        Optional<ConfiguredTopology> configuredTopology = 
streamsGroup.configuredTopology();
+        assertTrue(configuredTopology.isPresent(), "Configured topology should 
be present");
+        assertEquals(topo, configuredTopology.get());
+        assertEquals(partitionMetadata, streamsGroup.partitionMetadata());
+        assertEquals(StreamsGroupState.EMPTY, streamsGroup.state());
+
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .build());
+
+        assertEquals(StreamsGroupState.RECONCILING, streamsGroup.state());
+    }
+
+    @Test
+    public void testComputePartitionMetadata() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        StreamsGroup streamsGroup = new StreamsGroup(
+            LOG_CONTEXT,
+            snapshotRegistry,
+            "group-foo",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        TopicsImage topicsImage = mock(TopicsImage.class);
+        TopicImage topicImage = mock(TopicImage.class);
+        when(topicImage.id()).thenReturn(Uuid.randomUuid());
+        when(topicImage.name()).thenReturn("topic1");
+        when(topicImage.partitions()).thenReturn(Collections.singletonMap(0, 
null));
+        when(topicsImage.getTopic("topic1")).thenReturn(topicImage);
+        StreamsTopology topology = mock(StreamsTopology.class);
+        
when(topology.requiredTopics()).thenReturn(Collections.singleton("topic1"));
+
+        Map<String, TopicMetadata> partitionMetadata = 
streamsGroup.computePartitionMetadata(topicsImage, topology);
+
+        assertEquals(1, partitionMetadata.size());
+        assertTrue(partitionMetadata.containsKey("topic1"));
+        TopicMetadata topicMetadata = partitionMetadata.get("topic1");
+        assertNotNull(topicMetadata);
+        assertEquals(topicImage.id(), topicMetadata.id());
+        assertEquals("topic1", topicMetadata.name());
+        assertEquals(1, topicMetadata.numPartitions());
+    }
+
+    @Test
+    void testCreateGroupTombstoneRecords() {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
+        StreamsGroup streamsGroup = new StreamsGroup(
+            LOG_CONTEXT,
+            snapshotRegistry,
+            "test-group",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .build());
+        List<CoordinatorRecord> records = new ArrayList<>();
+
+        streamsGroup.createGroupTombstoneRecords(records);
+
+        assertEquals(7, records.size());
+        for (CoordinatorRecord record : records) {
+            assertNotNull(record.key());
+            assertNull(record.value());
+        }
+        final Set<ApiMessage> keys = 
records.stream().map(CoordinatorRecord::key).collect(Collectors.toSet());
+        assertTrue(keys.contains(new 
StreamsGroupMetadataKey().setGroupId("test-group")));
+        assertTrue(keys.contains(new 
StreamsGroupTargetAssignmentMetadataKey().setGroupId("test-group")));
+        assertTrue(keys.contains(new 
StreamsGroupPartitionMetadataKey().setGroupId("test-group")));
+        assertTrue(keys.contains(new 
StreamsGroupTopologyKey().setGroupId("test-group")));
+        assertTrue(keys.contains(new 
StreamsGroupMemberMetadataKey().setGroupId("test-group").setMemberId("member1")));
+        assertTrue(keys.contains(new 
StreamsGroupTargetAssignmentMemberKey().setGroupId("test-group").setMemberId("member1")));
+        assertTrue(keys.contains(new 
StreamsGroupCurrentMemberAssignmentKey().setGroupId("test-group").setMemberId("member1")));
+    }
+
+    @Test
+    public void testIsSubscribedToTopic() {
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+        assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+        streamsGroup.setTopology(
+            new StreamsTopology(1,
+                Map.of("test-subtopology",
+                    new StreamsGroupTopologyValue.Subtopology()
+                        .setSubtopologyId("test-subtopology")
+                        .setSourceTopics(List.of("test-topic1"))
+                        .setRepartitionSourceTopics(List.of(new 
StreamsGroupTopologyValue.TopicInfo().setName("test-topic2")))
+                        .setRepartitionSinkTopics(List.of("test-topic2"))
+                )
+            )
+        );
+
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic1"));
+        assertFalse(streamsGroup.isSubscribedToTopic("test-topic2"));
+        assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+
+        streamsGroup.setPartitionMetadata(
+            Map.of(
+                "test-topic1", new TopicMetadata(Uuid.randomUuid(), 
"test-topic1", 1),
+                "test-topic2", new TopicMetadata(Uuid.randomUuid(), 
"test-topic2", 1)
+            )
+        );
+
+        assertTrue(streamsGroup.isSubscribedToTopic("test-topic1"));
+        assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
+        assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
index 114974558b8..0380d4cf5e7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -45,7 +45,6 @@ import java.util.TreeMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
@@ -144,8 +143,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -196,8 +195,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -261,8 +260,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -341,8 +340,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2, 3),
@@ -429,8 +428,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2),
@@ -509,8 +508,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2),
@@ -581,8 +580,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
-        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6);
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6);
 
         context.addGroupMember("member-1", "instance-member-1", 
mkTasksTuple(taskRole,
             mkTasks(fooSubtopologyId, 1, 2),
@@ -709,16 +708,14 @@ public class TargetAssignmentBuilderTest {
 
         public String addSubtopologyWithSingleSourceTopic(
             String topicName,
-            int numTasks,
-            Map<Integer, Set<String>> partitionRacks
+            int numTasks
         ) {
             String subtopologyId = Uuid.randomUuid().toString();
             Uuid topicId = Uuid.randomUuid();
             subscriptionMetadata.put(topicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(
                 topicId,
                 topicName,
-                numTasks,
-                partitionRacks
+                numTasks
             ));
             topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numTasks);
             subtopologies.put(subtopologyId, new 
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), 
Map.of()));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
index 38d63dab6ef..59712d5c954 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopicMetadataTest.java
@@ -21,11 +21,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupPartitionMetadat
 
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -35,84 +30,60 @@ public class TopicMetadataTest {
     @Test
     public void testConstructor() {
         assertDoesNotThrow(() ->
-            new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, new 
HashMap<>()));
+            new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3));
     }
 
     @Test
     public void testConstructorWithZeroUuid() {
         Exception exception = assertThrows(IllegalArgumentException.class, () 
->
-            new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3, new 
HashMap<>()));
+            new TopicMetadata(Uuid.ZERO_UUID, "valid-topic", 3));
         assertEquals("Topic id cannot be ZERO_UUID.", exception.getMessage());
     }
 
     @Test
     public void testConstructorWithNullUuid() {
         assertThrows(NullPointerException.class, () ->
-            new TopicMetadata(null, "valid-topic", 3, new HashMap<>()));
+            new TopicMetadata(null, "valid-topic", 3));
     }
 
     @Test
     public void testConstructorWithNullName() {
         assertThrows(NullPointerException.class, () ->
-            new TopicMetadata(Uuid.randomUuid(), null, 3, new HashMap<>()));
+            new TopicMetadata(Uuid.randomUuid(), null, 3));
     }
 
     @Test
     public void testConstructorWithEmptyName() {
         Exception exception = assertThrows(IllegalArgumentException.class, () 
->
-            new TopicMetadata(Uuid.randomUuid(), "", 3, new HashMap<>()));
+            new TopicMetadata(Uuid.randomUuid(), "", 3));
         assertEquals("Topic name cannot be empty.", exception.getMessage());
     }
 
     @Test
     public void testConstructorWithZeroNumPartitions() {
         Exception exception = assertThrows(IllegalArgumentException.class, () 
->
-            new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0, new 
HashMap<>()));
+            new TopicMetadata(Uuid.randomUuid(), "valid-topic", 0));
         assertEquals("Number of partitions must be positive.", 
exception.getMessage());
     }
 
     @Test
     public void testConstructorWithNegativeNumPartitions() {
         Exception exception = assertThrows(IllegalArgumentException.class, () 
->
-            new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1, new 
HashMap<>()));
+            new TopicMetadata(Uuid.randomUuid(), "valid-topic", -1));
         assertEquals("Number of partitions must be positive.", 
exception.getMessage());
     }
 
-    @Test
-    public void testConstructorWithNullPartitionRacks() {
-        assertThrows(NullPointerException.class, () ->
-            new TopicMetadata(Uuid.randomUuid(), "valid-topic", 3, null));
-    }
-
     @Test
     public void testFromRecord() {
         StreamsGroupPartitionMetadataValue.TopicMetadata record = new 
StreamsGroupPartitionMetadataValue.TopicMetadata()
             .setTopicId(Uuid.randomUuid())
             .setTopicName("test-topic")
-            .setNumPartitions(3)
-            .setPartitionMetadata(List.of(
-                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(0)
-                    .setRacks(List.of("rack1", "rack2")),
-                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(1)
-                    .setRacks(List.of("rack3")),
-                new StreamsGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(2)
-                    .setRacks(List.of("rack4", "rack5"))
-            ));
+            .setNumPartitions(3);
 
         TopicMetadata topicMetadata = TopicMetadata.fromRecord(record);
 
         assertEquals(record.topicId(), topicMetadata.id());
         assertEquals(record.topicName(), topicMetadata.name());
         assertEquals(record.numPartitions(), topicMetadata.numPartitions());
-
-        Map<Integer, Set<String>> expectedPartitionRacks = new HashMap<>();
-        expectedPartitionRacks.put(0, Set.of("rack1", "rack2"));
-        expectedPartitionRacks.put(1, Set.of("rack3"));
-        expectedPartitionRacks.put(2, Set.of("rack4", "rack5"));
-
-        assertEquals(expectedPartitionRacks, topicMetadata.partitionRacks());
     }
 }
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
index 4d7fdc08d4c..6ec600a0c38 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManagerTest.java
@@ -55,7 +55,7 @@ class InternalTopicManagerTest {
     @Test
     void 
testConfigureTopicsSetsConfigurationExceptionWhenSourceTopicIsMissing() {
         Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_1, 2, Collections.emptyMap()));
+        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_1, 2));
         // SOURCE_TOPIC_2 is missing from topicMetadata
         StreamsTopology topology = makeTestTopology();
 
@@ -70,10 +70,10 @@ class InternalTopicManagerTest {
     @Test
     void testConfigureTopics() {
         Map<String, TopicMetadata> topicMetadata = new HashMap<>();
-        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_1, 2, Collections.emptyMap()));
-        topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_2, 2, Collections.emptyMap()));
+        topicMetadata.put(SOURCE_TOPIC_1, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_1, 2));
+        topicMetadata.put(SOURCE_TOPIC_2, new TopicMetadata(Uuid.randomUuid(), 
SOURCE_TOPIC_2, 2));
         topicMetadata.put(STATE_CHANGELOG_TOPIC_2,
-            new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2, 
Collections.emptyMap()));
+            new TopicMetadata(Uuid.randomUuid(), STATE_CHANGELOG_TOPIC_2, 2));
         StreamsTopology topology = makeTestTopology();
 
         ConfiguredTopology configuredTopology = 
InternalTopicManager.configureTopics(new LogContext(), topology, topicMetadata);


Reply via email to