This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bc7b87001bf KAFKA-18676; Update Benchmark system tests (#18785)
bc7b87001bf is described below
commit bc7b87001bfd641f93105eaa1e0326bdaf15b55d
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Feb 3 21:42:22 2025 +0800
KAFKA-18676; Update Benchmark system tests (#18785)
Update `benchmark_test.py` to run on KRaft instead of ZooKeeper.
```
> TC_PATHS="tests/kafkatest/benchmarks/core/benchmark_test.py" /bin/bash tests/docker/run_tests.sh
================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id: 2025-02-03--001
run time: 96 minutes 48.900 seconds
tests run: 120
passed: 120
flaky: 0
failed: 0
ignored: 0
================================================================================
```
Reviewers: David Jacot <[email protected]>
---
tests/kafkatest/benchmarks/core/benchmark_test.py | 77 +++++++++++------------
1 file changed, 37 insertions(+), 40 deletions(-)
diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py
b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 321ba6e8be8..f66f759d46c 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -19,9 +19,8 @@ from ducktape.mark.resource import cluster
from ducktape.services.service import Service
from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.performance import ProducerPerformanceService,
EndToEndLatencyService, ConsumerPerformanceService, throughput, latency,
compute_aggregate_throughput
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import DEV_BRANCH, KafkaVersion
TOPIC_REP_ONE = "topic-replication-factor-one"
@@ -36,15 +35,12 @@ class Benchmark(Test):
"""
def __init__(self, test_context):
super(Benchmark, self).__init__(test_context)
- self.num_zk = 1
self.num_brokers = 3
self.topics = {
TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
}
- self.zk = ZookeeperService(test_context, self.num_zk)
-
self.msgs_large = 10000000
self.batch_size = 8*1024
self.buffer_memory = 64*1024*1024
@@ -52,31 +48,29 @@ class Benchmark(Test):
self.target_data_size = 128*1024*1024
self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
- def setUp(self):
- self.zk.start()
-
def start_kafka(self, security_protocol, interbroker_security_protocol,
version, tls_version=None):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
+ zk=None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol,
topics=self.topics,
version=version, tls_version=tls_version)
self.kafka.log_level = "INFO" # We don't DEBUG logging here
self.kafka.start()
- @cluster(num_nodes=5)
- @parametrize(acks=1, topic=TOPIC_REP_ONE)
- @parametrize(acks=1, topic=TOPIC_REP_THREE)
- @parametrize(acks=-1, topic=TOPIC_REP_THREE)
- @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000,
10000, 100000], compression_type=["none", "snappy"], security_protocol=['SSL'],
tls_version=['TLSv1.2', 'TLSv1.3'])
- @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000,
10000, 100000], compression_type=["none", "snappy"],
security_protocol=['PLAINTEXT'])
- @cluster(num_nodes=7)
- @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
+ @cluster(num_nodes=9)
+ @parametrize(acks=1, topic=TOPIC_REP_ONE,
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE,
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(acks=-1, topic=TOPIC_REP_THREE,
metadata_quorum=quorum.isolated_kraft)
+ @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000,
10000, 100000],
+ compression_type=["none", "snappy"], security_protocol=['SSL'],
tls_version=['TLSv1.2', 'TLSv1.3'], metadata_quorum=quorum.isolated_kraft)
+ @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000,
10000, 100000],
+ compression_type=["none", "snappy"],
security_protocol=['PLAINTEXT'], metadata_quorum=quorum.isolated_kraft)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3,
metadata_quorum=quorum.isolated_kraft)
def test_producer_throughput(self, acks, topic, num_producers=1,
message_size=DEFAULT_RECORD_SIZE,
compression_type="none",
security_protocol='PLAINTEXT', tls_version=None, client_version=str(DEV_BRANCH),
- broker_version=str(DEV_BRANCH)):
+ broker_version=str(DEV_BRANCH),
metadata_quorum=quorum.isolated_kraft):
"""
- Setup: 1 node zk + 3 node kafka cluster
+ Setup: 3 node kafka cluster
Produce ~128MB worth of messages to a topic with 6 partitions.
Required acks, topic replication factor,
security protocol and message size are varied depending on arguments
injected into this test.
@@ -101,15 +95,16 @@ class Benchmark(Test):
self.producer.run()
return compute_aggregate_throughput(self.producer)
- @cluster(num_nodes=5)
- @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2',
'TLSv1.3'], compression_type=["none", "snappy"])
- @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"])
+ @cluster(num_nodes=7)
+ @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+ compression_type=["none", "snappy"],
metadata_quorum=quorum.isolated_kraft)
+ @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"], metadata_quorum=quorum.isolated_kraft)
def test_long_term_producer_throughput(self, compression_type="none",
security_protocol='PLAINTEXT',
tls_version=None,
interbroker_security_protocol=None,
client_version=str(DEV_BRANCH),
- broker_version=str(DEV_BRANCH)):
+ broker_version=str(DEV_BRANCH),
metadata_quorum=quorum.isolated_kraft):
"""
- Setup: 1 node zk + 3 node kafka cluster
+ Setup: 3 node kafka cluster
Produce 10e6 100 byte messages to a topic with 6 partitions,
replication-factor 3, and acks=1.
Collect and return aggregate throughput statistics after all messages
have been acknowledged.
@@ -158,16 +153,16 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
- @cluster(num_nodes=5)
- @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2',
'TLSv1.3'], compression_type=["none", "snappy"])
- @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"])
- @cluster(num_nodes=6)
- @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'],
compression_type=["none", "snappy"])
+ @cluster(num_nodes=8)
+ @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+ compression_type=["none", "snappy"],
metadata_quorum=quorum.isolated_kraft)
+ @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"], metadata_quorum=quorum.isolated_kraft)
+ @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'],
compression_type=["none", "snappy"], metadata_quorum=quorum.isolated_kraft)
def test_end_to_end_latency(self, compression_type="none",
security_protocol="PLAINTEXT", tls_version=None,
interbroker_security_protocol=None,
client_version=str(DEV_BRANCH),
- broker_version=str(DEV_BRANCH)):
+ broker_version=str(DEV_BRANCH),
metadata_quorum=quorum.isolated_kraft):
"""
- Setup: 1 node zk + 3 node kafka cluster
+ Setup: 3 node kafka cluster
Produce (acks = 1) and consume 10e3 messages to a topic with 6
partitions and replication-factor 3,
measuring the latency between production and consumption of each
message.
@@ -190,14 +185,15 @@ class Benchmark(Test):
self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'],
self.perf.results[0]['latency_99th_ms'],
self.perf.results[0]['latency_999th_ms'])
- @cluster(num_nodes=6)
- @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2',
'TLSv1.3'], compression_type=["none", "snappy"])
- @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"])
+ @cluster(num_nodes=8)
+ @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+ compression_type=["none", "snappy"],
metadata_quorum=quorum.isolated_kraft)
+ @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"], metadata_quorum=quorum.isolated_kraft)
def test_producer_and_consumer(self, compression_type="none",
security_protocol="PLAINTEXT", tls_version=None,
interbroker_security_protocol=None,
- client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH)):
+ client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft):
"""
- Setup: 1 node zk + 3 node kafka cluster
+ Setup: 3 node kafka cluster
Concurrently produce and consume 10e6 messages with a single producer
and a single consumer,
Return aggregate throughput statistics for both producer and consumer.
@@ -237,12 +233,13 @@ class Benchmark(Test):
self.logger.info("\n".join(summary))
return data
- @cluster(num_nodes=6)
- @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2',
'TLSv1.3'], compression_type=["none", "snappy"])
- @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"])
+ @cluster(num_nodes=8)
+ @matrix(security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+ compression_type=["none", "snappy"],
metadata_quorum=quorum.isolated_kraft)
+ @matrix(security_protocol=['PLAINTEXT'], compression_type=["none",
"snappy"], metadata_quorum=quorum.isolated_kraft)
def test_consumer_throughput(self, compression_type="none",
security_protocol="PLAINTEXT", tls_version=None,
interbroker_security_protocol=None,
num_consumers=1,
- client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH)):
+ client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft):
"""
Consume 10e6 100-byte messages with 1 or more consumers from a topic
with 6 partitions
and report throughput.