This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 64b63f472f0 KAFKA-19316: added share_group_command_test.py system tests (#19774) 64b63f472f0 is described below commit 64b63f472f0382761025944105b71b2e2e356d88 Author: Chirag Wadhwa <cwad...@confluent.io> AuthorDate: Thu May 29 15:29:32 2025 +0530 KAFKA-19316: added share_group_command_test.py system tests (#19774) This PR includes system tests in the file share_group_command_test.py. These tests test the functionality of the kafka-share-groups.sh tool. Reviewers: Sushant Mahajan <smaha...@confluent.io>, Andrew Schofield <aschofi...@confluent.io> --- tests/kafkatest/services/console_share_consumer.py | 2 +- tests/kafkatest/services/kafka/kafka.py | 77 ++++++++++++- .../templates/console_share_consumer.properties | 2 +- .../tests/core/share_group_command_test.py | 128 +++++++++++++++++++++ 4 files changed, 206 insertions(+), 3 deletions(-) diff --git a/tests/kafkatest/services/console_share_consumer.py b/tests/kafkatest/services/console_share_consumer.py index 4136511c574..03fbaeaf5a5 100644 --- a/tests/kafkatest/services/console_share_consumer.py +++ b/tests/kafkatest/services/console_share_consumer.py @@ -57,7 +57,7 @@ class ConsoleShareConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadSer "collect_default": False} } - def __init__(self, context, num_nodes, kafka, topic, group_id="test-share-consumer-group", + def __init__(self, context, num_nodes, kafka, topic, group_id="test-share-group", message_validator=None, share_consumer_timeout_ms=None, version=DEV_BRANCH, client_id="console-share-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None, enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False, print_partition=False, diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ca19ca8bd11..b490f75fbac 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1749,6 
+1749,27 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): if type is not None: cmd += " --type %s" % type return self.run_cli_tool(node, cmd) + + def list_share_groups(self, node=None, command_config=None, state=None): + """ Get list of share groups. + """ + if node is None: + node = self.nodes[0] + share_group_script = self.path.script("kafka-share-groups.sh", node) + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + cmd = fix_opts_for_new_jvm(node) + cmd += "%s --bootstrap-server %s %s --list" % \ + (share_group_script, + self.bootstrap_servers(self.security_protocol), + command_config) + if state is not None: + cmd += " --state %s" % state + return self.run_cli_tool(node, cmd) def describe_consumer_group(self, group, node=None, command_config=None): """ Describe a consumer group. @@ -1771,10 +1792,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): output = "" self.logger.debug(cmd) for line in node.account.ssh_capture(cmd): - if not (line.startswith("SLF4J") or line.startswith("TOPIC") or line.startswith("Could not fetch offset")): + if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.startswith("Could not fetch offset")): output += line self.logger.debug(output) return output + + def describe_share_group(self, group, node=None, command_config=None): + """ Describe a share group. 
+ """ + if node is None: + node = self.nodes[0] + share_group_script = self.path.script("kafka-share-groups.sh", node) + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + cmd = fix_opts_for_new_jvm(node) + cmd += "%s --bootstrap-server %s %s --group %s --describe" % \ + (share_group_script, + self.bootstrap_servers(self.security_protocol), + command_config, group) + + output = "" + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.startswith("Could not fetch offset")): + output += line + self.logger.debug(output) + return output + + def describe_share_group_members(self, group, node=None, command_config=None): + """ Describe members of a share group. + """ + if node is None: + node = self.nodes[0] + share_group_script = self.path.script("kafka-share-groups.sh", node) + + if command_config is None: + command_config = "" + else: + command_config = "--command-config " + command_config + + cmd = fix_opts_for_new_jvm(node) + cmd += "%s --bootstrap-server %s %s --group %s --describe" % \ + (share_group_script, + self.bootstrap_servers(self.security_protocol), + command_config, group) + + cmd += " --members" + + output_lines = [] + self.logger.debug(cmd) + for line in node.account.ssh_capture(cmd): + if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.strip() == ""): + output_lines.append(line.strip()) + self.logger.debug(output_lines) + return output_lines def describe_quorum(self, node=None): """Run the describe quorum command. 
diff --git a/tests/kafkatest/services/templates/console_share_consumer.properties b/tests/kafkatest/services/templates/console_share_consumer.properties index da9fa4e6664..fccbf980f43 100644 --- a/tests/kafkatest/services/templates/console_share_consumer.properties +++ b/tests/kafkatest/services/templates/console_share_consumer.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -group.id={{ group_id|default('test-share-consumer-group') }} +group.id={{ group_id|default('test-share-group') }} {% if client_id is defined and client_id is not none %} client.id={{ client_id }} diff --git a/tests/kafkatest/tests/core/share_group_command_test.py b/tests/kafkatest/tests/core/share_group_command_test.py new file mode 100644 index 00000000000..83e83ba6449 --- /dev/null +++ b/tests/kafkatest/tests/core/share_group_command_test.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from ducktape.utils.util import wait_until +from ducktape.tests.test import Test +from ducktape.mark import matrix +from ducktape.mark.resource import cluster + +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.console_share_consumer import ConsoleShareConsumer +from kafkatest.services.security.security_config import SecurityConfig + +import os +import re + +TOPIC = "topic-share-group-command" + + +class ShareGroupCommandTest(Test): + """ + Tests ShareGroupCommand + """ + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/share_group_command" + COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties") + + def __init__(self, test_context): + super(ShareGroupCommandTest, self).__init__(test_context) + self.num_brokers = 1 + self.topics = { + TOPIC: {'partitions': 1, 'replication-factor': 1} + } + + def start_kafka(self, security_protocol, interbroker_security_protocol): + self.kafka = KafkaService( + self.test_context, self.num_brokers, + None, security_protocol=security_protocol, + interbroker_security_protocol=interbroker_security_protocol, topics=self.topics, + controller_num_nodes_override=self.num_brokers) + self.kafka.start() + + def start_share_consumer(self): + self.share_consumer = ConsoleShareConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, + share_consumer_timeout_ms=None) + self.share_consumer.start() + + def setup_and_verify(self, security_protocol, group=None, describe_members=False): + self.start_kafka(security_protocol, security_protocol) + self.start_share_consumer() + share_consumer_node = self.share_consumer.nodes[0] + wait_until(lambda: self.share_consumer.alive(share_consumer_node), + timeout_sec=20, backoff_sec=.2, err_msg="Share consumer was too slow to start") + kafka_node = self.kafka.nodes[0] + if security_protocol is not SecurityConfig.PLAINTEXT: + prop_file = str(self.kafka.security_config.client_config()) + 
self.logger.debug(prop_file) + kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file) + + # Verify ShareConsumerGroupCommand lists expected consumer groups + command_config_file = self.COMMAND_CONFIG_FILE + + if group: + if describe_members: + def has_expected_share_group_member(): + output = self.kafka.describe_share_group_members(group=group, node=kafka_node, command_config=command_config_file) + return len(output) == 1 and all("test-share-group" in line for line in output) + wait_until(has_expected_share_group_member, timeout_sec=10, err_msg="Timed out waiting to describe members of the share group.") + else: + wait_until(lambda: re.search("topic-share-group-command",self.kafka.describe_share_group(group=group, node=kafka_node, command_config=command_config_file)), timeout_sec=10, + err_msg="Timed out waiting to describe expected share groups.") + else: + wait_until(lambda: "test-share-group" in self.kafka.list_share_groups(node=kafka_node, command_config=command_config_file), timeout_sec=10, + err_msg="Timed out waiting to list expected share groups.") + + self.share_consumer.stop() + + @cluster(num_nodes=3) + @matrix( + security_protocol=['PLAINTEXT', 'SSL'], + metadata_quorum=[quorum.isolated_kraft], + use_share_groups=[True] + ) + def test_list_share_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True): + """ + Tests if ShareGroupCommand is listing correct share groups + :return: None + """ + self.setup_and_verify(security_protocol) + + @cluster(num_nodes=3) + @matrix( + security_protocol=['PLAINTEXT', 'SSL'], + metadata_quorum=[quorum.isolated_kraft], + use_share_groups=[True], + ) + def test_describe_share_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True): + """ + Tests if ShareGroupCommand is describing a share group correctly + :return: None + """ + 
self.setup_and_verify(security_protocol, group="test-share-group") + + @cluster(num_nodes=3) + @matrix( + security_protocol=['PLAINTEXT', 'SSL'], + metadata_quorum=[quorum.isolated_kraft], + use_share_groups=[True], + ) + def test_describe_share_group_members(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_share_groups=True): + """ + Tests if ShareGroupCommand is describing the members of a share group correctly + :return: None + """ + self.setup_and_verify(security_protocol, group="test-share-group", describe_members=True)