This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64b63f472f0 KAFKA-19316: added share_group_command_test.py system 
tests (#19774)
64b63f472f0 is described below

commit 64b63f472f0382761025944105b71b2e2e356d88
Author: Chirag Wadhwa <cwad...@confluent.io>
AuthorDate: Thu May 29 15:29:32 2025 +0530

    KAFKA-19316: added share_group_command_test.py system tests (#19774)
    
    This PR includes system tests in the file share_group_command_test.py.
    These tests verify the functionality of the kafka-share-groups.sh tool.
    
    Reviewers: Sushant Mahajan <smaha...@confluent.io>, Andrew Schofield
     <aschofi...@confluent.io>
---
 tests/kafkatest/services/console_share_consumer.py |   2 +-
 tests/kafkatest/services/kafka/kafka.py            |  77 ++++++++++++-
 .../templates/console_share_consumer.properties    |   2 +-
 .../tests/core/share_group_command_test.py         | 128 +++++++++++++++++++++
 4 files changed, 206 insertions(+), 3 deletions(-)

diff --git a/tests/kafkatest/services/console_share_consumer.py 
b/tests/kafkatest/services/console_share_consumer.py
index 4136511c574..03fbaeaf5a5 100644
--- a/tests/kafkatest/services/console_share_consumer.py
+++ b/tests/kafkatest/services/console_share_consumer.py
@@ -57,7 +57,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, 
BackgroundThreadSer
             "collect_default": False}
     }
 
-    def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-share-consumer-group",
+    def __init__(self, context, num_nodes, kafka, topic, 
group_id="test-share-group",
                  message_validator=None, share_consumer_timeout_ms=None, 
version=DEV_BRANCH,
                  client_id="console-share-consumer", print_key=False, 
jmx_object_names=None, jmx_attributes=None,
                  enable_systest_events=False, stop_timeout_sec=35, 
print_timestamp=False, print_partition=False,
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index ca19ca8bd11..b490f75fbac 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1749,6 +1749,27 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         if type is not None:
             cmd += " --type %s" % type
         return self.run_cli_tool(node, cmd)
+    
+    def list_share_groups(self, node=None, command_config=None, state=None):
+        """ Get list of share groups.
+        """
+        if node is None:
+            node = self.nodes[0]
+        share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s %s --list" % \
+              (share_group_script,
+               self.bootstrap_servers(self.security_protocol),
+               command_config)
+        if state is not None:
+            cmd += " --state %s" % state
+        return self.run_cli_tool(node, cmd)
 
     def describe_consumer_group(self, group, node=None, command_config=None):
         """ Describe a consumer group.
@@ -1771,10 +1792,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         output = ""
         self.logger.debug(cmd)
         for line in node.account.ssh_capture(cmd):
-            if not (line.startswith("SLF4J") or line.startswith("TOPIC") or 
line.startswith("Could not fetch offset")):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP") or 
line.startswith("Could not fetch offset")):
                 output += line
         self.logger.debug(output)
         return output
+    
+    def describe_share_group(self, group, node=None, command_config=None):
+        """ Describe a share group.
+        """
+        if node is None:
+            node = self.nodes[0]
+        share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+              (share_group_script,
+               self.bootstrap_servers(self.security_protocol),
+               command_config, group)
+
+        output = ""
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP") or 
line.startswith("Could not fetch offset")):
+                output += line
+        self.logger.debug(output)
+        return output
+    
+    def describe_share_group_members(self, group, node=None, 
command_config=None):
+        """ Describe members of a share group.
+        """
+        if node is None:
+            node = self.nodes[0]
+        share_group_script = self.path.script("kafka-share-groups.sh", node)
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+              (share_group_script,
+               self.bootstrap_servers(self.security_protocol),
+               command_config, group)
+        
+        cmd += " --members"
+
+        output_lines = []
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP") or 
line.strip() == ""):
+                output_lines.append(line.strip())
+        self.logger.debug(output_lines)
+        return output_lines
 
     def describe_quorum(self, node=None):
         """Run the describe quorum command.
diff --git 
a/tests/kafkatest/services/templates/console_share_consumer.properties 
b/tests/kafkatest/services/templates/console_share_consumer.properties
index da9fa4e6664..fccbf980f43 100644
--- a/tests/kafkatest/services/templates/console_share_consumer.properties
+++ b/tests/kafkatest/services/templates/console_share_consumer.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-group.id={{ group_id|default('test-share-consumer-group') }}
+group.id={{ group_id|default('test-share-group') }}
 
 {% if client_id is defined and client_id is not none %}
 client.id={{ client_id }}
diff --git a/tests/kafkatest/tests/core/share_group_command_test.py 
b/tests/kafkatest/tests/core/share_group_command_test.py
new file mode 100644
index 00000000000..83e83ba6449
--- /dev/null
+++ b/tests/kafkatest/tests/core/share_group_command_test.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.console_share_consumer import ConsoleShareConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+import os
+import re
+
+TOPIC = "topic-share-group-command"
+
+
+class ShareGroupCommandTest(Test):
+    """
+    Tests ShareGroupCommand
+    """
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/share_group_command"
+    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
+
+    def __init__(self, test_context):
+        super(ShareGroupCommandTest, self).__init__(test_context)
+        self.num_brokers = 1
+        self.topics = {
+            TOPIC: {'partitions': 1, 'replication-factor': 1}
+        }
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            None, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, 
topics=self.topics,
+            controller_num_nodes_override=self.num_brokers)
+        self.kafka.start()
+
+    def start_share_consumer(self):
+        self.share_consumer = ConsoleShareConsumer(self.test_context, 
num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                             share_consumer_timeout_ms=None)
+        self.share_consumer.start()
+
+    def setup_and_verify(self, security_protocol, group=None, 
describe_members=False):
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_share_consumer()
+        share_consumer_node = self.share_consumer.nodes[0]
+        wait_until(lambda: self.share_consumer.alive(share_consumer_node),
+                   timeout_sec=20, backoff_sec=.2, err_msg="Share consumer was 
too slow to start")
+        kafka_node = self.kafka.nodes[0]
+        if security_protocol is not SecurityConfig.PLAINTEXT:
+            prop_file = str(self.kafka.security_config.client_config())
+            self.logger.debug(prop_file)
+            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, 
allow_fail=False)
+            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
+
+        # Verify ShareGroupCommand lists/describes the expected share groups
+        command_config_file = self.COMMAND_CONFIG_FILE
+
+        if group:
+            if describe_members:
+                def has_expected_share_group_member():
+                    output = 
self.kafka.describe_share_group_members(group=group, node=kafka_node, 
command_config=command_config_file)
+                    return len(output) == 1 and all("test-share-group" in line 
for line in output)
+                wait_until(has_expected_share_group_member, timeout_sec=10, 
err_msg="Timed out waiting to describe members of the share group.")
+            else:
+                wait_until(lambda: 
re.search("topic-share-group-command",self.kafka.describe_share_group(group=group,
 node=kafka_node, command_config=command_config_file)), timeout_sec=10,
+                        err_msg="Timed out waiting to describe expected share 
groups.")
+        else:
+            wait_until(lambda: "test-share-group" in 
self.kafka.list_share_groups(node=kafka_node, 
command_config=command_config_file), timeout_sec=10,
+                       err_msg="Timed out waiting to list expected share 
groups.")
+
+        self.share_consumer.stop()
+
+    @cluster(num_nodes=3)
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True]
+    )
+    def test_list_share_groups(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+        """
+        Tests if ShareGroupCommand is listing correct share groups
+        :return: None
+        """
+        self.setup_and_verify(security_protocol)
+
+    @cluster(num_nodes=3)
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_describe_share_group(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+        """
+        Tests if ShareGroupCommand is describing a share group correctly
+        :return: None
+        """
+        self.setup_and_verify(security_protocol, group="test-share-group")
+
+    @cluster(num_nodes=3)
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_describe_share_group_members(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.isolated_kraft, use_share_groups=True):
+        """
+        Tests if ShareGroupCommand is describing the members of a share group 
correctly
+        :return: None
+        """
+        self.setup_and_verify(security_protocol, group="test-share-group", 
describe_members=True)

Reply via email to