This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c8e3c58da6 KAFKA-20268: Initialize group epochs to 1 for assignment
offload (#21700)
5c8e3c58da6 is described below
commit 5c8e3c58da66ae95036781c7d992f51c3f6152c2
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 11 08:42:52 2026 +0000
KAFKA-20268: Initialize group epochs to 1 for assignment offload (#21700)
Once assignment offload is implemented, newly created groups may not
have their first assignment at epoch 1 available yet. We cannot give the
member an epoch of 0, since it is reserved for new members joining a
group. The new member's epoch must be greater than 0 and less than the
group epoch.
To resolve this, we initialize groups at epoch 1 with target assignment
epoch 1. Epoch 1 contains an empty assignment and the first computed
assignment will have epoch 2. Members joining a new group will reconcile
towards epoch 1 while waiting for epoch 2's assignment.
Streams groups already start group epochs at 2. We update streams groups
to use the same approach as consumer and share groups.
Reviewers: Lucas Brutschy <[email protected]>, David Jacot
<[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 4 +-
.../server/ConsumerGroupDescribeRequestTest.scala | 4 +-
.../server/ConsumerGroupHeartbeatRequestTest.scala | 30 ++---
.../server/ConsumerProtocolMigrationTest.scala | 6 +-
.../server/ShareGroupDescribeRequestTest.scala | 4 +-
.../server/ShareGroupHeartbeatRequestTest.scala | 60 ++++-----
.../kafka/server/TxnOffsetCommitRequestTest.scala | 2 +-
.../kafka/server/WriteTxnMarkersRequestTest.scala | 2 +-
.../coordinator/group/GroupMetadataManager.java | 9 +-
.../group/TargetAssignmentMetadata.java | 2 +-
.../coordinator/group/modern/ModernGroup.java | 3 +-
.../coordinator/group/streams/StreamsGroup.java | 3 +-
.../group/GroupMetadataManagerTest.java | 142 +++++++++++----------
.../modern/consumer/ConsumerGroupBuilder.java | 5 +-
.../group/modern/consumer/ConsumerGroupTest.java | 45 ++++---
.../group/modern/share/ShareGroupBuilder.java | 5 +-
.../group/modern/share/ShareGroupTest.java | 8 +-
.../group/streams/StreamsGroupBuilder.java | 5 +-
.../group/streams/StreamsGroupTest.java | 32 ++---
19 files changed, 193 insertions(+), 178 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 327b5ad2b36..a9c23a0bc08 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2069,8 +2069,8 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertTrue(testGroupDescription.groupEpoch.isEmpty)
assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
} else {
- assertEquals(Optional.of(3), testGroupDescription.groupEpoch)
- assertEquals(Optional.of(3),
testGroupDescription.targetAssignmentEpoch)
+ assertEquals(Optional.of(4), testGroupDescription.groupEpoch)
+ assertEquals(Optional.of(4),
testGroupDescription.targetAssignmentEpoch)
}
assertEquals(testGroupId, testGroupDescription.groupId())
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 17057345c5b..156969f4ece 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -134,8 +134,8 @@ class ConsumerGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCo
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
- .setGroupEpoch(1)
- .setAssignmentEpoch(1)
+ .setGroupEpoch(2)
+ .setAssignmentEpoch(2)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 6707ee94b7f..a5e4eb42dca 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -100,7 +100,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -132,7 +132,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
}, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
// Leave the group.
@@ -382,7 +382,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -416,7 +416,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
@@ -451,7 +451,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
// The 2 member IDs should be different
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
@@ -493,7 +493,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(),
consumerGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -526,7 +526,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
}, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
// A new static member tries to join the group with an inuse instanceid.
@@ -554,8 +554,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response
$consumerGroupHeartbeatResponse.")
- // Verify the response. The group epoch bumps upto 4 which eventually
reflects in the new member epoch.
- assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
+ // Verify the response. The group epoch bumps up to 5, which is eventually
reflected in the new member epoch.
+ assertEquals(5, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
}
@@ -598,7 +598,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5000,
consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)
// Alter consumer heartbeat interval config
@@ -706,7 +706,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify initial join success.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
- assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
// Create the topic to trigger partition assignment.
val topicId = createTopic(
@@ -735,8 +735,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response
$consumerGroupHeartbeatResponse.")
- // Verify member has epoch > 0 (should be 2).
- assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
+ // Verify member has epoch > 0 (should be 3).
+ assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
consumerGroupHeartbeatResponse.data.assignment)
// Simulate a fenced member attempting to rejoin with epoch=0.
@@ -754,11 +754,11 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify the full response.
// Since the subscription/metadata hasn't changed, the member should get
- // their current state back with the same epoch (2) and assignment.
+ // their current state back with the same epoch (3) and assignment.
val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code)
.setMemberId(memberId)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
.setAssignment(expectedAssignment)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 1b1dec69eaf..3e86e605d2d 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -299,14 +299,14 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
syncGroupWithOldProtocol(
groupId = groupId,
memberId = memberId2,
- generationId = 2
+ generationId = 3
)
)
// Member 2 heartbeats.
heartbeat(
groupId = groupId,
- generationId = 2,
+ generationId = 3,
memberId = memberId2
)
@@ -323,7 +323,7 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
// Member 2 heartbeats and gets REBALANCE_IN_PROGRESS.
heartbeat(
groupId = groupId,
- generationId = 2,
+ generationId = 3,
memberId = memberId2,
expectedError = Errors.REBALANCE_IN_PROGRESS
)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
index 408f31db8d1..16278cc627e 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
@@ -108,8 +108,8 @@ class ShareGroupDescribeRequestTest(cluster:
ClusterInstance) extends GroupCoord
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(GroupState.STABLE.toString)
- .setGroupEpoch(1)
- .setAssignmentEpoch(1)
+ .setGroupEpoch(2)
+ .setAssignmentEpoch(2)
.setAssignorName("simple")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index f2993ecac99..11bf42b3f6c 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -91,7 +91,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response.
assertNotNull(shareGroupHeartbeatResponse.data.memberId)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -124,7 +124,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
shareGroupHeartbeatResponse.data.assignment)
// Leave the group.
@@ -182,7 +182,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response for member 1.
val memberId1 = shareGroupHeartbeatResponse.data.memberId
assertNotNull(memberId1)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// The second member request to join the group.
@@ -203,7 +203,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response for member 2.
val memberId2 = shareGroupHeartbeatResponse.data.memberId
assertNotNull(memberId2)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Verify the member id is different.
assertNotEquals(memberId1, memberId2)
@@ -220,7 +220,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
).build()
// Heartbeats until the partitions are assigned for member 1.
@@ -243,14 +243,14 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
val topicPartitionsAssignedToMember1 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
// Prepare the next heartbeat for member 2.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId2)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
).build()
// Heartbeats until the partitions are assigned for member 2.
@@ -262,7 +262,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
val topicPartitionsAssignedToMember2 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
topicPartitionsAssignedToMember1.forEach(topicPartition => {
@@ -280,7 +280,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId1)
- .setMemberEpoch(3)
+ .setMemberEpoch(4)
).build()
// Heartbeats until the response for no change of assignment occurs for
member 1 with same epoch.
@@ -292,7 +292,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
@@ -336,7 +336,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
assertNotNull(memberId)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -357,7 +357,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
).build()
TestUtils.waitUntilTrue(() => {
@@ -367,7 +367,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
// Member leaves the group.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@@ -398,7 +398,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
// Verify the response for member 1.
- assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
// Partition assignment remains intact on rejoining.
assertEquals(expectedAssignment,
shareGroupHeartbeatResponse.data.assignment)
@@ -441,7 +441,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
assertNotNull(memberId)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic foo.
val fooTopicId = TestUtils.createTopicWithAdminRaw(
@@ -469,7 +469,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
).build()
cluster.waitTopicCreation("foo", 2)
@@ -483,7 +483,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic foo and bar assigned. Last
response $shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
// Create the topic baz.
val bazTopicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
@@ -507,7 +507,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(3)
+ .setMemberEpoch(4)
).build()
TestUtils.waitUntilTrue(() => {
@@ -518,7 +518,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic baz assigned. Last
response $shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
// Increasing the partitions of topic bar which is already being
consumed in the share group.
increasePartitions(admin, "bar", 6)
@@ -538,7 +538,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(5)
+ .setMemberEpoch(6)
).build()
TestUtils.waitUntilTrue(() => {
@@ -549,7 +549,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic bar. Last
response $shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
// Delete the topic foo.
TestUtils.deleteTopicWithAdmin(
admin = admin,
@@ -571,7 +571,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(7)
+ .setMemberEpoch(8)
).build()
TestUtils.waitUntilTrue(() => {
@@ -582,7 +582,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic foo. Last
response $shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(9, shareGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
@@ -705,7 +705,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
val memberId = shareGroupHeartbeatResponse.data.memberId
var memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId)
- assertEquals(1, memberEpoch)
+ assertEquals(2, memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -875,7 +875,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
assertNotNull(memberId)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
val fooId = TestUtils.createTopicWithAdminRaw(
@@ -893,7 +893,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
).build()
TestUtils.waitUntilTrue(() => {
@@ -902,7 +902,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
// Restart the only running broker.
val broker = cluster.brokers().values().iterator().next()
@@ -914,7 +914,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
).build()
// Should receive no error and no assignment changes.
@@ -925,7 +925,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response. Epoch should not have changed and null
assignments determines that no
// change in old assignment.
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
assertNull(shareGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
@@ -968,7 +968,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify initial join success.
assertNotNull(shareGroupHeartbeatResponse.data.memberId)
- assertEquals(1, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
// Create the topic to trigger partition assignment.
val topicId = TestUtils.createTopicWithAdminRaw(
diff --git
a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
index aef40390d85..cc6cd2c957e 100644
--- a/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TxnOffsetCommitRequestTest.scala
@@ -295,7 +295,7 @@ class TxnOffsetCommitRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
commitTxnOffset(
groupId = groupId,
memberId = if (version >= 3) memberId else
JoinGroupRequest.UNKNOWN_MEMBER_ID,
- generationId = if (version >= 3) 1 else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ generationId = if (version >= 3) memberEpoch else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
producerId = producerIdAndEpoch.producerId,
producerEpoch = producerIdAndEpoch.epoch,
transactionalId = transactionalId,
diff --git
a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
index a68de4dacc0..a476271c04c 100644
--- a/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/WriteTxnMarkersRequestTest.scala
@@ -119,7 +119,7 @@ class WriteTxnMarkersRequestTest(cluster:ClusterInstance)
extends GroupCoordinat
commitTxnOffset(
groupId = groupId,
memberId = if (version >= 3) memberId else
JoinGroupRequest.UNKNOWN_MEMBER_ID,
- generationId = if (version >= 3) 1 else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ generationId = if (version >= 3) memberEpoch else
JoinGroupRequest.UNKNOWN_GENERATION_ID,
producerId = producerIdAndEpoch.producerId,
producerEpoch = if (useTV2) (producerIdAndEpoch.epoch + 1).toShort
else producerIdAndEpoch.epoch,
transactionalId = transactionalId,
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 694097f3ffa..3d7ecdbfe2c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2062,11 +2062,7 @@ public class GroupMetadataManager {
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
- if (groupEpoch == 0) {
- groupEpoch = 2;
- } else {
- groupEpoch += 1;
- }
+ groupEpoch += 1;
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId,
groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
@@ -2390,9 +2386,6 @@ public class GroupMetadataManager {
SubscriptionType subscriptionType = group.subscriptionType();
boolean bumpGroupEpoch =
- // If the group is newly created, we must ensure that it moves
away from
- // epoch 0 and that it is fully initialized.
- groupEpoch == 0 ||
// Bumping the group epoch signals that the target assignment
should be updated. We bump
// the group epoch when the member has changed its subscribed
topic names or the member
// has changed its subscribed topic regex to a regex that is
already resolved. We avoid
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
index ce9d6b30843..14e4ccfa790 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/TargetAssignmentMetadata.java
@@ -29,7 +29,7 @@ public record TargetAssignmentMetadata(int assignmentEpoch,
long assignmentTimes
* The initial target assignment metadata for groups.
* This is different to tombstoned assignment metadata which has an
assignment epoch of -1.
*/
- public static final TargetAssignmentMetadata ZERO = new
TargetAssignmentMetadata(0, 0L);
+ public static final TargetAssignmentMetadata INITIAL = new
TargetAssignmentMetadata(1, 0L);
public TargetAssignmentMetadata {
if (assignmentEpoch < 0 && assignmentEpoch != -1) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 6e882b29d09..c054c8fbfe5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -131,11 +131,12 @@ public abstract class ModernGroup<T extends
ModernGroupMember> implements Group
this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
this.groupId = Objects.requireNonNull(groupId);
this.groupEpoch = new TimelineInteger(snapshotRegistry);
+ this.groupEpoch.set(1);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.subscriptionType = new TimelineObject<>(snapshotRegistry,
HOMOGENEOUS);
- this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.ZERO);
+ this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.INITIAL);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.invertedTargetAssignment = new
TimelineHashMap<>(snapshotRegistry, 0);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index f0d07611f86..5b6610fe4a0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -237,11 +237,12 @@ public class StreamsGroup implements Group {
this.groupId = Objects.requireNonNull(groupId);
this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
this.groupEpoch = new TimelineInteger(snapshotRegistry);
+ this.groupEpoch.set(1);
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
this.metadataHash = new TimelineLong(snapshotRegistry);
- this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.ZERO);
+ this.targetAssignmentMetadata = new TimelineObject<>(snapshotRegistry,
TargetAssignmentMetadata.INITIAL);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
this.currentActiveTaskToProcessId = new
TimelineHashMap<>(snapshotRegistry, 0);
this.currentStandbyTaskToProcessIds = new
TimelineHashMap<>(snapshotRegistry, 0);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 837de8ce881..f0b61be4172 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -338,7 +338,7 @@ public class GroupMetadataManagerTest {
assertEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
@@ -1252,7 +1252,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -1268,7 +1268,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -1277,12 +1277,12 @@ public class GroupMetadataManagerTest {
.setServerAssignorName("range")
.setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
- mkTopicAssignment(barTopicId, 0, 1, 2)), 1))
+ mkTopicAssignment(barTopicId, 0, 1, 2)), 2))
.build();
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, computeGroupHash(Map.of(
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
2, computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
))),
@@ -1290,7 +1290,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1337,7 +1337,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -1350,7 +1350,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
@@ -1358,18 +1358,18 @@ public class GroupMetadataManagerTest {
.setSubscribedTopicNames(List.of("foo"))
.setServerAssignorName("range")
.setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
- mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 1))
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 2))
.build();
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, computeGroupHash(Map.of(
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
2, computeGroupHash(Map.of(
fooTopicName, fooTopicHash
))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
)),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember)
);
@@ -1395,7 +1395,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
2, 0)
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
3, 0)
);
assertRecordsEquals(expectedRecords, result.records());
assertEquals(Map.of(), context.groupMetadataManager.topicHashCache());
@@ -3118,7 +3118,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -3138,7 +3138,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000),
result.response()
);
@@ -3158,7 +3158,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -3183,7 +3183,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -4281,7 +4281,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(List.of("foo"))
.setTopicPartitions(List.of()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4298,7 +4298,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4356,7 +4356,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(List.of("foo"))
.setTopicPartitions(List.of()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4373,7 +4373,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
)
)
)),
@@ -4482,7 +4482,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo")));
- assertEquals(1, result.response().getKey().memberEpoch());
+ assertEquals(2, result.response().getKey().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4499,7 +4499,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2, 0)
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3, 0)
)
)
)),
@@ -4609,7 +4609,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(List.of("foo"))
.setTopicPartitions(List.of()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@@ -4642,7 +4642,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0)
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0)
)
)
)),
@@ -4691,7 +4691,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -4729,7 +4729,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
@@ -4746,14 +4746,14 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setRebalanceTimeoutMs(12000)
.setSubscribedTopicNames(List.of("foo")));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -4778,7 +4778,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setTopicPartitions(List.of(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(0, 1)))));
@@ -4786,7 +4786,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000),
result.response()
);
@@ -4838,7 +4838,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -4876,7 +4876,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(2)
+ .setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()),
result.response()
@@ -4893,12 +4893,12 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
- .setMemberEpoch(1));
+ .setMemberEpoch(2));
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -4920,7 +4920,7 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3,
computeGroupHash(Map.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 4,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, new
KRaftCoordinatorMetadataImage(metadataImage))
)))
)
@@ -10284,7 +10284,8 @@ public class GroupMetadataManagerTest {
.setGroupEpoch(epoch)
.setGroupId(consumerGroupIds.get(0))
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
- .setAssignorName("range"),
+ .setAssignorName("range")
+ .setAssignmentEpoch(1),
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
.setGroupId(consumerGroupIds.get(1))
@@ -10296,6 +10297,7 @@ public class GroupMetadataManagerTest {
))
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
.setAssignorName("assignorName")
+ .setAssignmentEpoch(1)
);
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual =
context.sendConsumerGroupDescribe(consumerGroupIds);
@@ -10351,6 +10353,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember.Builder memberBuilder2 = new
ConsumerGroupMember.Builder(memberId2);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(consumerGroupId,
memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(consumerGroupId,
memberId2, assignmentMap));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(consumerGroupId,
epoch + 1, 12345L));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(consumerGroupId,
memberBuilder2.build()));
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(consumerGroupId,
epoch + 2, 0));
@@ -10376,7 +10379,8 @@ public class GroupMetadataManagerTest {
))
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
.setAssignorName("range")
- .setGroupEpoch(epoch + 2);
+ .setGroupEpoch(epoch + 2)
+ .setAssignmentEpoch(epoch + 1);
expected = List.of(
describedGroup
);
@@ -10428,7 +10432,7 @@ public class GroupMetadataManagerTest {
.setGroupEpoch(epoch)
.setGroupId(streamsGroupIds.get(0))
.setGroupState(StreamsGroupState.EMPTY.toString())
- .setAssignmentEpoch(0)
+ .setAssignmentEpoch(1)
.setTopology(expectedTopology),
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(epoch)
@@ -10440,6 +10444,7 @@ public class GroupMetadataManagerTest {
))
.setTopology(expectedTopology)
.setGroupState(StreamsGroupState.NOT_READY.toString())
+ .setAssignmentEpoch(1)
);
List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.sendStreamsGroupDescribe(streamsGroupIds);
@@ -10496,6 +10501,7 @@ public class GroupMetadataManagerTest {
StreamsGroupMember.Builder memberBuilder2 =
streamsGroupMemberBuilderWithDefaults(memberId2);
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(streamsGroupId,
memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(streamsGroupId,
memberId2, assignment));
+
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(streamsGroupId,
epoch + 1, 12345L));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(streamsGroupId,
memberBuilder2.build()));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(streamsGroupId,
epoch + 2, 0, 0, Map.of()));
@@ -10529,7 +10535,8 @@ public class GroupMetadataManagerTest {
)
)
.setGroupState(StreamsGroup.StreamsGroupState.ASSIGNING.toString())
- .setGroupEpoch(epoch + 2);
+ .setGroupEpoch(epoch + 2)
+ .setAssignmentEpoch(epoch + 1);
assertEquals(1, actual.size());
assertEquals(describedGroup, actual.get(0));
}
@@ -11424,7 +11431,7 @@ public class GroupMetadataManagerTest {
ConsumerGroupMember expectedMember = new
ConsumerGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setRebalanceTimeoutMs(5000)
.setClientId(DEFAULT_CLIENT_ID)
@@ -11439,9 +11446,9 @@ public class GroupMetadataManagerTest {
List.of(
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(classicGroupId,
expectedMember),
-
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 1, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(classicGroupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(classicGroupId,
memberId, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
1, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(classicGroupId,
2, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
),
result.records()
@@ -16607,6 +16614,7 @@ public class GroupMetadataManagerTest {
List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(100)
+ .setAssignmentEpoch(1)
.setGroupId(groupIds.get(0))
.setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
.setAssignorName("share-range"),
@@ -16675,7 +16683,7 @@ public class GroupMetadataManagerTest {
Set.of(0)
),
groupId,
- 1,
+ 2,
true
);
@@ -16691,7 +16699,7 @@ public class GroupMetadataManagerTest {
assertEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new
ShareGroupHeartbeatResponseData.Assignment()),
result.response().getKey()
@@ -16755,7 +16763,7 @@ public class GroupMetadataManagerTest {
Set.of(0)
),
groupId,
- 1,
+ 2,
true
);
@@ -16829,14 +16837,14 @@ public class GroupMetadataManagerTest {
Set.of(0, 1, 2)
),
groupId,
- 1,
+ 2,
true
);
assertResponseEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
@@ -16853,7 +16861,7 @@ public class GroupMetadataManagerTest {
ShareGroupMember expectedMember = new
ShareGroupMember.Builder(memberId)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar"))
.setAssignedPartitions(mkAssignment(
@@ -16864,7 +16872,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
expectedMember),
- GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1,
computeGroupHash(Map.of(
+ GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName,
coordinatorMetadataImage),
barTopicName, computeTopicHash(barTopicName,
coordinatorMetadataImage)
))),
@@ -16872,7 +16880,7 @@ public class GroupMetadataManagerTest {
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
-
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId,
mkShareGroupStateMap(List.of(
mkShareGroupStateMetadataEntry(fooTopicId, fooTopicName,
List.of(0, 1, 2, 3, 4, 5)),
@@ -20628,7 +20636,7 @@ public class GroupMetadataManagerTest {
.setRebalanceTimeoutMs(90000)
.setSubscribedTopicNames(List.of("foo"))
.setTopicPartitions(List.of()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify heartbeat interval
assertEquals(5000, result.response().heartbeatIntervalMs());
@@ -20654,7 +20662,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch()));
- assertEquals(1, result.response().memberEpoch());
+ assertEquals(2, result.response().memberEpoch());
// Verify heartbeat interval
assertEquals(10000, result.response().heartbeatIntervalMs());
@@ -20719,7 +20727,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo")));
- assertEquals(1, result.response().getKey().memberEpoch());
+ assertEquals(2, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
@@ -20728,7 +20736,7 @@ public class GroupMetadataManagerTest {
Set.of(0, 1, 2, 3, 4, 5)
),
groupId,
- 1,
+ 2,
true
);
@@ -20769,7 +20777,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().getKey().memberEpoch()));
- assertEquals(1, result.response().getKey().memberEpoch());
+ assertEquals(2, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
@@ -22858,11 +22866,13 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
// The member subscription is created.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
- // The group epoch is bumped.
- GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1, 0),
- // The target assignment is created.
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, Map.of()),
-
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
1, context.time.milliseconds()),
+
+ // The group is initialized at group epoch 1. Since the group
epoch is not bumped until
+ // regex resolution has completed, no consumer group metadata
record is created.
+ // Similarly, the target assignment is initialized at epoch 1 with
an empty assignment
+ // and not updated until regex resolution has completed, so no
target assignment records
+ // are created.
+
// The member current state is created.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
);
@@ -25302,7 +25312,7 @@ public class GroupMetadataManagerTest {
Set.of(0, 1)
),
groupId,
- 1,
+ 2,
true
);
@@ -25350,7 +25360,7 @@ public class GroupMetadataManagerTest {
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId.toString())
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setSubscribedTopicNames(null));
expected = newShareGroupStatePartitionMetadataRecord(
@@ -25377,7 +25387,7 @@ public class GroupMetadataManagerTest {
Set.of(2, 3)
),
groupId,
- 2,
+ 3,
true
);
@@ -25411,7 +25421,7 @@ public class GroupMetadataManagerTest {
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ShareGroupMetadataValue()
- .setEpoch(0)
+ .setEpoch(1)
);
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
@@ -25448,7 +25458,7 @@ public class GroupMetadataManagerTest {
result.response().getValue(),
Map.of(t1Uuid, Set.of(0, 1)),
groupId,
- 1,
+ 2,
false
);
@@ -25490,7 +25500,7 @@ public class GroupMetadataManagerTest {
result.response().getValue(),
Map.of(t1Uuid, Set.of(0, 1)),
groupId,
- 2,
+ 3,
true
);
verify(context.metrics,
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index 40d3300b093..196de8bb70e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.modern.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
import org.apache.kafka.coordinator.group.modern.Assignment;
import java.util.ArrayList;
@@ -40,8 +41,8 @@ public class ConsumerGroupBuilder {
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
- this.assignmentEpoch = 0;
- this.assignmentTimestamp = 0L;
+ this.assignmentEpoch =
TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+ this.assignmentTimestamp =
TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
}
public ConsumerGroupBuilder withMember(ConsumerGroupMember member) {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 825211d72fa..4570315a00d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -454,36 +454,36 @@ public class ConsumerGroupTest {
ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.build();
consumerGroup.updateMember(member1);
- consumerGroup.setGroupEpoch(1);
+ consumerGroup.setGroupEpoch(2);
assertEquals(MemberState.STABLE, member1.state());
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.build();
consumerGroup.updateMember(member2);
- consumerGroup.setGroupEpoch(2);
+ consumerGroup.setGroupEpoch(3);
assertEquals(MemberState.STABLE, member2.state());
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
- consumerGroup.setTargetAssignmentMetadata(2, 12345L);
+ consumerGroup.setTargetAssignmentMetadata(3, 12345L);
assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
consumerGroup.state());
member1 = new ConsumerGroupMember.Builder(member1)
.setState(MemberState.STABLE)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
consumerGroup.updateMember(member1);
@@ -494,8 +494,8 @@ public class ConsumerGroupTest {
// Member 2 is not stable so the group stays in reconciling state.
member2 = new ConsumerGroupMember.Builder(member2)
.setState(MemberState.UNREVOKED_PARTITIONS)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
consumerGroup.updateMember(member2);
@@ -505,8 +505,8 @@ public class ConsumerGroupTest {
member2 = new ConsumerGroupMember.Builder(member2)
.setState(MemberState.STABLE)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
consumerGroup.updateMember(member2);
@@ -807,8 +807,8 @@ public class ConsumerGroupTest {
MockTime time = new MockTime();
ConsumerGroup group = createConsumerGroup("group-foo");
- // Group epoch starts at 0.
- assertEquals(0, group.groupEpoch());
+ // Group epoch starts at 1.
+ assertEquals(1, group.groupEpoch());
// The refresh time deadline should be empty when the group is created
or loaded.
assertTrue(group.hasMetadataExpired(time.milliseconds()));
@@ -941,6 +941,7 @@ public class ConsumerGroupTest {
snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo"))
.build());
snapshotRegistry.idempotentCreateSnapshot(1);
@@ -988,7 +989,7 @@ public class ConsumerGroupTest {
assertDoesNotThrow(consumerGroup::validateDeleteGroup);
ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.build();
consumerGroup.updateMember(member1);
@@ -996,12 +997,12 @@ public class ConsumerGroupTest {
assertEquals(ConsumerGroup.ConsumerGroupState.RECONCILING,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
- consumerGroup.setGroupEpoch(1);
+ consumerGroup.setGroupEpoch(2);
assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
- consumerGroup.setTargetAssignmentMetadata(1, 12345L);
+ consumerGroup.setTargetAssignmentMetadata(2, 12345L);
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE,
consumerGroup.state());
assertThrows(GroupNotEmptyException.class,
consumerGroup::validateDeleteGroup);
@@ -1055,26 +1056,31 @@ public class ConsumerGroupTest {
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo"))
.setServerAssignorName("assignorName")
.build());
group.updateMember(new ConsumerGroupMember.Builder("member2")
+ .setMemberEpoch(1)
.build());
snapshotRegistry.idempotentCreateSnapshot(1);
ConsumerGroupDescribeResponseData.DescribedGroup expected = new
ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1")
.setGroupState(ConsumerGroup.ConsumerGroupState.STABLE.toString())
- .setGroupEpoch(0)
- .setAssignmentEpoch(0)
+ .setGroupEpoch(1)
+ .setAssignmentEpoch(1)
.setAssignorName("assignorName")
.setMembers(Arrays.asList(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId("member1")
+ .setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo"))
.setSubscribedTopicRegex("")
.setMemberType((byte) 1),
- new
ConsumerGroupDescribeResponseData.Member().setMemberId("member2")
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("member2")
+ .setMemberEpoch(1)
.setSubscribedTopicRegex("")
.setMemberType((byte) 1)
));
@@ -1093,6 +1099,7 @@ public class ConsumerGroupTest {
assertFalse(group.isInStates(Set.of("Empty"), 0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
+ .setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo"))
.build());
snapshotRegistry.idempotentCreateSnapshot(1);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index 14d6442057c..e853b7d7ef0 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
import org.apache.kafka.coordinator.group.modern.Assignment;
import java.util.ArrayList;
@@ -39,8 +40,8 @@ public class ShareGroupBuilder {
public ShareGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
- this.assignmentEpoch = 0;
- this.assignmentTimestamp = 0L;
+ this.assignmentEpoch = TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+ this.assignmentTimestamp = TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
}
public ShareGroupBuilder withMember(ShareGroupMember member) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 60daaa5d1bc..1d3505cf1d3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -297,8 +297,8 @@ public class ShareGroupTest {
MockTime time = new MockTime();
ShareGroup shareGroup = createShareGroup("group-foo");
- // Group epoch starts at 0.
- assertEquals(0, shareGroup.groupEpoch());
+ // Group epoch starts at 1.
+ assertEquals(1, shareGroup.groupEpoch());
// The refresh time deadline should be empty when the group is created or loaded.
assertTrue(shareGroup.hasMetadataExpired(time.milliseconds()));
@@ -453,8 +453,8 @@ public class ShareGroupTest {
ShareGroupDescribeResponseData.DescribedGroup expected = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1")
.setGroupState(ShareGroupState.STABLE.toString())
- .setGroupEpoch(0)
- .setAssignmentEpoch(0)
+ .setGroupEpoch(1)
+ .setAssignmentEpoch(1)
.setAssignorName("assignorName")
.setMembers(Arrays.asList(
new ShareGroupDescribeResponseData.Member()
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
index 5d3688693be..9d16e2501a3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupBuilder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.streams;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.TargetAssignmentMetadata;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
@@ -42,8 +43,8 @@ public class StreamsGroupBuilder {
public StreamsGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
this.groupEpoch = groupEpoch;
- this.targetAssignmentEpoch = 0;
- this.targetAssignmentTimestamp = 0L;
+ this.targetAssignmentEpoch = TargetAssignmentMetadata.INITIAL.assignmentEpoch();
+ this.targetAssignmentTimestamp = TargetAssignmentMetadata.INITIAL.assignmentTimestamp();
this.topology = null;
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index d52c5e631f1..c5fc85ea7d3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -489,12 +489,12 @@ public class StreamsGroupTest {
StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.build();
streamsGroup.updateMember(member1);
- streamsGroup.setGroupEpoch(1);
+ streamsGroup.setGroupEpoch(2);
assertEquals(MemberState.STABLE, member1.state());
assertEquals(StreamsGroup.StreamsGroupState.NOT_READY, streamsGroup.state());
@@ -508,24 +508,24 @@ public class StreamsGroupTest {
StreamsGroupMember member2 = new StreamsGroupMember.Builder("member2")
.setState(MemberState.STABLE)
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.build();
streamsGroup.updateMember(member2);
- streamsGroup.setGroupEpoch(2);
+ streamsGroup.setGroupEpoch(3);
assertEquals(MemberState.STABLE, member2.state());
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
- streamsGroup.setTargetAssignmentMetadata(2, 12345L);
+ streamsGroup.setTargetAssignmentMetadata(3, 12345L);
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
member1 = new StreamsGroupMember.Builder(member1)
.setState(MemberState.STABLE)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
streamsGroup.updateMember(member1);
@@ -536,8 +536,8 @@ public class StreamsGroupTest {
// Member 2 is not stable so the group stays in reconciling state.
member2 = new StreamsGroupMember.Builder(member2)
.setState(MemberState.UNREVOKED_TASKS)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
streamsGroup.updateMember(member2);
@@ -547,8 +547,8 @@ public class StreamsGroupTest {
member2 = new StreamsGroupMember.Builder(member2)
.setState(MemberState.STABLE)
- .setMemberEpoch(2)
- .setPreviousMemberEpoch(1)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
.build();
streamsGroup.updateMember(member2);
@@ -567,8 +567,8 @@ public class StreamsGroupTest {
MockTime time = new MockTime();
StreamsGroup group = createStreamsGroup("group-foo");
- // Group epoch starts at 0.
- assertEquals(0, group.groupEpoch());
+ // Group epoch starts at 1.
+ assertEquals(1, group.groupEpoch());
// The refresh time deadline should be empty when the group is created or loaded.
assertTrue(group.hasMetadataExpired(time.milliseconds()));
@@ -856,7 +856,7 @@ public class StreamsGroupTest {
assertDoesNotThrow(streamsGroup::validateDeleteGroup);
StreamsGroupMember member1 = new StreamsGroupMember.Builder("member1")
- .setMemberEpoch(1)
+ .setMemberEpoch(2)
.setPreviousMemberEpoch(0)
.setState(MemberState.STABLE)
.build();
@@ -872,12 +872,12 @@ public class StreamsGroupTest {
assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, streamsGroup.state());
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
- streamsGroup.setGroupEpoch(1);
+ streamsGroup.setGroupEpoch(2);
assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, streamsGroup.state());
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);
- streamsGroup.setTargetAssignmentMetadata(1, 12345L);
+ streamsGroup.setTargetAssignmentMetadata(2, 12345L);
assertEquals(StreamsGroup.StreamsGroupState.STABLE, streamsGroup.state());
assertThrows(GroupNotEmptyException.class, streamsGroup::validateDeleteGroup);