This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d2f162a0714 MINOR: kafka-stream-groups.sh should fail quickly if the partition leader is unavailable (#20271) d2f162a0714 is described below commit d2f162a0714c10ff284c36b781c451a7d542ce7a Author: jimmy <wangzhiwang...@gmail.com> AuthorDate: Tue Aug 26 16:08:35 2025 +0800 MINOR: kafka-stream-groups.sh should fail quickly if the partition leader is unavailable (#20271) This PR applies the same partition leader check for `StreamsGroupCommand` as `ShareGroupCommand` and `ConsumerGroupCommand` to avoid the command execution timeout. Reviewers: Lucas Brutschy <lucas...@apache.org> --- .../kafka/tools/streams/StreamsGroupCommand.java | 1 + .../consumer/group/ShareGroupCommandTest.java | 2 +- .../tools/streams/StreamsGroupCommandTest.java | 55 ++++++++++++++++++++-- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 0f68bf82900..0c54f6c53f9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -881,6 +881,7 @@ public class StreamsGroupCommand { List<String> topics = opts.options.valuesOf(opts.inputTopicOpt); List<TopicPartition> partitions = offsetsUtils.parseTopicPartitionsToReset(topics); + offsetsUtils.checkAllTopicPartitionsValid(partitions); // if the user specified topics that do not belong to this group, we filter them out partitions = filterExistingGroupTopics(groupId, partitions); return partitions; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index b14d66c652a..9333bbbb65e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -1373,7 +1373,7 @@ public class ShareGroupCommandTest { } @Test - public void testAlterShareGroupFailureFailureWithNonExistentTopic() { + public void testAlterShareGroupFailureWithNonExistentTopic() { String group = "share-group"; String topic = "none"; String bootstrapServer = "localhost:9092"; diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java index 6f38c47f15a..4f1e116437e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.test.TestUtils; @@ -65,6 +66,7 @@ import java.util.stream.IntStream; import joptsimple.OptionException; +import static org.apache.kafka.common.KafkaFuture.completedFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -72,6 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -293,21 +296,30 @@ public class StreamsGroupCommandTest { @Test public void testAdminRequestsForResetOffsets() { Admin adminClient = mock(KafkaAdminClient.class); + String topic = "topic1"; String groupId = "foo-group"; List<String> args = List.of("--bootstrap-server", "localhost:9092", "--group", groupId, "--reset-offsets", "--input-topic", "topic1", "--to-latest"); - List<String> topics = List.of("topic1"); + List<String> topics = List.of(topic); + DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class); when(adminClient.describeStreamsGroups(List.of(groupId))) .thenReturn(describeStreamsResult(groupId, GroupState.DEAD)); + Map<String, TopicDescription> descriptions = Map.of( + topic, new TopicDescription(topic, false, List.of( + new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())) + )); + when(adminClient.describeTopics(anyCollection())) + .thenReturn(describeTopicsResult); when(adminClient.describeTopics(eq(topics), any(DescribeTopicsOptions.class))) - .thenReturn(describeTopicsResult(topics, 1)); + .thenReturn(describeTopicsResult); + when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions)); when(adminClient.listOffsets(any(), any())) .thenReturn(listOffsetsResult()); ListGroupsResult listGroupsResult = listGroupResult(groupId); when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult); ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class); Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new HashMap<>(); - committedOffsetsMap.put(new TopicPartition("topic1", 0), mock(OffsetAndMetadata.class)); + committedOffsetsMap.put(new TopicPartition(topic, 0), mock(OffsetAndMetadata.class)); when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result); when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap)); @@ -427,6 +439,43 @@ public class StreamsGroupCommandTest { service.close(); } + + @Test + public void testResetOffsetsWithPartitionNotExist() { + Admin adminClient = mock(KafkaAdminClient.class); + String groupId = "foo-group"; + String topic = "topic"; + List<String> args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--reset-offsets", "--input-topic", "topic:3", "--to-latest")); + + when(adminClient.describeStreamsGroups(List.of(groupId))) + .thenReturn(describeStreamsResult(groupId, GroupState.DEAD)); + DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class); + + Map<String, TopicDescription> descriptions = Map.of( + topic, new TopicDescription(topic, false, List.of( + new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())) + )); + when(adminClient.describeTopics(anyCollection())) + .thenReturn(describeTopicsResult); + when(adminClient.describeTopics(eq(List.of(topic)), any(DescribeTopicsOptions.class))) + .thenReturn(describeTopicsResult); + when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions)); + when(adminClient.listOffsets(any(), any())) + .thenReturn(listOffsetsResult()); + ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class); + Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = Map.of( + new TopicPartition(topic, 0), + new OffsetAndMetadata(12, Optional.of(0), ""), + new TopicPartition(topic, 1), + new OffsetAndMetadata(12, Optional.of(0), "") + ); + + when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result); + when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap)); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient); + assertThrows(UnknownTopicOrPartitionException.class, () -> service.resetOffsets()); + service.close(); + } private ListGroupsResult listGroupResult(String groupId) { ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);