This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c8e3c58da6 KAFKA-20268: Initialize group epochs to 1 for assignment 
offload (#21700)
5c8e3c58da6 is described below

commit 5c8e3c58da66ae95036781c7d992f51c3f6152c2
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 11 08:42:52 2026 +0000

    KAFKA-20268: Initialize group epochs to 1 for assignment offload (#21700)
    
    Once assignment offload is implemented, newly created groups may not
    have their first assignment at epoch 1 available yet. We cannot give the
    member an epoch of 0, since it is reserved for new members joining a
    group. The new member's epoch must be greater than 0 and less than the
    group epoch.
    
    To resolve this, we initialize groups at epoch 1 with target assignment
    epoch 1. Epoch 1 contains an empty assignment and the first computed
    assignment will have epoch 2. Members joining a new group will reconcile
    towards epoch 1 while waiting for epoch 2's assignment.
    
    Streams groups already start group epochs at 2. We update streams groups
    to use the same approach as consumer and share groups.
    
    Reviewers: Lucas Brutschy <[email protected]>, David Jacot
     <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   4 +-
 .../server/ConsumerGroupDescribeRequestTest.scala  |   4 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala |  30 ++---
 .../server/ConsumerProtocolMigrationTest.scala     |   6 +-
 .../server/ShareGroupDescribeRequestTest.scala     |   4 +-
 .../server/ShareGroupHeartbeatRequestTest.scala    |  60 ++++-----
 .../kafka/server/TxnOffsetCommitRequestTest.scala  |   2 +-
 .../kafka/server/WriteTxnMarkersRequestTest.scala  |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |   9 +-
 .../group/TargetAssignmentMetadata.java            |   2 +-
 .../coordinator/group/modern/ModernGroup.java      |   3 +-
 .../coordinator/group/streams/StreamsGroup.java    |   3 +-
 .../group/GroupMetadataManagerTest.java            | 142 +++++++++++----------
 .../modern/consumer/ConsumerGroupBuilder.java      |   5 +-
 .../group/modern/consumer/ConsumerGroupTest.java   |  45 ++++---
 .../group/modern/share/ShareGroupBuilder.java      |   5 +-
 .../group/modern/share/ShareGroupTest.java         |   8 +-
 .../group/streams/StreamsGroupBuilder.java         |   5 +-
 .../group/streams/StreamsGroupTest.java            |  32 ++---
 19 files changed, 193 insertions(+), 178 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 327b5ad2b36..a9c23a0bc08 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2069,8 +2069,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           assertTrue(testGroupDescription.groupEpoch.isEmpty)
           assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
         } else {
-          assertEquals(Optional.of(3), testGroupDescription.groupEpoch)
-          assertEquals(Optional.of(3), 
testGroupDescription.targetAssignmentEpoch)
+          assertEquals(Optional.of(4), testGroupDescription.groupEpoch)
+          assertEquals(Optional.of(4), 
testGroupDescription.targetAssignmentEpoch)
         }
 
         assertEquals(testGroupId, testGroupDescription.groupId())
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 17057345c5b..156969f4ece 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -134,8 +134,8 @@ class ConsumerGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCo
         new DescribedGroup()
           .setGroupId("grp-1")
           .setGroupState(ConsumerGroupState.STABLE.toString)
-          .setGroupEpoch(1)
-          .setAssignmentEpoch(1)
+          .setGroupEpoch(2)
+          .setAssignmentEpoch(2)
           .setAssignorName("uniform")
           .setAuthorizedOperations(authorizedOperationsInt)
           .setMembers(List(
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 6707ee94b7f..a5e4eb42dca 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -100,7 +100,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the response.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
     // Create the topic.
@@ -132,7 +132,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
     }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
     // Verify the response.
-    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
     // Leave the group.
@@ -382,7 +382,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the response.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
     // Create the topic.
@@ -416,7 +416,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the response.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
     val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
@@ -451,7 +451,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the response.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
     // The 2 member IDs should be different
     assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
@@ -493,7 +493,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the response.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
 
     // Create the topic.
@@ -526,7 +526,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
     }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
     // Verify the response.
-    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
     // A new static member tries to join the group with an inuse instanceid.
@@ -554,8 +554,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
         consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
     }, msg = s"Could not re-join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
 
-    // Verify the response. The group epoch bumps upto 4 which eventually 
reflects in the new member epoch.
-    assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
+    // Verify the response. The group epoch bumps upto 5 which eventually 
reflects in the new member epoch.
+    assertEquals(5, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
   }
 
@@ -598,7 +598,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
       // Verify the response.
       assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(5000, 
consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)
 
       // Alter consumer heartbeat interval config
@@ -706,7 +706,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify initial join success.
     assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
-    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
 
     // Create the topic to trigger partition assignment.
     val topicId = createTopic(
@@ -735,8 +735,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
         consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
     }, msg = s"Could not get partitions assigned. Last response 
$consumerGroupHeartbeatResponse.")
 
-    // Verify member has epoch > 0 (should be 2).
-    assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+    // Verify member has epoch > 0 (should be 3).
+    assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
     assertEquals(expectedAssignment, 
consumerGroupHeartbeatResponse.data.assignment)
 
     // Simulate a fenced member attempting to rejoin with epoch=0.
@@ -754,11 +754,11 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     // Verify the full response.
     // Since the subscription/metadata hasn't changed, the member should get
-    // their current state back with the same epoch (2) and assignment.
+    // their current state back with the same epoch (3) and assignment.
     val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData()
       .setErrorCode(Errors.NONE.code)
       .setMemberId(memberId)
-      .setMemberEpoch(2)
+      .setMemberEpoch(3)
       .setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
       .setAssignment(expectedAssignment)
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 1b1dec69eaf..3e86e605d2d 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -299,14 +299,14 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       syncGroupWithOldProtocol(
         groupId = groupId,
         memberId = memberId2,
-        generationId = 2
+        generationId = 3
       )
     )
 
     // Member 2 heartbeats.
     heartbeat(
       groupId = groupId,
-      generationId = 2,
+      generationId = 3,
       memberId = memberId2
     )
 
@@ -323,7 +323,7 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     // Member 2 heartbeats and gets REBALANCE_IN_PROGRESS.
     heartbeat(
       groupId = groupId,
-      generationId = 2,
+      generationId = 3,
       memberId = memberId2,
       expectedError = Errors.REBALANCE_IN_PROGRESS
     )
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
index 408f31db8d1..16278cc627e 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
@@ -108,8 +108,8 @@ class ShareGroupDescribeRequestTest(cluster: 
ClusterInstance) extends GroupCoord
           new DescribedGroup()
             .setGroupId("grp-1")
             .setGroupState(GroupState.STABLE.toString)
-            .setGroupEpoch(1)
-            .setAssignmentEpoch(1)
+            .setGroupEpoch(2)
+            .setAssignmentEpoch(2)
             .setAssignorName("simple")
             .setAuthorizedOperations(authorizedOperationsInt)
             .setMembers(List(
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index f2993ecac99..11bf42b3f6c 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -91,7 +91,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       // Verify the response.
       assertNotNull(shareGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
 
       // Create the topic.
@@ -124,7 +124,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
 
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(expectedAssignment, 
shareGroupHeartbeatResponse.data.assignment)
 
       // Leave the group.
@@ -182,7 +182,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response for member 1.
       val memberId1 = shareGroupHeartbeatResponse.data.memberId
       assertNotNull(memberId1)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
 
       // The second member request to join the group.
@@ -203,7 +203,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response for member 2.
       val memberId2 = shareGroupHeartbeatResponse.data.memberId
       assertNotNull(memberId2)
-      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
       // Verify the member id is different.
       assertNotEquals(memberId1, memberId2)
@@ -220,7 +220,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId1)
-          .setMemberEpoch(1)
+          .setMemberEpoch(2)
       ).build()
 
       // Heartbeats until the partitions are assigned for member 1.
@@ -243,14 +243,14 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       val topicPartitionsAssignedToMember1 = 
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
-      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
 
       // Prepare the next heartbeat for member 2.
       shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId2)
-          .setMemberEpoch(2)
+          .setMemberEpoch(3)
       ).build()
 
       // Heartbeats until the partitions are assigned for member 2.
@@ -262,7 +262,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       val topicPartitionsAssignedToMember2 = 
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
-      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
 
       val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
       topicPartitionsAssignedToMember1.forEach(topicPartition => {
@@ -280,7 +280,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId1)
-          .setMemberEpoch(3)
+          .setMemberEpoch(4)
       ).build()
 
       // Heartbeats until the response for no change of assignment occurs for 
member 1 with same epoch.
@@ -292,7 +292,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
 
       // Verify the response.
-      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
     } finally {
       admin.close()
     }
@@ -336,7 +336,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response for member.
       val memberId = shareGroupHeartbeatResponse.data.memberId
       assertNotNull(memberId)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
 
       // Create the topic.
@@ -357,7 +357,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(1)
+          .setMemberEpoch(2)
       ).build()
 
       TestUtils.waitUntilTrue(() => {
@@ -367,7 +367,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
 
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
 
       // Member leaves the group.
       shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@@ -398,7 +398,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       shareGroupHeartbeatResponse = 
connectAndReceive(shareGroupHeartbeatRequest)
 
       // Verify the response for member 1.
-      assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
       // Partition assignment remains intact on rejoining.
       assertEquals(expectedAssignment, 
shareGroupHeartbeatResponse.data.assignment)
@@ -441,7 +441,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response for member.
       val memberId = shareGroupHeartbeatResponse.data.memberId
       assertNotNull(memberId)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
       // Create the topic foo.
       val fooTopicId = TestUtils.createTopicWithAdminRaw(
@@ -469,7 +469,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(1)
+          .setMemberEpoch(2)
       ).build()
 
       cluster.waitTopicCreation("foo", 2)
@@ -483,7 +483,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
           
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
       }, msg = s"Could not get partitions for topic foo and bar assigned. Last 
response $shareGroupHeartbeatResponse.")
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
       // Create the topic baz.
       val bazTopicId = TestUtils.createTopicWithAdminRaw(
         admin = admin,
@@ -507,7 +507,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(3)
+          .setMemberEpoch(4)
       ).build()
 
       TestUtils.waitUntilTrue(() => {
@@ -518,7 +518,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
           
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
       }, msg = s"Could not get partitions for topic baz assigned. Last 
response $shareGroupHeartbeatResponse.")
       // Verify the response.
-      assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
       // Increasing the partitions of topic bar which is already being 
consumed in the share group.
       increasePartitions(admin, "bar", 6)
 
@@ -538,7 +538,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(5)
+          .setMemberEpoch(6)
       ).build()
 
       TestUtils.waitUntilTrue(() => {
@@ -549,7 +549,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
           
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
       }, msg = s"Could not update partitions assignment for topic bar. Last 
response $shareGroupHeartbeatResponse.")
       // Verify the response.
-      assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
       // Delete the topic foo.
       TestUtils.deleteTopicWithAdmin(
         admin = admin,
@@ -571,7 +571,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(7)
+          .setMemberEpoch(8)
       ).build()
 
       TestUtils.waitUntilTrue(() => {
@@ -582,7 +582,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
           
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
       }, msg = s"Could not update partitions assignment for topic foo. Last 
response $shareGroupHeartbeatResponse.")
       // Verify the response.
-      assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(9, shareGroupHeartbeatResponse.data.memberEpoch)
     } finally {
       admin.close()
     }
@@ -705,7 +705,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       val memberId = shareGroupHeartbeatResponse.data.memberId
       var memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
       assertNotNull(memberId)
-      assertEquals(1, memberEpoch)
+      assertEquals(2, memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
 
       // Create the topic.
@@ -875,7 +875,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       // Verify the response for member.
       val memberId = shareGroupHeartbeatResponse.data.memberId
       assertNotNull(memberId)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
       assertEquals(new ShareGroupHeartbeatResponseData.Assignment(), 
shareGroupHeartbeatResponse.data.assignment)
       // Create the topic.
       val fooId = TestUtils.createTopicWithAdminRaw(
@@ -893,7 +893,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(1)
+          .setMemberEpoch(2)
       ).build()
 
       TestUtils.waitUntilTrue(() => {
@@ -902,7 +902,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
           shareGroupHeartbeatResponse.data.assignment == expectedAssignment
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
       // Verify the response.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
 
       // Restart the only running broker.
       val broker = cluster.brokers().values().iterator().next()
@@ -914,7 +914,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         new ShareGroupHeartbeatRequestData()
           .setGroupId("grp")
           .setMemberId(memberId)
-          .setMemberEpoch(2)
+          .setMemberEpoch(3)
       ).build()
 
       // Should receive no error and no assignment changes.
@@ -925,7 +925,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       // Verify the response. Epoch should not have changed and null 
assignments determines that no
       // change in old assignment.
-      assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
       assertNull(shareGroupHeartbeatResponse.data.assignment)
     } finally {
       admin.close()
@@ -968,7 +968,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
 
       // Verify initial join success.
       assertNotNull(shareGroupHeartbeatResponse.data.memberId)
-      assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+      assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
 
       // Create the topic to trigger partition assignment.
       val topicId = TestUtils.createTopicWithAdminRaw(
diff --git 
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index aef40390d85..cc6cd2c957e 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -295,7 +295,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
       commitTxnOffset(
         groupId = groupId,
         memberId = if (version >= 3) memberId else 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
-        generationId = if (version >= 3) 1 else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+        generationId = if (version >= 3) memberEpoch else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
         producerId = producerIdAndEpoch.producerId,
         producerEpoch = producerIdAndEpoch.epoch,
         transactionalId = transactionalId,
diff --git 
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index a68de4dacc0..a476271c04c 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -119,7 +119,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance) 
extends GroupCoordinat
       commitTxnOffset(
         groupId = groupId,
         memberId = if (version >= 3) memberId else 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
-        generationId = if (version >= 3) 1 else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+        generationId = if (version >= 3) memberEpoch else 
JoinGroupRequest.UNKNOWN_GENERATION_ID,
         producerId = producerIdAndEpoch.producerId,
         producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort 
else producerIdAndEpoch.epoch,
         transactionalId = transactionalId,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 694097f3ffa..3d7ecdbfe2c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2062,11 +2062,7 @@ public class GroupMetadataManager {
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
         if (bumpGroupEpoch) {
-            if (groupEpoch == 0) {
-                groupEpoch = 2;
-            } else {
-                groupEpoch += 1;
-            }
+            groupEpoch += 1;
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId, 
groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
@@ -2390,9 +2386,6 @@ public class GroupMetadataManager {
         SubscriptionType subscriptionType = group.subscriptionType();
 
         boolean bumpGroupEpoch =
-            // If the group is newly created, we must ensure that it moves 
away from
-            // epoch 0 and that it is fully initialized.
-            groupEpoch == 0 ||
             // Bumping the group epoch signals that the target assignment 
should be updated. We bump
             // the group epoch when the member has changed its subscribed 
topic names or the member
             // has changed its subscribed topic regex to a regex that is 
already resolved. We avoid
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
index ce9d6b30843..14e4ccfa790 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
@@ -29,7 +29,7 @@ public record TargetAssignmentMetadata(int assignmentEpoch, 
long assignmentTimes
      * The initial target assignment metadata for groups.
      * This is different to tombstoned assignment metadata which has an 
assignment epoch of -1.
      */
-    public static final TargetAssignmentMetadata ZERO = new 
TargetAssignmentMetadata(0, 0L);
+    public static final TargetAssignmentMetadata INITIAL = new 
TargetAssignmentMetadata(1, 0L);
 
     public TargetAssignmentMetadata {
         if (assignmentEpoch < 0 && assignmentEpoch != -1) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 6e882b29d09..c054c8fbfe5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -131,11 +131,12 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
         this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
         this.groupId = Objects.requireNonNull(groupId);
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.groupEpoch.set(1);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
         this.metadataHash = new TimelineLong(snapshotRegistry);
         this.subscriptionType = new TimelineObject<>(snapshotRegistry, 
HOMOGENEOUS);
-        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.ZERO);
+        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.INITIAL);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.invertedTargetAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index f0d07611f86..5b6610fe4a0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -237,11 +237,12 @@ public class StreamsGroup implements Group {
         this.groupId = Objects.requireNonNull(groupId);
         this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.groupEpoch.set(1);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
         this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
         this.metadataHash = new TimelineLong(snapshotRegistry);
-        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.ZERO);
+        this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry, 
TargetAssignmentMetadata.INITIAL);
         this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
         this.currentActiveTaskToProcessId = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.currentStandbyTaskToProcessIds = new 
TimelineHashMap<>(snapshotRegistry, 0);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 837de8ce881..f0b61be4172 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -338,7 +338,7 @@ public class GroupMetadataManagerTest {
         assertEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
@@ -1252,7 +1252,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -1268,7 +1268,7 @@ public class GroupMetadataManagerTest {
 
         ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -1277,12 +1277,12 @@ public class GroupMetadataManagerTest {
             .setServerAssignorName("range")
             .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
-                mkTopicAssignment(barTopicId, 0, 1, 2)), 1))
+                mkTopicAssignment(barTopicId, 0, 1, 2)), 2))
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1, computeGroupHash(Map.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2, computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, metadataImage),
                 barTopicName, computeTopicHash(barTopicName, metadataImage)
             ))),
@@ -1290,7 +1290,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1337,7 +1337,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -1350,7 +1350,7 @@ public class GroupMetadataManagerTest {
 
         ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -1358,18 +1358,18 @@ public class GroupMetadataManagerTest {
             .setSubscribedTopicNames(List.of("foo"))
             .setServerAssignorName("range")
             .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 1))
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 2))
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1, computeGroupHash(Map.of(
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2, computeGroupHash(Map.of(
                 fooTopicName, fooTopicHash
             ))),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
             )),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember)
         );
 
@@ -1395,7 +1395,7 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2, 0)
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
3, 0)
         );
         assertRecordsEquals(expectedRecords, result.records());
         assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -3118,7 +3118,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -3138,7 +3138,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000),
             result.response()
         );
@@ -3158,7 +3158,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -3183,7 +3183,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -4281,7 +4281,7 @@ public class GroupMetadataManagerTest {
                     .setRebalanceTimeoutMs(90000)
                     .setSubscribedTopicNames(List.of("foo"))
                     .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify that there is a session time.
         context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4298,7 +4298,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberId(memberId)
                 .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify that there is a session time.
         context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4356,7 +4356,7 @@ public class GroupMetadataManagerTest {
                     .setRebalanceTimeoutMs(90000)
                     .setSubscribedTopicNames(List.of("foo"))
                     .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify that there is a session time.
         context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4373,7 +4373,7 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
                     )
                 )
             )),
@@ -4482,7 +4482,7 @@ public class GroupMetadataManagerTest {
                     .setMemberId(memberId)
                     .setMemberEpoch(0)
                     .setSubscribedTopicNames(List.of("foo")));
-        assertEquals(1, result.response().getKey().memberEpoch());
+        assertEquals(2, result.response().getKey().memberEpoch());
 
         // Verify that there is a session time.
         context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4499,7 +4499,7 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0)
+                        
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0)
                     )
                 )
             )),
@@ -4609,7 +4609,7 @@ public class GroupMetadataManagerTest {
                     .setRebalanceTimeoutMs(90000)
                     .setSubscribedTopicNames(List.of("foo"))
                     .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify that there is a session time.
         context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4642,7 +4642,7 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
                     )
                 )
             )),
@@ -4691,7 +4691,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -4729,7 +4729,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(2)
+                .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
@@ -4746,14 +4746,14 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setRebalanceTimeoutMs(12000)
                 .setSubscribedTopicNames(List.of("foo")));
 
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -4778,7 +4778,7 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setTopicPartitions(List.of(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
                     .setTopicId(fooTopicId)
                     .setPartitions(List.of(0, 1)))));
@@ -4786,7 +4786,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(2)
+                .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000),
             result.response()
         );
@@ -4838,7 +4838,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -4876,7 +4876,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(2)
+                .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
             result.response()
@@ -4893,12 +4893,12 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId1)
-                .setMemberEpoch(1));
+                .setMemberEpoch(2));
 
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -4920,7 +4920,7 @@ public class GroupMetadataManagerTest {
                         
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                         
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
-                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 
computeGroupHash(Map.of(
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 4, 
computeGroupHash(Map.of(
                             fooTopicName, computeTopicHash(fooTopicName, new 
KRaftCoordinatorMetadataImage(metadataImage))
                         )))
                     )
@@ -10284,7 +10284,8 @@ public class GroupMetadataManagerTest {
                 .setGroupEpoch(epoch)
                 .setGroupId(consumerGroupIds.get(0))
                 
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
-                .setAssignorName("range"),
+                .setAssignorName("range")
+                .setAssignmentEpoch(1),
             new ConsumerGroupDescribeResponseData.DescribedGroup()
                 .setGroupEpoch(epoch)
                 .setGroupId(consumerGroupIds.get(1))
@@ -10296,6 +10297,7 @@ public class GroupMetadataManagerTest {
                 ))
                 
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
                 .setAssignorName("assignorName")
+                .setAssignmentEpoch(1)
         );
         List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(consumerGroupIds);
 
@@ -10351,6 +10353,7 @@ public class GroupMetadataManagerTest {
         ConsumerGroupMember.Builder memberBuilder2 = new 
ConsumerGroupMember.Builder(memberId2);
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId,
 memberBuilder2.build()));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId,
 memberId2, assignmentMap));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(consumerGroupId,
 epoch + 1, 12345L));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId,
 memberBuilder2.build()));
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId,
 epoch + 2, 0));
 
@@ -10376,7 +10379,8 @@ public class GroupMetadataManagerTest {
             ))
             
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
             .setAssignorName("range")
-            .setGroupEpoch(epoch + 2);
+            .setGroupEpoch(epoch + 2)
+            .setAssignmentEpoch(epoch + 1);
         expected = List.of(
             describedGroup
         );
@@ -10428,7 +10432,7 @@ public class GroupMetadataManagerTest {
                 .setGroupEpoch(epoch)
                 .setGroupId(streamsGroupIds.get(0))
                 .setGroupState(StreamsGroupState.EMPTY.toString())
-                .setAssignmentEpoch(0)
+                .setAssignmentEpoch(1)
                 .setTopology(expectedTopology),
             new StreamsGroupDescribeResponseData.DescribedGroup()
                 .setGroupEpoch(epoch)
@@ -10440,6 +10444,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setTopology(expectedTopology)
                 .setGroupState(StreamsGroupState.NOT_READY.toString())
+                .setAssignmentEpoch(1)
         );
         List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(streamsGroupIds);
 
@@ -10496,6 +10501,7 @@ public class GroupMetadataManagerTest {
         StreamsGroupMember.Builder memberBuilder2 = 
streamsGroupMemberBuilderWithDefaults(memberId2);
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
 memberBuilder2.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
 memberId2, assignment));
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(streamsGroupId,
 epoch + 1, 12345L));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
 memberBuilder2.build()));
         
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
 epoch + 2, 0, 0, Map.of()));
 
@@ -10529,7 +10535,8 @@ public class GroupMetadataManagerTest {
                     )
             )
             .setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
-            .setGroupEpoch(epoch + 2);
+            .setGroupEpoch(epoch + 2)
+            .setAssignmentEpoch(epoch + 1);
         assertEquals(1, actual.size());
         assertEquals(describedGroup, actual.get(0));
     }
@@ -11424,7 +11431,7 @@ public class GroupMetadataManagerTest {
 
         ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .setRebalanceTimeoutMs(5000)
             .setClientId(DEFAULT_CLIENT_ID)
@@ -11439,9 +11446,9 @@ public class GroupMetadataManagerTest {
             List.of(
                 
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId,
 expectedMember),
-                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 2, 0),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId,
 memberId, Map.of()),
-                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
 1, context.time.milliseconds()),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
 2, context.time.milliseconds()),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
             ),
             result.records()
@@ -16607,6 +16614,7 @@ public class GroupMetadataManagerTest {
         List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of(
             new ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupEpoch(100)
+                .setAssignmentEpoch(1)
                 .setGroupId(groupIds.get(0))
                 .setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
                 .setAssignorName("share-range"),
@@ -16675,7 +16683,7 @@ public class GroupMetadataManagerTest {
                 Set.of(0)
             ),
             groupId,
-            1,
+            2,
             true
         );
 
@@ -16691,7 +16699,7 @@ public class GroupMetadataManagerTest {
         assertEquals(
             new ShareGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new 
ShareGroupHeartbeatResponseData.Assignment()),
             result.response().getKey()
@@ -16755,7 +16763,7 @@ public class GroupMetadataManagerTest {
                 Set.of(0)
             ),
             groupId,
-            1,
+            2,
             true
         );
 
@@ -16829,14 +16837,14 @@ public class GroupMetadataManagerTest {
                 Set.of(0, 1, 2)
             ),
             groupId,
-            1,
+            2,
             true
         );
 
         assertResponseEquals(
             new ShareGroupHeartbeatResponseData()
                 .setMemberId(memberId)
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
                 .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
                     .setTopicPartitions(List.of(
@@ -16853,7 +16861,7 @@ public class GroupMetadataManagerTest {
         ShareGroupMember expectedMember = new 
ShareGroupMember.Builder(memberId)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .setSubscribedTopicNames(List.of("foo", "bar"))
             .setAssignedPartitions(mkAssignment(
@@ -16864,7 +16872,7 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, 
expectedMember),
-            GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1, 
computeGroupHash(Map.of(
+            GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 
computeGroupHash(Map.of(
                 fooTopicName, computeTopicHash(fooTopicName, 
coordinatorMetadataImage),
                 barTopicName, computeTopicHash(barTopicName, 
coordinatorMetadataImage)
             ))),
@@ -16872,7 +16880,7 @@ public class GroupMetadataManagerTest {
                 mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
                 mkTopicAssignment(barTopicId, 0, 1, 2)
             )),
-            
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
+            
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId,
 mkShareGroupStateMap(List.of(
                     mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName, 
List.of(0, 1, 2, 3, 4, 5)),
@@ -20628,7 +20636,7 @@ public class GroupMetadataManagerTest {
                     .setRebalanceTimeoutMs(90000)
                     .setSubscribedTopicNames(List.of("foo"))
                     .setTopicPartitions(List.of()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify heartbeat interval
         assertEquals(5000, result.response().heartbeatIntervalMs());
@@ -20654,7 +20662,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberId(memberId)
                 .setMemberEpoch(result.response().memberEpoch()));
-        assertEquals(1, result.response().memberEpoch());
+        assertEquals(2, result.response().memberEpoch());
 
         // Verify heartbeat interval
         assertEquals(10000, result.response().heartbeatIntervalMs());
@@ -20719,7 +20727,7 @@ public class GroupMetadataManagerTest {
                     .setMemberId(memberId)
                     .setMemberEpoch(0)
                     .setSubscribedTopicNames(List.of("foo")));
-        assertEquals(1, result.response().getKey().memberEpoch());
+        assertEquals(2, result.response().getKey().memberEpoch());
 
         verifyShareGroupHeartbeatInitializeRequest(
             result.response().getValue(),
@@ -20728,7 +20736,7 @@ public class GroupMetadataManagerTest {
                 Set.of(0, 1, 2, 3, 4, 5)
             ),
             groupId,
-            1,
+            2,
             true
         );
 
@@ -20769,7 +20777,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberId(memberId)
                 .setMemberEpoch(result.response().getKey().memberEpoch()));
-        assertEquals(1, result.response().getKey().memberEpoch());
+        assertEquals(2, result.response().getKey().memberEpoch());
 
         verifyShareGroupHeartbeatInitializeRequest(
             result.response().getValue(),
@@ -22858,11 +22866,13 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             // The member subscription is created.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
-            // The group epoch is bumped.
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1, 0),
-            // The target assignment is created.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, Map.of()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 1, context.time.milliseconds()),
+
+            // The group is initialized at group epoch 1. Since the group 
epoch is not bumped until
+            // regex resolution has completed, no consumer group metadata 
record is created.
+            // Similarly, the target assignment is initialized at epoch 1 with 
an empty assignment
+            // and not updated until regex resolution has completed, so no 
target assignment records
+            // are created.
+
             // The member current state is created.
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
         );
@@ -25302,7 +25312,7 @@ public class GroupMetadataManagerTest {
                 Set.of(0, 1)
             ),
             groupId,
-            1,
+            2,
             true
         );
 
@@ -25350,7 +25360,7 @@ public class GroupMetadataManagerTest {
             new ShareGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId.toString())
-                .setMemberEpoch(1)
+                .setMemberEpoch(2)
                 .setSubscribedTopicNames(null));
 
         expected = newShareGroupStatePartitionMetadataRecord(
@@ -25377,7 +25387,7 @@ public class GroupMetadataManagerTest {
                 Set.of(2, 3)
             ),
             groupId,
-            2,
+            3,
             true
         );
 
@@ -25411,7 +25421,7 @@ public class GroupMetadataManagerTest {
             new ShareGroupMetadataKey()
                 .setGroupId(groupId),
             new ShareGroupMetadataValue()
-                .setEpoch(0)
+                .setEpoch(1)
         );
         context.groupMetadataManager.replay(
             new ShareGroupStatePartitionMetadataKey()
@@ -25448,7 +25458,7 @@ public class GroupMetadataManagerTest {
             result.response().getValue(),
             Map.of(t1Uuid, Set.of(0, 1)),
             groupId,
-            1,
+            2,
             false
         );
 
@@ -25490,7 +25500,7 @@ public class GroupMetadataManagerTest {
             result.response().getValue(),
             Map.of(t1Uuid, Set.of(0, 1)),
             groupId,
-            2,
+            3,
             true
         );
         verify(context.metrics, 
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index 40d3300b093..196de8bb70e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.modern.consumer;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 
 import java.util.ArrayList;
@@ -40,8 +41,8 @@ public class ConsumerGroupBuilder {
     public ConsumerGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
-        this.assignmentEpoch = 0;
-        this.assignmentTimestamp = 0L;
+        this.assignmentEpoch = 
TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+        this.assignmentTimestamp = 
TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
     }
 
     public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 825211d72fa..4570315a00d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -454,36 +454,36 @@ public class ConsumerGroupTest {
 
         ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .build();
 
         consumerGroup.updateMember(member1);
-        consumerGroup.setGroupEpoch(1);
+        consumerGroup.setGroupEpoch(2);
 
         assertEquals(MemberState.STABLE, member1.state());
         assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
 
         ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .build();
 
         consumerGroup.updateMember(member2);
-        consumerGroup.setGroupEpoch(2);
+        consumerGroup.setGroupEpoch(3);
 
         assertEquals(MemberState.STABLE, member2.state());
         assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
 
-        consumerGroup.setTargetAssignmentMetadata(2, 12345L);
+        consumerGroup.setTargetAssignmentMetadata(3, 12345L);
 
         assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, 
consumerGroup.state());
 
         member1 = new ConsumerGroupMember.Builder(member1)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         consumerGroup.updateMember(member1);
@@ -494,8 +494,8 @@ public class ConsumerGroupTest {
         // Member 2 is not stable so the group stays in reconciling state.
         member2 = new ConsumerGroupMember.Builder(member2)
             .setState(MemberState.UNREVOKED_PARTITIONS)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         consumerGroup.updateMember(member2);
@@ -505,8 +505,8 @@ public class ConsumerGroupTest {
 
         member2 = new ConsumerGroupMember.Builder(member2)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         consumerGroup.updateMember(member2);
@@ -807,8 +807,8 @@ public class ConsumerGroupTest {
         MockTime time = new MockTime();
         ConsumerGroup group = createConsumerGroup("group-foo");
 
-        // Group epoch starts at 0.
-        assertEquals(0, group.groupEpoch());
+        // Group epoch starts at 1.
+        assertEquals(1, group.groupEpoch());
 
         // The refresh time deadline should be empty when the group is created 
or loaded.
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
@@ -941,6 +941,7 @@ public class ConsumerGroupTest {
         snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(1)
             .setSubscribedTopicNames(List.of("foo"))
             .build());
         snapshotRegistry.idempotentCreateSnapshot(1);
@@ -988,7 +989,7 @@ public class ConsumerGroupTest {
         assertDoesNotThrow(consumerGroup::validateDeleteGroup);
 
         ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .build();
         consumerGroup.updateMember(member1);
@@ -996,12 +997,12 @@ public class ConsumerGroupTest {
         assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
 
-        consumerGroup.setGroupEpoch(1);
+        consumerGroup.setGroupEpoch(2);
 
         assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
 
-        consumerGroup.setTargetAssignmentMetadata(1, 12345L);
+        consumerGroup.setTargetAssignmentMetadata(2, 12345L);
 
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE, 
consumerGroup.state());
         assertThrows(GroupNotEmptyException.class, 
consumerGroup::validateDeleteGroup);
@@ -1055,26 +1056,31 @@ public class ConsumerGroupTest {
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
 
         group.updateMember(new ConsumerGroupMember.Builder("member1")
+                .setMemberEpoch(1)
                 .setSubscribedTopicNames(List.of("foo"))
                 .setServerAssignorName("assignorName")
                 .build());
         group.updateMember(new ConsumerGroupMember.Builder("member2")
+                .setMemberEpoch(1)
                 .build());
         snapshotRegistry.idempotentCreateSnapshot(1);
 
         ConsumerGroupDescribeResponseData.DescribedGroup expected = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
             .setGroupState(ConsumerGroup.ConsumerGroupState.STABLE.toString())
-            .setGroupEpoch(0)
-            .setAssignmentEpoch(0)
+            .setGroupEpoch(1)
+            .setAssignmentEpoch(1)
             .setAssignorName("assignorName")
             .setMembers(Arrays.asList(
                 new ConsumerGroupDescribeResponseData.Member()
                     .setMemberId("member1")
+                    .setMemberEpoch(1)
                     .setSubscribedTopicNames(List.of("foo"))
                     .setSubscribedTopicRegex("")
                     .setMemberType((byte) 1),
-                new 
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
+                new ConsumerGroupDescribeResponseData.Member()
+                    .setMemberId("member2")
+                    .setMemberEpoch(1)
                     .setSubscribedTopicRegex("")
                     .setMemberType((byte) 1)
             ));
@@ -1093,6 +1099,7 @@ public class ConsumerGroupTest {
         assertFalse(group.isInStates(Set.of("Empty"), 0));
 
         group.updateMember(new ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(1)
             .setSubscribedTopicNames(List.of("foo"))
             .build());
         snapshotRegistry.idempotentCreateSnapshot(1);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index 14d6442057c..e853b7d7ef0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.modern.share;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 
 import java.util.ArrayList;
@@ -39,8 +40,8 @@ public class ShareGroupBuilder {
     public ShareGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
-        this.assignmentEpoch = 0;
-        this.assignmentTimestamp = 0L;
+        this.assignmentEpoch = 
TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+        this.assignmentTimestamp = 
TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
     }
 
     public ShareGroupBuilder withMember(ShareGroupMember member) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 60daaa5d1bc..1d3505cf1d3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -297,8 +297,8 @@ public class ShareGroupTest {
         MockTime time = new MockTime();
         ShareGroup shareGroup = createShareGroup("group-foo");
 
-        // Group epoch starts at 0.
-        assertEquals(0, shareGroup.groupEpoch());
+        // Group epoch starts at 1.
+        assertEquals(1, shareGroup.groupEpoch());
 
         // The refresh time deadline should be empty when the group is created 
or loaded.
         assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
@@ -453,8 +453,8 @@ public class ShareGroupTest {
         ShareGroupDescribeResponseData.DescribedGroup expected = new 
ShareGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
             .setGroupState(ShareGroupState.STABLE.toString())
-            .setGroupEpoch(0)
-            .setAssignmentEpoch(0)
+            .setGroupEpoch(1)
+            .setAssignmentEpoch(1)
             .setAssignorName("assignorName")
             .setMembers(Arrays.asList(
                 new ShareGroupDescribeResponseData.Member()
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index 5d3688693be..9d16e2501a3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
 
@@ -42,8 +43,8 @@ public class StreamsGroupBuilder {
     public StreamsGroupBuilder(String groupId, int groupEpoch) {
         this.groupId = groupId;
         this.groupEpoch = groupEpoch;
-        this.targetAssignmentEpoch = 0;
-        this.targetAssignmentTimestamp = 0L;
+        this.targetAssignmentEpoch = 
TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+        this.targetAssignmentTimestamp = 
TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
         this.topology = null;
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index d52c5e631f1..c5fc85ea7d3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -489,12 +489,12 @@ public class StreamsGroupTest {
 
         StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .build();
 
         streamsGroup.updateMember(member1);
-        streamsGroup.setGroupEpoch(1);
+        streamsGroup.setGroupEpoch(2);
 
         assertEquals(MemberState.STABLE, member1.state());
         assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, 
streamsGroup.state());
@@ -508,24 +508,24 @@ public class StreamsGroupTest {
 
         StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2")
             .setState(MemberState.STABLE)
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .build();
 
         streamsGroup.updateMember(member2);
-        streamsGroup.setGroupEpoch(2);
+        streamsGroup.setGroupEpoch(3);
 
         assertEquals(MemberState.STABLE, member2.state());
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
 
-        streamsGroup.setTargetAssignmentMetadata(2, 12345L);
+        streamsGroup.setTargetAssignmentMetadata(3, 12345L);
 
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
 
         member1 = new StreamsGroupMember.Builder(member1)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         streamsGroup.updateMember(member1);
@@ -536,8 +536,8 @@ public class StreamsGroupTest {
         // Member 2 is not stable so the group stays in reconciling state.
         member2 = new StreamsGroupMember.Builder(member2)
             .setState(MemberState.UNREVOKED_TASKS)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         streamsGroup.updateMember(member2);
@@ -547,8 +547,8 @@ public class StreamsGroupTest {
 
         member2 = new StreamsGroupMember.Builder(member2)
             .setState(MemberState.STABLE)
-            .setMemberEpoch(2)
-            .setPreviousMemberEpoch(1)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
             .build();
 
         streamsGroup.updateMember(member2);
@@ -567,8 +567,8 @@ public class StreamsGroupTest {
         MockTime time = new MockTime();
         StreamsGroup group = createStreamsGroup("group-foo");
 
-        // Group epoch starts at 0.
-        assertEquals(0, group.groupEpoch());
+        // Group epoch starts at 1.
+        assertEquals(1, group.groupEpoch());
 
         // The refresh time deadline should be empty when the group is created 
or loaded.
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
@@ -856,7 +856,7 @@ public class StreamsGroupTest {
         assertDoesNotThrow(streamsGroup::validateDeleteGroup);
 
         StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
-            .setMemberEpoch(1)
+            .setMemberEpoch(2)
             .setPreviousMemberEpoch(0)
             .setState(MemberState.STABLE)
             .build();
@@ -872,12 +872,12 @@ public class StreamsGroupTest {
         assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
 
-        streamsGroup.setGroupEpoch(1);
+        streamsGroup.setGroupEpoch(2);
 
         assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);
 
-        streamsGroup.setTargetAssignmentMetadata(1, 12345L);
+        streamsGroup.setTargetAssignmentMetadata(2, 12345L);
 
         assertEquals(StreamsGroup.StreamsGroupState.STABLE, 
streamsGroup.state());
         assertThrows(GroupNotEmptyException.class, 
streamsGroup::validateDeleteGroup);

Reply via email to