This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1f41ef0112 KAFKA-19320: Added share_consume_bench_test.py system 
tests (#19811)
d1f41ef0112 is described below

commit d1f41ef0112aa1d1d8b3006d9f87bd0b7b10bf97
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Jun 2 20:08:56 2025 +0530

    KAFKA-19320: Added share_consume_bench_test.py system tests (#19811)
    
    This PR adds system tests in share_consume_bench_test.py for testing the
    trogdor agent for Share Consumers/
    
    Reviewers: Lan Ding <[email protected]>, Andrew
     Schofield <[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py            |  23 ++-
 .../trogdor/share_consume_bench_workload.py        |  56 +++++++
 .../tests/core/share_consume_bench_test.py         | 177 +++++++++++++++++++++
 3 files changed, 253 insertions(+), 3 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 88f7fc24c66..25249a7ab22 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1788,13 +1788,30 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
         cmd = fix_opts_for_new_jvm(node)
         cmd += "%s --bootstrap-server %s %s --list" % \
-              (share_group_script,
-               self.bootstrap_servers(self.security_protocol),
-               command_config)
+               (share_group_script,
+                self.bootstrap_servers(self.security_protocol),
+                command_config)
         if state is not None:
             cmd += " --state %s" % state
         return self.run_cli_tool(node, cmd)
 
+    def set_share_group_offset_reset_strategy(self, group, strategy=None, 
node=None):
+        """ Set the offset reset strategy config for the given group.
+        """
+        if strategy is None:
+            return
+        if node is None:
+            node = self.nodes[0]
+        consumer_group_script = self.path.script("kafka-configs.sh", node)
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s --group %s --alter --add-config 
\"share.auto.offset.reset=%s\"" % \
+              (consumer_group_script,
+               self.bootstrap_servers(self.security_protocol),
+               group,
+               strategy)
+        return "Completed" in self.run_cli_tool(node, cmd)
+
     def describe_consumer_group(self, group, node=None, command_config=None):
         """ Describe a consumer group.
         """
diff --git a/tests/kafkatest/services/trogdor/share_consume_bench_workload.py 
b/tests/kafkatest/services/trogdor/share_consume_bench_workload.py
new file mode 100644
index 00000000000..292d7b70e7e
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/share_consume_bench_workload.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ShareConsumeBenchWorkloadSpec(TaskSpec):
+    def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
+                 target_messages_per_sec, max_messages, active_topics,
+                 consumer_conf, common_client_conf, admin_client_conf, 
share_group=None, threads_per_worker=1):
+        super(ShareConsumeBenchWorkloadSpec, self).__init__(start_ms, 
duration_ms)
+        self.message["class"] = 
"org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec"
+        self.message["consumerNode"] = consumer_node
+        self.message["bootstrapServers"] = bootstrap_servers
+        self.message["targetMessagesPerSec"] = target_messages_per_sec
+        self.message["maxMessages"] = max_messages
+        self.message["consumerConf"] = consumer_conf
+        self.message["adminClientConf"] = admin_client_conf
+        self.message["commonClientConf"] = common_client_conf
+        self.message["activeTopics"] = active_topics
+        self.message["threadsPerWorker"] = threads_per_worker
+        if share_group is not None:
+            self.message["shareGroup"] = share_group
+
+
+class ShareConsumeBenchWorkloadService(Service):
+    def __init__(self, context, kafka):
+        Service.__init__(self, context, num_nodes=1)
+        self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+        self.share_consumer_node = self.nodes[0].account.hostname
+
+    def free(self):
+        Service.free(self)
+
+    def wait_node(self, node, timeout_sec=None):
+        pass
+
+    def stop_node(self, node):
+        pass
+
+    def clean_node(self, node):
+        pass
\ No newline at end of file
diff --git a/tests/kafkatest/tests/core/share_consume_bench_test.py 
b/tests/kafkatest/tests/core/share_consume_bench_test.py
new file mode 100644
index 00000000000..bc4027d6b0c
--- /dev/null
+++ b/tests/kafkatest/tests/core/share_consume_bench_test.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.trogdor.produce_bench_workload import 
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.share_consume_bench_workload import 
ShareConsumeBenchWorkloadService, ShareConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+
+
+class ShareConsumeBenchTest(Test):
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ShareConsumeBenchTest, self).__init__(test_context)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
+        self.producer_workload_service = 
ProduceBenchWorkloadService(test_context, self.kafka)
+        self.share_consumer_workload_service = 
ShareConsumeBenchWorkloadService(test_context, self.kafka)
+        self.topics_with_multiple_partitions = 
{"share_consume_bench_topic[0-5]": {"numPartitions": 5, "replicationFactor": 3}}
+        self.topic_with_single_partitions = {"share_consume_bench_topic6": 
{"numPartitions": 1, "replicationFactor": 3}}
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[self.kafka, 
self.producer_workload_service,
+                                                       
self.share_consumer_workload_service])
+        self.share_group="share-group"
+
+    def setUp(self):
+        self.trogdor.start()
+        self.kafka.start()
+
+    def teardown(self):
+        self.trogdor.stop()
+        self.kafka.stop()
+
+    def produce_messages(self, topics, max_messages=10000):
+        produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.producer_workload_service.producer_node,
+                                                
self.producer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=max_messages,
+                                                producer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                inactive_topics={},
+                                                active_topics=topics)
+        produce_workload = self.trogdor.create_task("produce_workload", 
produce_spec)
+        produce_workload.wait_for_done(timeout_sec=180)
+        self.logger.debug("Produce workload finished")
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_share_consume_bench(self, metadata_quorum, use_share_groups=True):
+        """
+        Runs a ShareConsumeBench workload to consume messages
+        """
+        self.produce_messages(self.topics_with_multiple_partitions)
+        share_consume_spec = ShareConsumeBenchWorkloadSpec(0, 
TaskSpec.MAX_DURATION_MS,
+                                                
self.share_consumer_workload_service.share_consumer_node,
+                                                
self.share_consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=10000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                
active_topics=["share_consume_bench_topic[0-5]"],
+                                                share_group=self.share_group)
+        wait_until(lambda: 
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group, 
strategy="earliest"),
+                   timeout_sec=20, backoff_sec=2, 
err_msg="share.auto.offset.reset not set to earliest")
+        share_consume_workload = 
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+        share_consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Share consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_two_share_consumers_in_a_group_topics(self, metadata_quorum, 
use_share_groups=True):
+        """
+        Runs two share consumers in the same share group to read messages from 
topics.
+        """
+        self.produce_messages(self.topics_with_multiple_partitions)
+        share_consume_spec = ShareConsumeBenchWorkloadSpec(0, 
TaskSpec.MAX_DURATION_MS,
+                                                
self.share_consumer_workload_service.share_consumer_node,
+                                                
self.share_consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2000, # both 
should read at least 2k messages
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                threads_per_worker=2,
+                                                
active_topics=["share_consume_bench_topic[0-5]"],
+                                                share_group=self.share_group)
+        wait_until(lambda: 
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group, 
strategy="earliest"),
+                   timeout_sec=20, backoff_sec=2, 
err_msg="share.auto.offset.reset not set to earliest")
+        share_consume_workload = 
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+        share_consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Share consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_one_share_consumer_subscribed_to_single_topic(self, 
metadata_quorum, use_share_groups=True):
+        """
+        Runs one share consumers in a share group to read messages from topic 
with single partition.
+        """
+        self.produce_messages(self.topic_with_single_partitions)
+        share_consume_spec = ShareConsumeBenchWorkloadSpec(0, 
TaskSpec.MAX_DURATION_MS,
+                                                
self.share_consumer_workload_service.share_consumer_node,
+                                                
self.share_consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=10000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                
active_topics=["share_consume_bench_topic6"],
+                                                share_group=self.share_group)
+        wait_until(lambda: 
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group, 
strategy="earliest"),
+                   timeout_sec=20, backoff_sec=2, 
err_msg="share.auto.offset.reset not set to earliest")
+        share_consume_workload = 
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+        share_consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Share consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    @cluster(num_nodes=10)
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_share_groups=[True],
+    )
+    def test_multiple_share_consumers_subscribed_to_single_topic(self, 
metadata_quorum, use_share_groups=True):
+        """
+        Runs multiple share consumers in a share group to read messages from 
topic with single partition.
+        """
+        self.produce_messages(self.topic_with_single_partitions)
+        share_consume_spec = ShareConsumeBenchWorkloadSpec(0, 
TaskSpec.MAX_DURATION_MS,
+                                                
self.share_consumer_workload_service.share_consumer_node,
+                                                
self.share_consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=100, # all should 
read at least 100 messages
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                threads_per_worker=5,
+                                                
active_topics=["share_consume_bench_topic6"],
+                                                share_group=self.share_group)
+        wait_until(lambda: 
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group, 
strategy="earliest"),
+                   timeout_sec=20, backoff_sec=2, 
err_msg="share.auto.offset.reset not set to earliest")
+        share_consume_workload = 
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+        share_consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Share consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
\ No newline at end of file

Reply via email to