This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc7b87001bf KAFKA-18676; Update Benchmark system tests (#18785)
bc7b87001bf is described below

commit bc7b87001bfd641f93105eaa1e0326bdaf15b55d
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Feb 3 21:42:22 2025 +0800

    KAFKA-18676; Update Benchmark system tests (#18785)
    
    Update `benchmark_test.py` to use KRaft.
    
    ```
    > TC_PATHS="tests/kafkatest/benchmarks/core/benchmark_test.py" /bin/bash tests/docker/run_tests.sh
    
    ================================================================================
    SESSION REPORT (ALL TESTS)
    ducktape version: 0.12.0
    session_id:       2025-02-03--001
    run time:         96 minutes 48.900 seconds
    tests run:        120
    passed:           120
    flaky:            0
    failed:           0
    ignored:          0
    ================================================================================
    ```
    
    Reviewers: David Jacot <[email protected]>
---
 tests/kafkatest/benchmarks/core/benchmark_test.py | 77 +++++++++++------------
 1 file changed, 37 insertions(+), 40 deletions(-)

diff --git a/tests/kafkatest/benchmarks/core/benchmark_test.py 
b/tests/kafkatest/benchmarks/core/benchmark_test.py
index 321ba6e8be8..f66f759d46c 100644
--- a/tests/kafkatest/benchmarks/core/benchmark_test.py
+++ b/tests/kafkatest/benchmarks/core/benchmark_test.py
@@ -19,9 +19,8 @@ from ducktape.mark.resource import cluster
 from ducktape.services.service import Service
 from ducktape.tests.test import Test
 
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.performance import ProducerPerformanceService, 
EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, 
compute_aggregate_throughput
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import DEV_BRANCH, KafkaVersion
 
 TOPIC_REP_ONE = "topic-replication-factor-one"
@@ -36,15 +35,12 @@ class Benchmark(Test):
     """
     def __init__(self, test_context):
         super(Benchmark, self).__init__(test_context)
-        self.num_zk = 1
         self.num_brokers = 3
         self.topics = {
             TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
             TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk)
-
         self.msgs_large = 10000000
         self.batch_size = 8*1024
         self.buffer_memory = 64*1024*1024
@@ -52,31 +48,29 @@ class Benchmark(Test):
         self.target_data_size = 128*1024*1024
         self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
 
-    def setUp(self):
-        self.zk.start()
-
     def start_kafka(self, security_protocol, interbroker_security_protocol, 
version, tls_version=None):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
+            zk=None, security_protocol=security_protocol,
             interbroker_security_protocol=interbroker_security_protocol, 
topics=self.topics,
             version=version, tls_version=tls_version)
         self.kafka.log_level = "INFO"  # We don't DEBUG logging here
         self.kafka.start()
 
-    @cluster(num_nodes=5)
-    @parametrize(acks=1, topic=TOPIC_REP_ONE)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE)
-    @parametrize(acks=-1, topic=TOPIC_REP_THREE)
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 
10000, 100000], compression_type=["none", "snappy"], security_protocol=['SSL'], 
tls_version=['TLSv1.2', 'TLSv1.3'])
-    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 
10000, 100000], compression_type=["none", "snappy"], 
security_protocol=['PLAINTEXT'])
-    @cluster(num_nodes=7)
-    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
+    @cluster(num_nodes=9)
+    @parametrize(acks=1, topic=TOPIC_REP_ONE, 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(acks=-1, topic=TOPIC_REP_THREE, 
metadata_quorum=quorum.isolated_kraft)
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 
10000, 100000],
+            compression_type=["none", "snappy"], security_protocol=['SSL'], 
tls_version=['TLSv1.2', 'TLSv1.3'], metadata_quorum=quorum.isolated_kraft)
+    @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[10, 100, 1000, 
10000, 100000],
+            compression_type=["none", "snappy"], 
security_protocol=['PLAINTEXT'], metadata_quorum=quorum.isolated_kraft)
+    @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, 
metadata_quorum=quorum.isolated_kraft)
     def test_producer_throughput(self, acks, topic, num_producers=1, 
message_size=DEFAULT_RECORD_SIZE,
                                  compression_type="none", 
security_protocol='PLAINTEXT', tls_version=None, client_version=str(DEV_BRANCH),
-                                 broker_version=str(DEV_BRANCH)):
+                                 broker_version=str(DEV_BRANCH), 
metadata_quorum=quorum.isolated_kraft):
         """
-        Setup: 1 node zk + 3 node kafka cluster
+        Setup: 3 node kafka cluster
         Produce ~128MB worth of messages to a topic with 6 partitions. 
Required acks, topic replication factor,
         security protocol and message size are varied depending on arguments 
injected into this test.
 
@@ -101,15 +95,16 @@ class Benchmark(Test):
         self.producer.run()
         return compute_aggregate_throughput(self.producer)
 
-    @cluster(num_nodes=5)
-    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 
'TLSv1.3'], compression_type=["none", "snappy"])
-    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"])
+    @cluster(num_nodes=7)
+    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+            compression_type=["none", "snappy"], 
metadata_quorum=quorum.isolated_kraft)
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"], metadata_quorum=quorum.isolated_kraft)
     def test_long_term_producer_throughput(self, compression_type="none",
                                            security_protocol='PLAINTEXT', 
tls_version=None,
                                            interbroker_security_protocol=None, 
client_version=str(DEV_BRANCH),
-                                           broker_version=str(DEV_BRANCH)):
+                                           broker_version=str(DEV_BRANCH), 
metadata_quorum=quorum.isolated_kraft):
         """
-        Setup: 1 node zk + 3 node kafka cluster
+        Setup: 3 node kafka cluster
         Produce 10e6 100 byte messages to a topic with 6 partitions, 
replication-factor 3, and acks=1.
 
         Collect and return aggregate throughput statistics after all messages 
have been acknowledged.
@@ -158,16 +153,16 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
-    @cluster(num_nodes=5)
-    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 
'TLSv1.3'], compression_type=["none", "snappy"])
-    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"])
-    @cluster(num_nodes=6)
-    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], 
compression_type=["none", "snappy"])
+    @cluster(num_nodes=8)
+    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+            compression_type=["none", "snappy"], 
metadata_quorum=quorum.isolated_kraft)
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"], metadata_quorum=quorum.isolated_kraft)
+    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], 
compression_type=["none", "snappy"], metadata_quorum=quorum.isolated_kraft)
     def test_end_to_end_latency(self, compression_type="none", 
security_protocol="PLAINTEXT", tls_version=None,
                                 interbroker_security_protocol=None, 
client_version=str(DEV_BRANCH),
-                                broker_version=str(DEV_BRANCH)):
+                                broker_version=str(DEV_BRANCH), 
metadata_quorum=quorum.isolated_kraft):
         """
-        Setup: 1 node zk + 3 node kafka cluster
+        Setup: 3 node kafka cluster
         Produce (acks = 1) and consume 10e3 messages to a topic with 6 
partitions and replication-factor 3,
         measuring the latency between production and consumption of each 
message.
 
@@ -190,14 +185,15 @@ class Benchmark(Test):
         self.perf.run()
         return latency(self.perf.results[0]['latency_50th_ms'],  
self.perf.results[0]['latency_99th_ms'], 
self.perf.results[0]['latency_999th_ms'])
 
-    @cluster(num_nodes=6)
-    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 
'TLSv1.3'], compression_type=["none", "snappy"])
-    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"])
+    @cluster(num_nodes=8)
+    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+            compression_type=["none", "snappy"], 
metadata_quorum=quorum.isolated_kraft)
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"], metadata_quorum=quorum.isolated_kraft)
     def test_producer_and_consumer(self, compression_type="none", 
security_protocol="PLAINTEXT", tls_version=None,
                                    interbroker_security_protocol=None,
-                                   client_version=str(DEV_BRANCH), 
broker_version=str(DEV_BRANCH)):
+                                   client_version=str(DEV_BRANCH), 
broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft):
         """
-        Setup: 1 node zk + 3 node kafka cluster
+        Setup: 3 node kafka cluster
         Concurrently produce and consume 10e6 messages with a single producer 
and a single consumer,
 
         Return aggregate throughput statistics for both producer and consumer.
@@ -237,12 +233,13 @@ class Benchmark(Test):
         self.logger.info("\n".join(summary))
         return data
 
-    @cluster(num_nodes=6)
-    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 
'TLSv1.3'], compression_type=["none", "snappy"])
-    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"])
+    @cluster(num_nodes=8)
+    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], tls_version=['TLSv1.2', 'TLSv1.3'],
+            compression_type=["none", "snappy"], 
metadata_quorum=quorum.isolated_kraft)
+    @matrix(security_protocol=['PLAINTEXT'], compression_type=["none", 
"snappy"], metadata_quorum=quorum.isolated_kraft)
     def test_consumer_throughput(self, compression_type="none", 
security_protocol="PLAINTEXT", tls_version=None,
                                  interbroker_security_protocol=None, 
num_consumers=1,
-                                 client_version=str(DEV_BRANCH), 
broker_version=str(DEV_BRANCH)):
+                                 client_version=str(DEV_BRANCH), 
broker_version=str(DEV_BRANCH), metadata_quorum=quorum.isolated_kraft):
         """
         Consume 10e6 100-byte messages with 1 or more consumers from a topic 
with 6 partitions
         and report throughput.

Reply via email to