This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1f41ef0112 KAFKA-19320: Added share_consume_bench_test.py system
tests (#19811)
d1f41ef0112 is described below
commit d1f41ef0112aa1d1d8b3006d9f87bd0b7b10bf97
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Jun 2 20:08:56 2025 +0530
KAFKA-19320: Added share_consume_bench_test.py system tests (#19811)
This PR adds system tests in share_consume_bench_test.py for testing the
Trogdor agent for Share Consumers.
Reviewers: Lan Ding <[email protected]>, Andrew
Schofield <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 23 ++-
.../trogdor/share_consume_bench_workload.py | 56 +++++++
.../tests/core/share_consume_bench_test.py | 177 +++++++++++++++++++++
3 files changed, 253 insertions(+), 3 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 88f7fc24c66..25249a7ab22 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1788,13 +1788,30 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
cmd = fix_opts_for_new_jvm(node)
cmd += "%s --bootstrap-server %s %s --list" % \
- (share_group_script,
- self.bootstrap_servers(self.security_protocol),
- command_config)
+ (share_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ command_config)
if state is not None:
cmd += " --state %s" % state
return self.run_cli_tool(node, cmd)
+ def set_share_group_offset_reset_strategy(self, group, strategy=None,
node=None):
+ """ Set the offset reset strategy config for the given group.
+ """
+ if strategy is None:
+ return
+ if node is None:
+ node = self.nodes[0]
+ consumer_group_script = self.path.script("kafka-configs.sh", node)
+
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%s --bootstrap-server %s --group %s --alter --add-config
\"share.auto.offset.reset=%s\"" % \
+ (consumer_group_script,
+ self.bootstrap_servers(self.security_protocol),
+ group,
+ strategy)
+ return "Completed" in self.run_cli_tool(node, cmd)
+
def describe_consumer_group(self, group, node=None, command_config=None):
""" Describe a consumer group.
"""
diff --git a/tests/kafkatest/services/trogdor/share_consume_bench_workload.py
b/tests/kafkatest/services/trogdor/share_consume_bench_workload.py
new file mode 100644
index 00000000000..292d7b70e7e
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/share_consume_bench_workload.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.services.service import Service
+from kafkatest.services.trogdor.task_spec import TaskSpec
+
+
+class ShareConsumeBenchWorkloadSpec(TaskSpec):
+ def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
+ target_messages_per_sec, max_messages, active_topics,
+ consumer_conf, common_client_conf, admin_client_conf,
share_group=None, threads_per_worker=1):
+ super(ShareConsumeBenchWorkloadSpec, self).__init__(start_ms,
duration_ms)
+ self.message["class"] =
"org.apache.kafka.trogdor.workload.ShareConsumeBenchSpec"
+ self.message["consumerNode"] = consumer_node
+ self.message["bootstrapServers"] = bootstrap_servers
+ self.message["targetMessagesPerSec"] = target_messages_per_sec
+ self.message["maxMessages"] = max_messages
+ self.message["consumerConf"] = consumer_conf
+ self.message["adminClientConf"] = admin_client_conf
+ self.message["commonClientConf"] = common_client_conf
+ self.message["activeTopics"] = active_topics
+ self.message["threadsPerWorker"] = threads_per_worker
+ if share_group is not None:
+ self.message["shareGroup"] = share_group
+
+
+class ShareConsumeBenchWorkloadService(Service):
+ def __init__(self, context, kafka):
+ Service.__init__(self, context, num_nodes=1)
+ self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
+ self.share_consumer_node = self.nodes[0].account.hostname
+
+ def free(self):
+ Service.free(self)
+
+ def wait_node(self, node, timeout_sec=None):
+ pass
+
+ def stop_node(self, node):
+ pass
+
+ def clean_node(self, node):
+ pass
\ No newline at end of file
diff --git a/tests/kafkatest/tests/core/share_consume_bench_test.py
b/tests/kafkatest/tests/core/share_consume_bench_test.py
new file mode 100644
index 00000000000..bc4027d6b0c
--- /dev/null
+++ b/tests/kafkatest/tests/core/share_consume_bench_test.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.utils.util import wait_until
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.trogdor.produce_bench_workload import
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.share_consume_bench_workload import
ShareConsumeBenchWorkloadService, ShareConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+
+
+class ShareConsumeBenchTest(Test):
+ def __init__(self, test_context):
+ """:type test_context: ducktape.tests.test.TestContext"""
+ super(ShareConsumeBenchTest, self).__init__(test_context)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
+ self.producer_workload_service =
ProduceBenchWorkloadService(test_context, self.kafka)
+ self.share_consumer_workload_service =
ShareConsumeBenchWorkloadService(test_context, self.kafka)
+ self.topics_with_multiple_partitions =
{"share_consume_bench_topic[0-5]": {"numPartitions": 5, "replicationFactor": 3}}
+ self.topic_with_single_partitions = {"share_consume_bench_topic6":
{"numPartitions": 1, "replicationFactor": 3}}
+ self.trogdor = TrogdorService(context=self.test_context,
+ client_services=[self.kafka,
self.producer_workload_service,
+
self.share_consumer_workload_service])
+ self.share_group="share-group"
+
+ def setUp(self):
+ self.trogdor.start()
+ self.kafka.start()
+
+ def teardown(self):
+ self.trogdor.stop()
+ self.kafka.stop()
+
+ def produce_messages(self, topics, max_messages=10000):
+ produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+
self.producer_workload_service.producer_node,
+
self.producer_workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=max_messages,
+ producer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+ inactive_topics={},
+ active_topics=topics)
+ produce_workload = self.trogdor.create_task("produce_workload",
produce_spec)
+ produce_workload.wait_for_done(timeout_sec=180)
+ self.logger.debug("Produce workload finished")
+
+ @cluster(num_nodes=10)
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_share_consume_bench(self, metadata_quorum, use_share_groups=True):
+ """
+ Runs a ShareConsumeBench workload to consume messages
+ """
+ self.produce_messages(self.topics_with_multiple_partitions)
+ share_consume_spec = ShareConsumeBenchWorkloadSpec(0,
TaskSpec.MAX_DURATION_MS,
+
self.share_consumer_workload_service.share_consumer_node,
+
self.share_consumer_workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=10000,
+ consumer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+
active_topics=["share_consume_bench_topic[0-5]"],
+ share_group=self.share_group)
+ wait_until(lambda:
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group,
strategy="earliest"),
+ timeout_sec=20, backoff_sec=2,
err_msg="share.auto.offset.reset not set to earliest")
+ share_consume_workload =
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+ share_consume_workload.wait_for_done(timeout_sec=360)
+ self.logger.debug("Share consume workload finished")
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True,
indent=2))
+
+ @cluster(num_nodes=10)
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_two_share_consumers_in_a_group_topics(self, metadata_quorum,
use_share_groups=True):
+ """
+ Runs two share consumers in the same share group to read messages from
topics.
+ """
+ self.produce_messages(self.topics_with_multiple_partitions)
+ share_consume_spec = ShareConsumeBenchWorkloadSpec(0,
TaskSpec.MAX_DURATION_MS,
+
self.share_consumer_workload_service.share_consumer_node,
+
self.share_consumer_workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=2000, # both
should read at least 2k messages
+ consumer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+ threads_per_worker=2,
+
active_topics=["share_consume_bench_topic[0-5]"],
+ share_group=self.share_group)
+ wait_until(lambda:
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group,
strategy="earliest"),
+ timeout_sec=20, backoff_sec=2,
err_msg="share.auto.offset.reset not set to earliest")
+ share_consume_workload =
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+ share_consume_workload.wait_for_done(timeout_sec=360)
+ self.logger.debug("Share consume workload finished")
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True,
indent=2))
+
+ @cluster(num_nodes=10)
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_one_share_consumer_subscribed_to_single_topic(self,
metadata_quorum, use_share_groups=True):
+ """
+ Runs one share consumers in a share group to read messages from topic
with single partition.
+ """
+ self.produce_messages(self.topic_with_single_partitions)
+ share_consume_spec = ShareConsumeBenchWorkloadSpec(0,
TaskSpec.MAX_DURATION_MS,
+
self.share_consumer_workload_service.share_consumer_node,
+
self.share_consumer_workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=10000,
+ consumer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+
active_topics=["share_consume_bench_topic6"],
+ share_group=self.share_group)
+ wait_until(lambda:
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group,
strategy="earliest"),
+ timeout_sec=20, backoff_sec=2,
err_msg="share.auto.offset.reset not set to earliest")
+ share_consume_workload =
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+ share_consume_workload.wait_for_done(timeout_sec=360)
+ self.logger.debug("Share consume workload finished")
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True,
indent=2))
+
+ @cluster(num_nodes=10)
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_share_groups=[True],
+ )
+ def test_multiple_share_consumers_subscribed_to_single_topic(self,
metadata_quorum, use_share_groups=True):
+ """
+ Runs multiple share consumers in a share group to read messages from
topic with single partition.
+ """
+ self.produce_messages(self.topic_with_single_partitions)
+ share_consume_spec = ShareConsumeBenchWorkloadSpec(0,
TaskSpec.MAX_DURATION_MS,
+
self.share_consumer_workload_service.share_consumer_node,
+
self.share_consumer_workload_service.bootstrap_servers,
+ target_messages_per_sec=1000,
+ max_messages=100, # all should
read at least 100 messages
+ consumer_conf={},
+ admin_client_conf={},
+ common_client_conf={},
+ threads_per_worker=5,
+
active_topics=["share_consume_bench_topic6"],
+ share_group=self.share_group)
+ wait_until(lambda:
self.kafka.set_share_group_offset_reset_strategy(group=self.share_group,
strategy="earliest"),
+ timeout_sec=20, backoff_sec=2,
err_msg="share.auto.offset.reset not set to earliest")
+ share_consume_workload =
self.trogdor.create_task("share_consume_workload", share_consume_spec)
+ share_consume_workload.wait_for_done(timeout_sec=360)
+ self.logger.debug("Share consume workload finished")
+ tasks = self.trogdor.tasks()
+ self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True,
indent=2))
\ No newline at end of file