This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6d16f687aa1 KAFKA-19662: Reset share group offsets for unsubscribed
topics (#20708)
6d16f687aa1 is described below
commit 6d16f687aa1a0df26f2f665436b7efaf0aec0c56
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Oct 23 18:37:25 2025 +0100
KAFKA-19662: Reset share group offsets for unsubscribed topics (#20708)
This PR allows the kafka-share-groups.sh --reset-offsets tool to be used
to set offsets for topics which are not currently subscribed in a share
group. It also works if the share group does not yet exist. This brings
the capability in line with the equivalent function in
kafka-consumer-groups.sh. The primary purpose is to allow offsets to be
set before the share group is first used as a way of initialising in a
known state.
Reviewers: Jimmy Wang <[email protected]>,
Kuan-Po Tseng <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 12 ++---
.../coordinator/group/GroupCoordinatorShard.java | 19 +------
.../coordinator/group/GroupMetadataManager.java | 59 ++++++++++++++++++----
.../coordinator/group/modern/share/ShareGroup.java | 4 --
.../tools/consumer/group/ShareGroupCommand.java | 34 ++++++++++---
.../consumer/group/ShareGroupCommandTest.java | 46 ++++++++++++++++-
6 files changed, 128 insertions(+), 46 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index d48ddaf58ad..a480a3b5ffb 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2934,7 +2934,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val testTopicName = "test_topic"
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
- val fakeGroupId = "fake_group_id"
+ val nonexistentGroupId = "nonexistent_group_id"
val fakeTopicName = "foo"
val tp1 = new TopicPartition(testTopicName, 0)
@@ -2968,12 +2968,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureThrows(classOf[GroupNotEmptyException],
offsetAlterResult.partitionResult(tp1))
assertFutureThrows(classOf[GroupNotEmptyException],
offsetAlterResult.partitionResult(tp2))
- // Test the fake group ID
- val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId,
util.Map.of(tp1, 0, tp2, 0))
+ // Test the non-existent group ID
+ val nonexistentAlterResult =
client.alterShareGroupOffsets(nonexistentGroupId, util.Map.of(tp1, 0, tp2, 0))
- assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.all())
- assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.partitionResult(tp1))
- assertFutureThrows(classOf[GroupIdNotFoundException],
fakeAlterResult.partitionResult(tp2))
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
nonexistentAlterResult.all())
+ assertNull(nonexistentAlterResult.partitionResult(tp1).get())
+ assertFutureThrows(classOf[UnknownTopicOrPartitionException],
nonexistentAlterResult.partitionResult(tp2))
}
// Test offset alter when group is empty
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c591f8d3767..3165d10d8f4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -779,10 +779,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
/**
- * Make the following checks to make sure the
AlterShareGroupOffsetsRequest request is valid:
- * 1. Checks whether the provided group is empty
- * 2. Checks the requested topics are presented in the metadataImage
- * 3. Checks the corresponding share partitions in
AlterShareGroupOffsetsRequest are existing
+ * Alters the offsets for a share group.
*
* @param groupId - The group ID
* @param alterShareGroupOffsetsRequestData - The request data for
AlterShareGroupOffsetsRequestData
@@ -793,19 +790,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
String groupId,
AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
) {
- List<CoordinatorRecord> records = new ArrayList<>();
- ShareGroup group = groupMetadataManager.shareGroup(groupId);
- group.validateOffsetsAlterable();
-
- Map.Entry<AlterShareGroupOffsetsResponseData,
InitializeShareGroupStateParameters> response =
groupMetadataManager.completeAlterShareGroupOffsets(
- groupId,
- alterShareGroupOffsetsRequestData,
- records
- );
- return new CoordinatorResult<>(
- records,
- response
- );
+ return groupMetadataManager.alterShareGroupOffsets(groupId,
alterShareGroupOffsetsRequestData.topics());
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 66f755148e9..09b459a27f7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -1513,6 +1514,21 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Checks whether the share group is empty.
+ *
+ * @param group The share group.
+ *
+ * @throws GroupNotEmptyException if the group is not empty.
+ */
+ private void throwIfShareGroupIsNotEmpty(
+ ShareGroup group
+ ) throws GroupNotEmptyException {
+ if (group.numMembers() > 0) {
+ throw new GroupNotEmptyException(Errors.NON_EMPTY_GROUP.message());
+ }
+ }
+
/**
* Validates the member epoch provided in the heartbeat request.
*
@@ -8300,19 +8316,37 @@ public class GroupMetadataManager {
return deleteShareGroupStateRequestTopicsData;
}
- public Map.Entry<AlterShareGroupOffsetsResponseData,
InitializeShareGroupStateParameters> completeAlterShareGroupOffsets(
+ /**
+ * Handles an AlterShareGroupOffsets request.
+ *
+ * Make the following checks to make sure the
AlterShareGroupOffsetsRequest request is valid:
+ * 1. Checks whether the provided group is empty
+ * 2. Checks the requested topics are present in the metadataImage
+ * 3. Checks the corresponding share partitions in
AlterShareGroupOffsetsRequest exist
+ *
+ * @param groupId The group id from the request.
+ * @param topics The topic information for altering the share group's
offsets from the request.
+ *
+ * @return A Result containing a pair of AlterShareGroupOffsets response and
maybe InitializeShareGroupStateParameters
+ * and a list of records to update the state machine.
+ */
+ public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData,
InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets(
String groupId,
- AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest,
- List<CoordinatorRecord> records
- ) {
+
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection
topics
+ ) throws ApiException {
final long currentTimeMs = time.milliseconds();
- Group group = groups.get(groupId);
+ final List<CoordinatorRecord> records = new ArrayList<>();
+
+ // Get or create the share group. If the group exists, check that it's
empty. If it is created, it is empty.
+ final ShareGroup group = getOrMaybeCreateShareGroup(groupId, true);
+ throwIfShareGroupIsNotEmpty(group);
+
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection
alterShareGroupOffsetsResponseTopics = new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection();
Map<Uuid, InitMapValue> initializingTopics = new HashMap<>();
Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new
HashMap<>();
- alterShareGroupOffsetsRequest.topics().forEach(topic -> {
+ topics.forEach(topic -> {
Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOpt
= metadataImage.topicMetadata(topic.topicName());
if (topicMetadataOpt.isPresent()) {
var topicMetadata = topicMetadataOpt.get();
@@ -8366,10 +8400,13 @@ public class GroupMetadataManager {
});
addInitializingTopicsRecords(groupId, records, initializingTopics);
- return Map.entry(
- new AlterShareGroupOffsetsResponseData()
- .setResponses(alterShareGroupOffsetsResponseTopics),
- buildInitializeShareGroupState(groupId, ((ShareGroup)
group).groupEpoch(), offsetByTopicPartitions)
+ return new CoordinatorResult<>(
+ records,
+ Map.entry(
+ new AlterShareGroupOffsetsResponseData()
+ .setResponses(alterShareGroupOffsetsResponseTopics),
+ buildInitializeShareGroupState(groupId, group.groupEpoch(),
offsetByTopicPartitions)
+ )
);
}
@@ -8432,7 +8469,7 @@ public class GroupMetadataManager {
return new CoordinatorResult<>(records);
}
- /*
+ /**
* Returns a list of {@link
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic}
corresponding to the
* topics for which persister delete share group state request was
successful
* @param groupId group ID of the share group
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index efe2474abef..57c9cf5cb65 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -245,10 +245,6 @@ public class ShareGroup extends
ModernGroup<ShareGroupMember> {
validateEmptyGroup();
}
- public void validateOffsetsAlterable() throws ApiException {
- validateEmptyGroup();
- }
-
public void validateEmptyGroup() {
if (state() != ShareGroupState.EMPTY) {
throw Errors.NON_EMPTY_GROUP.exception();
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 50565dcc0ba..87cf0f1e837 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
@@ -385,10 +386,25 @@ public class ShareGroupCommand {
if
(!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) ||
GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
CommandLineUtils.printErrorAndExit(String.format("Share
group '%s' is not empty.", groupId));
}
- Map<TopicPartition, OffsetAndMetadata> offsetsToReset =
prepareOffsetsToReset(groupId);
- if (offsetsToReset == null) {
- return;
+ resetOffsetsForInactiveGroup(groupId);
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof GroupIdNotFoundException) {
+ resetOffsetsForInactiveGroup(groupId);
+ } else if (cause instanceof KafkaException) {
+ CommandLineUtils.printErrorAndExit(cause.getMessage());
+ } else {
+ throw new RuntimeException(cause);
}
+ }
+ }
+
+ private void resetOffsetsForInactiveGroup(String groupId) {
+ try {
+ Collection<TopicPartition> partitionsToReset =
getPartitionsToReset(groupId);
+ Map<TopicPartition, OffsetAndMetadata> offsetsToReset =
prepareOffsetsToReset(partitionsToReset);
boolean dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt);
if (!dryRun) {
adminClient.alterShareGroupOffsets(groupId,
@@ -404,24 +420,28 @@ public class ShareGroupCommand {
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof KafkaException) {
- CommandLineUtils.printErrorAndExit(cause.getMessage());
+ throw (KafkaException) cause;
} else {
throw new RuntimeException(cause);
}
}
}
- protected Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId) throws ExecutionException,
InterruptedException {
- Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
- Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ private Collection<TopicPartition> getPartitionsToReset(String
groupId) throws ExecutionException, InterruptedException {
Collection<TopicPartition> partitionsToReset;
if (opts.options.has(opts.topicOpt)) {
partitionsToReset =
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
} else {
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
+ Map<TopicPartition, OffsetAndMetadata>
offsetsByTopicPartitions =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
partitionsToReset = offsetsByTopicPartitions.keySet();
}
+ return partitionsToReset;
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(Collection<TopicPartition> partitionsToReset) {
offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToEarliestOpt)) {
return offsetsUtils.resetToEarliest(partitionsToReset);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 3b1843b95cd..29fe151ba2b 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -1313,7 +1313,7 @@ public class ShareGroupCommandTest {
}
@Test
- public void testAlterShareGroupOffsetsFailureWithNoneEmptyGroup() {
+ public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() {
String group = "share-group";
String topic = "topic";
String bootstrapServer = "localhost:9092";
@@ -1418,6 +1418,50 @@ public class ShareGroupCommandTest {
}
}
+ @Test
+ public void testAlterShareGroupNonExistentGroupSuccess() {
+ String group = "share-group";
+ String topic = "none";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group",
group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ )
+ );
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0,
new OffsetAndMetadata(0L));
+ ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+ when(adminClient.listOffsets(any(),
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+ KafkaFutureImpl<ShareGroupDescription> missingGroupFuture = new
KafkaFutureImpl<>();
+ missingGroupFuture.completeExceptionally(new
GroupIdNotFoundException("Group " + group + " not found."));
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
missingGroupFuture));
+ when(adminClient.describeShareGroups(any(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ Map<String, TopicDescription> descriptions = Map.of(
+ topic, new TopicDescription(topic, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+ )));
+ DescribeTopicsResult describeTopicResult =
mock(DescribeTopicsResult.class);
+
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.resetOffsets();
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
+ verify(alterShareGroupOffsetsResult, times(1)).all();
+
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
+ }
+ }
+
private AlterShareGroupOffsetsResult mockAlterShareGroupOffsets(Admin
client, String groupId) {
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mock(AlterShareGroupOffsetsResult.class);
KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();