This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4cb5f1ca124 KAFKA-19661: Streams groups sometimes describe as 
NOT_READY when STABLE (#20600)
4cb5f1ca124 is described below

commit 4cb5f1ca124b810b32fcef1ca23f00faf70603b6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Oct 14 17:34:30 2025 +0200

    KAFKA-19661: Streams groups sometimes describe as NOT_READY when STABLE 
(#20600)
    
    Streams groups sometimes describe as NOT_READY when STABLE. That is, the
    group is configured and all topics exist, but when you use LIST_GROUP
    and STREAMS_GROUP_DESCRIBE, the group will show up as not ready.
    
    The root cause seems to be that
    https://github.com/apache/kafka/pull/19802 moved the creation of the
    soft state configured topology from the replay path to the heartbeat.
    This way, LIST_GROUP and STREAMS_GROUP_DESCRIBE may not show the
    configured topology, because the configured topology that is created in
    the heartbeat is "thrown away", and the new group is recreated on the
    replay-path.
    
    To reflect a consistent view of the topology via LIST_GROUP and
    STREAMS_GROUP_DESCRIBE, we need to store additional information in the
    consumer offset topic. In particular, we need to store at least whether
    a topology was validated against the current topic metadata, as this
    defines whether a group is in STABLE and not in NOT_READY.
    
    This change adds a new field `validatedTopologyEpoch` to the metadata of
    the group, which stores precisely this information.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/api/IntegrationTestHarness.scala         |   9 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  77 ++++++--
 .../coordinator/group/GroupMetadataManager.java    |  16 +-
 .../streams/StreamsCoordinatorRecordHelpers.java   |   8 +-
 .../coordinator/group/streams/StreamsGroup.java    |  26 ++-
 .../common/message/StreamsGroupMetadataValue.json  |   4 +-
 .../group/GroupMetadataManagerTest.java            | 202 ++++++++++++++++-----
 .../StreamsCoordinatorRecordHelpersTest.java       |   9 +-
 .../group/streams/StreamsGroupBuilder.java         |   8 +-
 .../group/streams/StreamsGroupTest.java            |   4 +
 10 files changed, 280 insertions(+), 83 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 303e989e9b4..eb14858a944 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
 
   def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
                                configsToRemove: List[String] = List(),
-                               inputTopic: String,
+                               inputTopics: Set[String],
+                               changelogTopics: Set[String] = Set(),
                                streamsGroupId: String): AsyncKafkaConsumer[K, 
V] = {
     val props = new Properties()
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
-          util.Set.of(inputTopic),
+          inputTopics.asJava,
           util.Set.of(),
           util.Map.of(),
-          util.Map.of(inputTopic + "-store-changelog", new 
StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), 
util.Map.of())),
+          changelogTopics.map(c => (c, new 
StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), 
util.Map.of()))).toMap.asJava,
           util.Set.of()
         )),
       Map.empty[String, String].asJava
@@ -270,7 +271,7 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       configOverrides = props,
       streamsRebalanceData = streamsRebalanceData
     )
-    consumer.subscribe(util.Set.of(inputTopic),
+    consumer.subscribe(inputTopics.asJava,
       new StreamsRebalanceListener {
         override def onTasksRevoked(tasks: 
util.Set[StreamsRebalanceData.TaskId]): Unit = ()
         override def onTasksAssigned(assignment: 
StreamsRebalanceData.Assignment): Unit = ()
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c8e26445922..d48ddaf58ad 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2318,7 +2318,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-
   /**
    * Test the consumer group APIs for member removal.
    */
@@ -2587,7 +2586,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
     val streamsGroup = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
 
@@ -4412,7 +4412,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     prepareRecords(testTopicName)
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
     streams.poll(JDuration.ofMillis(500L))
@@ -4422,7 +4423,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         val firstGroup = client.listGroups().all().get().stream()
           .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
         firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
-      }, "Stream group not stable yet")
+      }, "Streams group did not transition to STABLE before timeout")
 
       // Verify the describe call works correctly
       val describedGroups = 
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@@ -4458,7 +4459,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client = Admin.create(config)
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId
     )
     streams.poll(JDuration.ofMillis(500L))
@@ -4468,7 +4470,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         val firstGroup = client.listGroups().all().get().stream()
           .filter(g => g.groupId() == streamsGroupId).findFirst().orElse(null)
         firstGroup.groupState().orElse(null) == GroupState.NOT_READY && 
firstGroup.groupId() == streamsGroupId
-      }, "Stream group not NOT_READY yet")
+      }, "Streams group did not transition to NOT_READY before timeout")
 
       // Verify the describe call works correctly
       val describedGroups = 
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
@@ -4490,6 +4492,55 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streams = createStreamsGroup(
+      inputTopics = Set(testTopicName),
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+      }, "Streams group did not transition to STABLE before timeout")
+
+      // Verify the describe call works correctly
+      val describedGroups = 
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = 
client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], 
describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+  
   @Test
   def testDeleteStreamsGroups(): Unit = {
     val testTopicName = "test_topic"
@@ -4512,7 +4563,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         val streamsGroupId = s"stream_group_id_$i"
 
         val streams = createStreamsGroup(
-          inputTopic = testTopicName,
+          inputTopics = Set(testTopicName),
+          changelogTopics = Set(testTopicName + "-changelog"),
           streamsGroupId = streamsGroupId,
         )
         streams.poll(JDuration.ofMillis(500L))
@@ -4595,7 +4647,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
@@ -4611,7 +4664,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       TestUtils.waitUntilTrue(() => {
         val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
         firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
-      }, "Stream group not stable yet")
+      }, "Streams group did not transition to STABLE before timeout")
 
       val allTopicPartitions = client.listStreamsGroupOffsets(
         util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
@@ -4655,7 +4708,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
@@ -4732,7 +4786,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     val streams = createStreamsGroup(
-      inputTopic = testTopicName,
+      inputTopics = Set(testTopicName),
+      changelogTopics = Set(testTopicName + "-changelog"),
       streamsGroupId = streamsGroupId,
     )
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index be9c4f9abc3..95fc9bbb466 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -250,9 +250,9 @@ import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMe
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
-import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroupMember.hasAssignedTasksChanged;
@@ -1960,19 +1960,26 @@ public class GroupMetadataManager {
             updatedConfiguredTopology = group.configuredTopology().get();
         }
 
+        // 3b. If the topology is validated, persist the fact that it is 
validated.
+        int validatedTopologyEpoch = -1;
         if (updatedConfiguredTopology.isReady()) {
+            validatedTopologyEpoch = updatedTopology.topologyEpoch();
             SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = 
updatedConfiguredTopology.subtopologies().get();
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedActiveTasks);
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedStandbyTasks);
             throwIfRequestContainsInvalidTasks(subtopologySortedMap, 
ownedWarmupTasks);
         }
+        // We validated a topology that was not validated before, so bump the 
group epoch as we may have to reassign tasks.
+        if (validatedTopologyEpoch != group.validatedTopologyEpoch()) {
+            bumpGroupEpoch = true;
+        }
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
             groupEpoch += 1;
-            records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 
metadataHash));
-            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {}.", groupId, memberId, groupEpoch, metadataHash);
+            records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch));
+            log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId, 
groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
@@ -4291,7 +4298,7 @@ public class GroupMetadataManager {
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 
0));
+        records.add(newStreamsGroupMetadataRecord(group.groupId(), groupEpoch, 
group.metadataHash(), group.validatedTopologyEpoch()));
 
         cancelTimers(group.groupId(), member.memberId());
 
@@ -5411,6 +5418,7 @@ public class GroupMetadataManager {
             StreamsGroup streamsGroup = 
getOrMaybeCreatePersistedStreamsGroup(groupId, true);
             streamsGroup.setGroupEpoch(value.epoch());
             streamsGroup.setMetadataHash(value.metadataHash());
+            
streamsGroup.setValidatedTopologyEpoch(value.validatedTopologyEpoch());
         } else {
             StreamsGroup streamsGroup;
             try {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
index d54f7273eb0..4302d65b6c7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java
@@ -96,10 +96,11 @@ public class StreamsCoordinatorRecordHelpers {
         );
     }
 
-    public static CoordinatorRecord newStreamsGroupEpochRecord(
+    public static CoordinatorRecord newStreamsGroupMetadataRecord(
         String groupId,
         int newGroupEpoch,
-        long metadataHash
+        long metadataHash,
+        int validatedTopologyEpoch
     ) {
         Objects.requireNonNull(groupId, "groupId should not be null here");
 
@@ -109,7 +110,8 @@ public class StreamsCoordinatorRecordHelpers {
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
                     .setEpoch(newGroupEpoch)
-                    .setMetadataHash(metadataHash),
+                    .setMetadataHash(metadataHash)
+                    .setValidatedTopologyEpoch(validatedTopologyEpoch),
                 (short) 0
             )
         );
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 7ec3596628e..c0a4fb1f13b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -147,9 +147,9 @@ public class StreamsGroup implements Group {
     private final TimelineHashMap<String, String> staticMembers;
 
     /**
-     * The metadata associated with each subscribed topic name.
+     * The topology epoch for which the subscribed topics identified by 
metadataHash are validated.
      */
-    private final TimelineHashMap<String, TopicMetadata> partitionMetadata;
+    private final TimelineInteger validatedTopologyEpoch;
 
     /**
      * The metadata hash which is computed based on the all subscribed topics.
@@ -222,7 +222,7 @@ public class StreamsGroup implements Group {
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.partitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
         this.metadataHash = new TimelineLong(snapshotRegistry);
         this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -282,7 +282,6 @@ public class StreamsGroup implements Group {
 
     public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
         this.configuredTopology.set(Optional.ofNullable(configuredTopology));
-        maybeUpdateGroupState();
     }
 
     /**
@@ -598,6 +597,23 @@ public class StreamsGroup implements Group {
         this.metadataHash.set(metadataHash);
     }
 
+    /**
+     * @return The validated topology epoch.
+     */
+    public int validatedTopologyEpoch() {
+        return validatedTopologyEpoch.get();
+    }
+
+    /**
+     * Updates the validated topology epoch.
+     *
+     * @param validatedTopologyEpoch The validated topology epoch
+     */
+    public void setValidatedTopologyEpoch(int validatedTopologyEpoch) {
+        this.validatedTopologyEpoch.set(validatedTopologyEpoch);
+        maybeUpdateGroupState();
+    }
+
     /**
      * Computes the metadata hash based on the current topology and the 
current metadata image.
      *
@@ -835,7 +851,7 @@ public class StreamsGroup implements Group {
         if (members.isEmpty()) {
             newState = EMPTY;
             clearShutdownRequestMemberId();
-        } else if (topology().isEmpty() || configuredTopology().isEmpty() || 
!configuredTopology().get().isReady()) {
+        } else if (topology().filter(t -> t.topologyEpoch() == 
validatedTopologyEpoch.get()).isEmpty()) {
             newState = NOT_READY;
         } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
             newState = ASSIGNING;
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
index 0d06b0c7f49..b66f8181af9 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json
@@ -24,6 +24,8 @@
     { "name": "Epoch", "versions": "0+", "type": "int32",
       "about": "The group epoch." },
     { "name": "MetadataHash", "versions": "0+", "type": "int64",
-      "about": "The hash of all topics in the group." }
+      "about": "The hash of all topics in the group." },
+    { "name": "ValidatedTopologyEpoch", "versions": "0+", "taggedVersions": 
"0+", "tag": 0, "default": -1, "type": "int32",
"0+", "tag": 0, "default": -1, "type": "int32",
      "about": "The topology epoch whose topics were validated to be present 
in a valid configuration in the metadata." }
   ]
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6521c48532c..65e7e87e4fd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -139,7 +139,6 @@ import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRol
 import org.apache.kafka.coordinator.group.streams.TasksTuple;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
-import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
@@ -4434,23 +4433,27 @@ public class GroupMetadataManagerTest {
             .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(1), 10) // 
Stable group
                 .withTargetAssignmentEpoch(10)
                 .withTopology(new StreamsTopology(1, Map.of()))
+                .withValidatedTopologyEpoch(1)
                 
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(0))
                     .setMemberEpoch(10)
                     .build()))
             .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(2), 10) // 
Assigning group
                 .withTargetAssignmentEpoch(9)
                 .withTopology(new StreamsTopology(1, Map.of()))
+                .withValidatedTopologyEpoch(1)
                 
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(1))
                     .setMemberEpoch(9)
                     .build()))
             .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(3), 10) // 
Reconciling group
                 .withTargetAssignmentEpoch(10)
                 .withTopology(new StreamsTopology(1, Map.of()))
+                .withValidatedTopologyEpoch(1)
                 
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(2))
                     .setMemberEpoch(9)
                     .build()))
             .withStreamsGroup(new StreamsGroupBuilder(groupIds.get(4), 10) // 
NotReady group
                 .withTargetAssignmentEpoch(10)
+                .withTopology(new StreamsTopology(1, Map.of()))
                 
.withMember(streamsGroupMemberBuilderWithDefaults(streamsMemberIds.get(3))
                     .build()))
             .build();
@@ -9736,7 +9739,7 @@ public class GroupMetadataManagerTest {
         StreamsGroupMember.Builder memberBuilder1 = 
streamsGroupMemberBuilderWithDefaults(memberId1);
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder1.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder1.build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
 epoch + 1, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 1, 0, -1));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(streamsGroupId,
 topology));
 
         TasksTuple assignment = new TasksTuple(
@@ -9749,7 +9752,7 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder2.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
 memberId2, assignment));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(streamsGroupId,
 epoch + 2, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 2, 0, 0));
 
         List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.groupMetadataManager.streamsGroupDescribe(List.of(streamsGroupId), 
context.lastCommittedOffset);
         StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new 
StreamsGroupDescribeResponseData.DescribedGroup()
@@ -9780,7 +9783,7 @@ public class GroupMetadataManagerTest {
                         )
                     )
             )
-            .setGroupState(StreamsGroup.StreamsGroupState.NOT_READY.toString())
+            .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
             .setGroupEpoch(epoch + 2);
         assertEquals(1, actual.size());
         assertEquals(describedGroup, actual.get(0));
@@ -16035,7 +16038,7 @@ public class GroupMetadataManagerTest {
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 member));
 
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 100, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 100, 0, 0));
 
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
 
@@ -16283,7 +16286,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
groupMetadataHash),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
groupMetadataHash, 0),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -16296,6 +16299,98 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 
5)))
+            .build();
+
+        // Commit the offset, so that the latest state will be described below
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> 
actualDescribedGroups = 
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), 
context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup 
= new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setAssignmentEpoch(1)
+            .setTopology(
+                new StreamsGroupDescribeResponseData.Topology()
+                    .setEpoch(0)
+                    .setSubtopologies(List.of(
+                        new StreamsGroupDescribeResponseData.Subtopology()
+                            .setSubtopologyId(subtopology1)
+                            .setSourceTopics(List.of(fooTopicName))
+                    ))
+            )
+            .setMembers(Collections.singletonList(
+                
expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)))
+            ))
+            .setGroupState(StreamsGroupState.STABLE.toString())
+            .setGroupEpoch(1);
+        assertEquals(1, actualDescribedGroups.size());
+        assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0));
+    }
+
     @Test
     public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
         String groupId = "fooup";
@@ -16364,9 +16459,9 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            ))),
+            )), -1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16450,14 +16545,15 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            ))),
+            )), -1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
+        assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
         assertRecordsEquals(expectedRecords, result.records());
     }
 
@@ -16532,15 +16628,16 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage),
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
-            ))),
+            )), -1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
+        assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
         assertRecordsEquals(expectedRecords, result.records());
     }
 
@@ -16577,6 +16674,7 @@ public class GroupMetadataManagerTest {
             .withStreamsGroup(
                 new StreamsGroupBuilder(groupId, 10)
                     
.withTopology(StreamsTopology.fromHeartbeatRequest(topology1))
+                    .withValidatedTopologyEpoch(1)
             )
             .build();
 
@@ -16627,10 +16725,10 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage),
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
-            ))),
+            )), 1),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -16682,6 +16780,7 @@ public class GroupMetadataManagerTest {
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
                 .withMetadataHash(groupMetadataHash)
+                .withValidatedTopologyEpoch(0)
             )
             .build();
 
@@ -16751,6 +16850,8 @@ public class GroupMetadataManagerTest {
             .buildCoordinatorMetadataImage();
 
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        long metadataHash = computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage)));
+
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
             .withMetadataImage(metadataImage)
@@ -16767,7 +16868,8 @@ public class GroupMetadataManagerTest {
                     .build())
                 .withTargetAssignmentEpoch(10)
                 .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
-                .withMetadataHash(computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))))
+                .withValidatedTopologyEpoch(0)
+                .withMetadataHash(metadataHash)
             )
             .build();
 
@@ -16792,17 +16894,12 @@ public class GroupMetadataManagerTest {
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId1),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
metadataHash, 0)
             ),
             result1.records()
         );
 
-        for (CoordinatorRecord record : result1.records()) {
-            context.replay(record);
-        }
-        assignor.prepareGroupAssignment(
-            Map.of(memberId1, TasksTuple.EMPTY)
-        );
+        assignor.prepareGroupAssignment(Map.of());
 
         CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result2 = context.streamsGroupHeartbeat(
             new StreamsGroupHeartbeatRequestData()
@@ -16814,7 +16911,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(12)
+                .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
                 .setStatus(List.of(
                     new StreamsGroupHeartbeatResponseData.Status()
@@ -16916,7 +17013,7 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
groupMetadataHash),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
groupMetadataHash, 0),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -17022,10 +17119,10 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
                 barTopicName, computeTopicHash(barTopicName, newMetadataImage)
-            ))),
+            )), 0),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -17206,7 +17303,7 @@ public class GroupMetadataManagerTest {
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId2),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId2),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 0)
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 0, 
-1)
         );
 
         assertRecordsEquals(expectedRecords, result.records());
@@ -17334,6 +17431,7 @@ public class GroupMetadataManagerTest {
                     TaskAssignmentTestUtil.mkTasks(subtopology2, 2)))
                 .withTargetAssignmentEpoch(10)
                 .withMetadataHash(groupMetadataHash)
+                .withValidatedTopologyEpoch(0)
             )
             .build();
 
@@ -17776,16 +17874,12 @@ public class GroupMetadataManagerTest {
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
 topology));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
 streamsGroupMemberBuilderWithDefaults(memberId1)
             .build()));
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId,
 11, groupMetadataHash));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
 11, groupMetadataHash, -1));
 
         assertEquals(StreamsGroupState.NOT_READY, 
context.streamsGroupState(groupId));
 
         context.groupMetadataManager.getStreamsGroupOrThrow(groupId)
-            .setConfiguredTopology(InternalTopicManager.configureTopics(
-                new LogContext(),
-                groupMetadataHash,
-                
StreamsTopology.fromRecord(StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology)),
-                metadataImage));
+            .setValidatedTopologyEpoch(0);
 
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
context.streamsGroupState(groupId));
 
@@ -17942,9 +18036,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            ))),
+            )), 0),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -18064,9 +18158,9 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            ))),
+            )), 0),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -18169,13 +18263,17 @@ public class GroupMetadataManagerTest {
         Topology topology = new Topology().setSubtopologies(List.of(
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+        ));
 
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 6)
-                .buildCoordinatorMetadataImage())
+            .withMetadataImage(metadataImage)
             .build();
 
         assignor.prepareGroupAssignment(Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
@@ -18211,7 +18309,7 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 2, 0)
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
groupMetadataHash, 0)
                     )
                 )
             )),
@@ -18393,13 +18491,17 @@ public class GroupMetadataManagerTest {
         Topology topology = new Topology().setSubtopologies(List.of(
             new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
         ));
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        long groupMetadataHash = computeGroupHash(Map.of(
+            fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+        ));
 
         MockTaskAssignor assignor = new MockTaskAssignor("sticky");
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withStreamsGroupTaskAssignors(List.of(assignor))
-            .withMetadataImage(new MetadataImageBuilder()
-                .addTopic(fooTopicId, fooTopicName, 3)
-                .buildCoordinatorMetadataImage())
+            .withMetadataImage(metadataImage)
             .build();
 
         assignor.prepareGroupAssignment(
@@ -18511,7 +18613,7 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId1),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 3, 0)
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, 
groupMetadataHash, 0)
                     )
                 )
             )),
@@ -18799,7 +18901,7 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, 
expectedMember),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, 
topology),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 
0),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 
1, 0, -1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
 memberId, TasksTuple.EMPTY),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
 1),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
@@ -19383,7 +19485,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists but the member is already gone. Replaying the
         // StreamsGroupMemberMetadata tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord("foo",
 "m1"));
         assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
 
@@ -19439,7 +19541,7 @@ public class GroupMetadataManagerTest {
             .build();
 
         // The group is created if it does not exist.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
         assertEquals(10, 
context.groupMetadataManager.streamsGroup("foo").groupEpoch());
     }
 
@@ -19631,7 +19733,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists, but the member is already gone. Replaying 
the
         // StreamsGroupCurrentMemberAssignment tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord("foo",
 "m1"));
         assertThrows(UnknownMemberIdException.class, () -> 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("m1"));
 
@@ -19707,7 +19809,7 @@ public class GroupMetadataManagerTest {
 
         // The group still exists, but the member is already gone. Replaying 
the
         // StreamsGroupTopology tombstone should be a no-op.
-        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord("foo",
 10, 0));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord("foo",
 10, 0, 0));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone("foo"));
         
assertTrue(context.groupMetadataManager.streamsGroup("foo").topology().isEmpty());
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
index 2485cb65e6f..457bd55c602 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpersTest.java
@@ -250,19 +250,20 @@ class StreamsCoordinatorRecordHelpersTest {
     }
 
     @Test
-    public void testNewStreamsGroupEpochRecord() {
+    public void testNewStreamsGroupMetadataRecord() {
         CoordinatorRecord expectedRecord = CoordinatorRecord.record(
             new StreamsGroupMetadataKey()
                 .setGroupId(GROUP_ID),
             new ApiMessageAndVersion(
                 new StreamsGroupMetadataValue()
                     .setEpoch(42)
-                    .setMetadataHash(42),
+                    .setMetadataHash(43)
+                    .setValidatedTopologyEpoch(44),
                 (short) 0
             )
         );
 
-        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(GROUP_ID, 42, 42));
+        assertEquals(expectedRecord, 
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(GROUP_ID, 42, 43, 
44));
     }
 
     @Test
@@ -676,7 +677,7 @@ class StreamsCoordinatorRecordHelpersTest {
     @Test
     public void testNewStreamsGroupEpochRecordNullGroupId() {
         NullPointerException exception = 
assertThrows(NullPointerException.class, () ->
-            StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(null, 
1, 1));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(null, 1, 1, 1));
         assertEquals("groupId should not be null here", 
exception.getMessage());
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index 5d291d9884d..b7ffdd82def 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -35,6 +35,7 @@ public class StreamsGroupBuilder {
     private final Map<String, StreamsGroupMember> members = new HashMap<>();
     private final Map<String, TasksTuple> targetAssignments = new HashMap<>();
     private long metadataHash = 0L;
+    private int validatedTopologyEpoch = -1;
 
     public StreamsGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
@@ -53,6 +54,11 @@ public class StreamsGroupBuilder {
         return this;
     }
 
+    public StreamsGroupBuilder withValidatedTopologyEpoch(int 
validatedTopologyEpoch) {
+        this.validatedTopologyEpoch = validatedTopologyEpoch;
+        return this;
+    }
+
     public StreamsGroupBuilder withTopology(StreamsTopology streamsTopology) {
         this.topology = streamsTopology;
         return this;
@@ -79,7 +85,7 @@ public class StreamsGroupBuilder {
 
         // Add group epoch record.
         records.add(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, groupEpoch, 
metadataHash));
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 
groupEpoch, metadataHash, validatedTopologyEpoch));
 
         // Add target assignment records.
         targetAssignments.forEach((memberId, assignment) ->
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index ba24abd2b80..94f78006426 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -498,6 +498,7 @@ public class StreamsGroupTest {
 
         streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
         streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+        streamsGroup.setValidatedTopologyEpoch(1);
 
         assertEquals(MemberState.STABLE, member1.state());
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
@@ -694,6 +695,7 @@ public class StreamsGroupTest {
         );
         group.setGroupEpoch(1);
         group.setTopology(new StreamsTopology(1, Map.of()));
+        group.setValidatedTopologyEpoch(1);
         group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
         group.setTargetAssignmentEpoch(1);
         group.updateMember(new StreamsGroupMember.Builder("member1")
@@ -760,6 +762,7 @@ public class StreamsGroupTest {
 
         streamsGroup.setTopology(new StreamsTopology(1, Map.of()));
         streamsGroup.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+        streamsGroup.setValidatedTopologyEpoch(1);
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
@@ -805,6 +808,7 @@ public class StreamsGroupTest {
         group.setGroupEpoch(1);
         group.setTopology(new StreamsTopology(1, Map.of()));
         group.setConfiguredTopology(new ConfiguredTopology(1, 0, 
Optional.of(new TreeMap<>()), Map.of(), Optional.empty()));
+        group.setValidatedTopologyEpoch(1);
         group.setTargetAssignmentEpoch(1);
         group.updateMember(new StreamsGroupMember.Builder("member1")
             .setMemberEpoch(1)

Reply via email to