This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d2f162a0714 MINOR: kafka-stream-groups.sh should fail quickly if the 
partition leader is unavailable (#20271)
d2f162a0714 is described below

commit d2f162a0714c10ff284c36b781c451a7d542ce7a
Author: jimmy <wangzhiwang...@gmail.com>
AuthorDate: Tue Aug 26 16:08:35 2025 +0800

    MINOR: kafka-stream-groups.sh should fail quickly if the partition leader 
is unavailable (#20271)
    
    This PR applies the same partition leader check for `StreamsGroupCommand` as
    `ShareGroupCommand`  and `ConsumerGroupCommand` to avoid the command
    execution timeout.
    
    Reviewers: Lucas Brutschy <lucas...@apache.org>
---
 .../kafka/tools/streams/StreamsGroupCommand.java   |  1 +
 .../consumer/group/ShareGroupCommandTest.java      |  2 +-
 .../tools/streams/StreamsGroupCommandTest.java     | 55 ++++++++++++++++++++--
 3 files changed, 54 insertions(+), 4 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 0f68bf82900..0c54f6c53f9 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -881,6 +881,7 @@ public class StreamsGroupCommand {
                 List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
 
                 List<TopicPartition> partitions = 
offsetsUtils.parseTopicPartitionsToReset(topics);
+                offsetsUtils.checkAllTopicPartitionsValid(partitions);
                 // if the user specified topics that do not belong to this 
group, we filter them out
                 partitions = filterExistingGroupTopics(groupId, partitions);
                 return partitions;
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index b14d66c652a..9333bbbb65e 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -1373,7 +1373,7 @@ public class ShareGroupCommandTest {
     }
     
     @Test
-    public void testAlterShareGroupFailureFailureWithNonExistentTopic() {
+    public void testAlterShareGroupFailureWithNonExistentTopic() {
         String group = "share-group";
         String topic = "none";
         String bootstrapServer = "localhost:9092";
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 6f38c47f15a..4f1e116437e 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.test.TestUtils;
 
@@ -65,6 +66,7 @@ import java.util.stream.IntStream;
 
 import joptsimple.OptionException;
 
+import static org.apache.kafka.common.KafkaFuture.completedFuture;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -72,6 +74,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -293,21 +296,30 @@ public class StreamsGroupCommandTest {
     @Test
     public void testAdminRequestsForResetOffsets() {
         Admin adminClient = mock(KafkaAdminClient.class);
+        String topic = "topic1";
         String groupId = "foo-group";
         List<String> args = List.of("--bootstrap-server", "localhost:9092", 
"--group", groupId, "--reset-offsets", "--input-topic", "topic1", 
"--to-latest");
-        List<String> topics = List.of("topic1");
+        List<String> topics = List.of(topic);
 
+        DescribeTopicsResult describeTopicsResult = 
mock(DescribeTopicsResult.class);
         when(adminClient.describeStreamsGroups(List.of(groupId)))
             .thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
+        ));
+        when(adminClient.describeTopics(anyCollection()))
+            .thenReturn(describeTopicsResult);
         when(adminClient.describeTopics(eq(topics), 
any(DescribeTopicsOptions.class)))
-            .thenReturn(describeTopicsResult(topics, 1));
+            .thenReturn(describeTopicsResult);
+        
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
         when(adminClient.listOffsets(any(), any()))
             .thenReturn(listOffsetsResult());
         ListGroupsResult listGroupsResult = listGroupResult(groupId);
         
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
         ListStreamsGroupOffsetsResult result = 
mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new 
HashMap<>();
-        committedOffsetsMap.put(new TopicPartition("topic1", 0), 
mock(OffsetAndMetadata.class));
+        committedOffsetsMap.put(new TopicPartition(topic, 0), 
mock(OffsetAndMetadata.class));
         
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
         
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
 
@@ -427,6 +439,43 @@ public class StreamsGroupCommandTest {
 
         service.close();
     }
+    
+    @Test
+    public void testResetOffsetsWithPartitionNotExist() {
+        Admin adminClient = mock(KafkaAdminClient.class);
+        String groupId = "foo-group";
+        String topic = "topic";
+        List<String> args = new 
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", 
groupId, "--reset-offsets", "--input-topic", "topic:3", "--to-latest"));
+        
+        when(adminClient.describeStreamsGroups(List.of(groupId)))
+            .thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
+        DescribeTopicsResult describeTopicsResult = 
mock(DescribeTopicsResult.class);
+
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
+        ));
+        when(adminClient.describeTopics(anyCollection()))
+            .thenReturn(describeTopicsResult);
+        when(adminClient.describeTopics(eq(List.of(topic)), 
any(DescribeTopicsOptions.class)))
+            .thenReturn(describeTopicsResult);
+        
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        when(adminClient.listOffsets(any(), any()))
+            .thenReturn(listOffsetsResult());
+        ListStreamsGroupOffsetsResult result = 
mock(ListStreamsGroupOffsetsResult.class);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = Map.of(
+            new TopicPartition(topic, 0), 
+            new OffsetAndMetadata(12, Optional.of(0), ""),
+            new TopicPartition(topic, 1),
+            new OffsetAndMetadata(12, Optional.of(0), "")  
+        );
+        
+        
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
+        
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+        StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
+        assertThrows(UnknownTopicOrPartitionException.class, () -> 
service.resetOffsets());
+        service.close();
+    }
 
     private ListGroupsResult listGroupResult(String groupId) {
         ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);

Reply via email to