This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 723847904b3 KAFKA-20444: [2/N] Update OffsetMetadataManager to use new
TxnOffsetCommit errors (KIP-1319) (#22214)
723847904b3 is described below
commit 723847904b3844e88435b42f315a5fa9aa2534d7
Author: David Jacot <[email protected]>
AuthorDate: Wed May 6 09:09:40 2026 +0200
KAFKA-20444: [2/N] Update OffsetMetadataManager to use new TxnOffsetCommit
errors (KIP-1319) (#22214)
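This patch updates `OffsetMetadataManager` to propagate
`GROUP_ID_NOT_FOUND` and `STALE_MEMBER_EPOCH` directly, instead of
mapping them to `ILLEGAL_GENERATION`, starting from version 6 of the
`TxnOffsetCommit` API. `GROUP_ID_NOT_FOUND` is propagated from the group
lookup; `STALE_MEMBER_EPOCH` is propagated from both the request-level
validation and the per-partition validator. The legacy mapping to
`ILLEGAL_GENERATION` is preserved for versions 0 to 5, and classic
groups keep returning `ILLEGAL_GENERATION` for a generation mismatch on
every version.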
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../common/requests/TxnOffsetCommitRequest.java | 18 ++
.../coordinator/group/OffsetMetadataManager.java | 15 ++
.../group/OffsetMetadataManagerTest.java | 205 ++++++++++++---------
3 files changed, 151 insertions(+), 87 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 30c40c29dab..626116c70a1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -40,6 +40,24 @@ import java.util.stream.Collectors;
public class TxnOffsetCommitRequest extends AbstractRequest {
public static final short LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2 = 4;
+ /**
+ * @return true if the given version returns {@code GROUP_ID_NOT_FOUND}
directly when the
+ * group is not found; false if the legacy mapping to {@code
ILLEGAL_GENERATION}
+ * is used (KIP-1319).
+ */
+ public static boolean supportsGroupIdNotFoundError(short version) {
+ return version >= 6;
+ }
+
+ /**
+ * @return true if the given version returns {@code STALE_MEMBER_EPOCH}
directly when the
+ * member epoch is stale; false if the legacy mapping to {@code
ILLEGAL_GENERATION}
+ * is used (KIP-1319).
+ */
+ public static boolean supportsStaleMemberEpochError(short version) {
+ return version >= 6;
+ }
+
private final TxnOffsetCommitRequestData data;
public static class Builder extends
AbstractRequest.Builder<TxnOffsetCommitRequest> {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 73fbc151243..ede54a05fab 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.internals.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
@@ -528,7 +529,11 @@ public class OffsetMetadataManager {
// facility. In this case, a so-called simple group is created
and the request
// is accepted.
group =
groupMetadataManager.getOrMaybeCreateClassicGroup(request.groupId(), true);
+ } else if
(TxnOffsetCommitRequest.supportsGroupIdNotFoundError((short)
context.requestVersion())) {
+ // From v6 onwards, GROUP_ID_NOT_FOUND is propagated directly
(KIP-1319).
+ throw ex;
} else {
+ // For older versions, preserve the legacy mapping to
ILLEGAL_GENERATION.
throw Errors.ILLEGAL_GENERATION.exception();
}
}
@@ -542,6 +547,11 @@ public class OffsetMetadataManager {
context.requestVersion()
);
} catch (StaleMemberEpochException ex) {
+ if (TxnOffsetCommitRequest.supportsStaleMemberEpochError((short)
context.requestVersion())) {
+ // From v6 onwards, STALE_MEMBER_EPOCH is propagated directly
(KIP-1319).
+ throw ex;
+ }
+ // For older versions, preserve the legacy mapping to
ILLEGAL_GENERATION.
throw Errors.ILLEGAL_GENERATION.exception();
}
}
@@ -725,6 +735,11 @@ public class OffsetMetadataManager {
partition.partitionIndex()
);
} catch (StaleMemberEpochException ex) {
+ if
(TxnOffsetCommitRequest.supportsStaleMemberEpochError((short)
context.requestVersion())) {
+ // From v6 onwards, STALE_MEMBER_EPOCH is
propagated directly (KIP-1319).
+ throw ex;
+ }
+ // For older versions, preserve the legacy mapping to
ILLEGAL_GENERATION.
throw Errors.ILLEGAL_GENERATION.exception();
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index adb17e4b17a..b0f3170e7e2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -265,11 +265,18 @@ public class OffsetMetadataManagerTest {
public CoordinatorResult<TxnOffsetCommitResponseData,
CoordinatorRecord> commitTransactionalOffset(
TxnOffsetCommitRequestData request
+ ) {
+ return commitTransactionalOffset(request,
ApiKeys.TXN_OFFSET_COMMIT.latestVersion());
+ }
+
+ public CoordinatorResult<TxnOffsetCommitResponseData,
CoordinatorRecord> commitTransactionalOffset(
+ TxnOffsetCommitRequestData request,
+ short version
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.TXN_OFFSET_COMMIT,
- ApiKeys.TXN_OFFSET_COMMIT.latestVersion(),
+ version,
"client",
0
),
@@ -1654,8 +1661,9 @@ public class OffsetMetadataManagerTest {
verifyTransactionalOffsetCommit(context);
}
- @Test
- public void testConsumerGroupTransactionalOffsetCommitResolvesTopicId() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void
testConsumerGroupTransactionalOffsetCommitResolvesTopicId(short version) {
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
@@ -1696,13 +1704,17 @@ public class OffsetMetadataManagerTest {
// When client epoch (3) < assignment epoch (5), exception should be
thrown.
request.setGenerationIdOrMemberEpoch(3);
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(request));
+ Class<? extends Throwable> expected = version >= 6
+ ? StaleMemberEpochException.class
+ : IllegalGenerationException.class;
+ assertThrows(expected, () ->
context.commitTransactionalOffset(request, version));
// When client epoch (5) >= assignment epoch (5), commit should
succeed.
request.setGenerationIdOrMemberEpoch(5);
- assertDoesNotThrow(() -> context.commitTransactionalOffset(request));
+ assertDoesNotThrow(() -> context.commitTransactionalOffset(request,
version));
- CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord>
result = context.commitTransactionalOffset(request);
+ CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord>
result =
+ context.commitTransactionalOffset(request, version);
assertEquals(
new TxnOffsetCommitResponseData()
.setTopics(List.of(
@@ -1788,27 +1800,31 @@ public class OffsetMetadataManagerTest {
);
}
- @Test
- public void testTransactionalOffsetCommitWithUnknownGroupId() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void testTransactionalOffsetCommitWithUnknownGroupId(short version)
{
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
- new TxnOffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(10)
- .setTopics(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata("metadata")
- ))
- ))
- ));
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(10)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ));
+
+ Class<? extends Throwable> expected = version >= 6
+ ? GroupIdNotFoundException.class
+ : IllegalGenerationException.class;
+ assertThrows(expected, () ->
context.commitTransactionalOffset(request, version));
}
@Test
@@ -1857,8 +1873,9 @@ public class OffsetMetadataManagerTest {
));
}
- @Test
- public void
testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void
testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
@@ -1874,7 +1891,7 @@ public class OffsetMetadataManagerTest {
.build()
);
- verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
+ verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
}
@Test
@@ -1914,8 +1931,9 @@ public class OffsetMetadataManagerTest {
));
}
- @Test
- public void
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void
testStreamsGroupTransactionalOffsetCommitWithStaleMemberEpoch(short version) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create an empty group.
@@ -1930,27 +1948,33 @@ public class OffsetMetadataManagerTest {
.build()
);
- verifyTransactionalOffsetCommitWithStaleMemberEpoch(context);
+ verifyTransactionalOffsetCommitWithStaleMemberEpoch(context, version);
}
- private static void
verifyTransactionalOffsetCommitWithStaleMemberEpoch(OffsetMetadataManagerTestContext
context) {
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
- new TxnOffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(100)
- .setTopics(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata("metadata")
- ))
- ))
- ));
+ private static void verifyTransactionalOffsetCommitWithStaleMemberEpoch(
+ OffsetMetadataManagerTestContext context,
+ short version
+ ) {
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(100)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ));
+
+ Class<? extends Throwable> expected = version >= 6
+ ? StaleMemberEpochException.class
+ : IllegalGenerationException.class;
+ assertThrows(expected, () ->
context.commitTransactionalOffset(request, version));
}
@Test
@@ -2023,27 +2047,31 @@ public class OffsetMetadataManagerTest {
);
}
- @Test
- public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void
testGenericGroupTransactionalOffsetCommitWithUnknownGroupId(short version) {
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
- new TxnOffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(10)
- .setTopics(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata("metadata")
- ))
- ))
- ));
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(10)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ));
+
+ Class<? extends Throwable> expected = version >= 6
+ ? GroupIdNotFoundException.class
+ : IllegalGenerationException.class;
+ assertThrows(expected, () ->
context.commitTransactionalOffset(request, version));
}
@Test
@@ -2075,8 +2103,9 @@ public class OffsetMetadataManagerTest {
));
}
- @Test
- public void
testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() {
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT)
+ public void
testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId(short version)
{
OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
// Create a group.
@@ -2095,23 +2124,25 @@ public class OffsetMetadataManagerTest {
assertEquals(1, group.generationId());
group.transitionTo(ClassicGroupState.STABLE);
- assertThrows(IllegalGenerationException.class, () ->
context.commitTransactionalOffset(
- new TxnOffsetCommitRequestData()
- .setGroupId("foo")
- .setMemberId("member")
- .setGenerationIdOrMemberEpoch(100)
- .setTopics(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
- .setName("bar")
- .setPartitions(List.of(
- new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
- .setPartitionIndex(0)
- .setCommittedOffset(100L)
- .setCommittedLeaderEpoch(10)
- .setCommittedMetadata("metadata")
- ))
- ))
- ));
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(100)
+ .setTopics(List.of(
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L)
+ .setCommittedLeaderEpoch(10)
+ .setCommittedMetadata("metadata")
+ ))
+ ));
+
+ // Classic groups always throw IllegalGenerationException, regardless
of API version.
+ assertThrows(IllegalGenerationException.class,
+ () -> context.commitTransactionalOffset(request, version));
}
@Test