This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25fdcd05fc4 KAFKA-17915: Convert remaining Kafka Client system tests to use KRaft (#18367)
25fdcd05fc4 is described below

commit 25fdcd05fc488e49bd1f077d1c464e686d9884e2
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Jan 9 12:20:34 2025 -0600

    KAFKA-17915: Convert remaining Kafka Client system tests to use KRaft (#18367)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../client/client_compatibility_features_test.py   | 18 +++++++------
 .../client_compatibility_produce_consume_test.py   | 31 +++++++++++++++-------
 tests/kafkatest/tests/client/quota_test.py         | 18 +++++--------
 3 files changed, 37 insertions(+), 30 deletions(-)

diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 1e0c37825d1..dcb7146a298 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -53,7 +53,6 @@ def run_command(node, cmd, ssh_log_file):
             print(e, flush=True)
             raise
 
-
 class ClientCompatibilityFeaturesTest(Test):
     """
     Tests clients for the presence or absence of specific features when communicating with brokers with various
@@ -118,17 +117,20 @@ class ClientCompatibilityFeaturesTest(Test):
     @parametrize(broker_version=str(LATEST_3_0))
     @parametrize(broker_version=str(LATEST_3_1))
     @parametrize(broker_version=str(LATEST_3_2))
-    @parametrize(broker_version=str(LATEST_3_3))
-    @parametrize(broker_version=str(LATEST_3_4))
-    @parametrize(broker_version=str(LATEST_3_5))
-    @parametrize(broker_version=str(LATEST_3_6))
-    @parametrize(broker_version=str(LATEST_3_7))
-    @parametrize(broker_version=str(LATEST_3_8))
-    @parametrize(broker_version=str(LATEST_3_9))
+    @parametrize(broker_version=str(LATEST_3_3), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_4), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_5), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_6), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_7), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_8), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_9), metadata_quorum=quorum.isolated_kraft)
     def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
         if self.zk:
             self.zk.start()
         self.kafka.set_version(KafkaVersion(broker_version))
+        if metadata_quorum == quorum.isolated_kraft:
+            for node in self.kafka.controller_quorum.nodes:
+                node.version = KafkaVersion(broker_version)
         self.kafka.start()
         features = get_broker_features(broker_version)
         self.invoke_compatibility_program(features)
diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
index 1209deed64f..4c7bfef158f 100644
--- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
@@ -38,9 +38,17 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
 
         self.topic = "test_topic"
         self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic:{
-                                                                    "partitions": 10,
-                                                                    "replication-factor": 2}})
+        self.kafka = KafkaService(
+            test_context,
+            num_nodes=3,
+            zk=self.zk,
+            topics={
+                self.topic:{
+                    "partitions": 10,
+                    "replication-factor": 2
+                }
+            },
+        )
         self.num_partitions = 10
         self.timeout_sec = 60
         self.producer_throughput = 1000
@@ -69,16 +77,19 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
     @parametrize(broker_version=str(LATEST_3_0))
     @parametrize(broker_version=str(LATEST_3_1))
     @parametrize(broker_version=str(LATEST_3_2))
-    @parametrize(broker_version=str(LATEST_3_3))
-    @parametrize(broker_version=str(LATEST_3_4))
-    @parametrize(broker_version=str(LATEST_3_5))
-    @parametrize(broker_version=str(LATEST_3_6))
-    @parametrize(broker_version=str(LATEST_3_7))
-    @parametrize(broker_version=str(LATEST_3_8))
-    @parametrize(broker_version=str(LATEST_3_9))
+    @parametrize(broker_version=str(LATEST_3_3), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_4), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_5), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_6), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_7), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_8), metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_3_9), metadata_quorum=quorum.isolated_kraft)
     def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
         print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
         self.kafka.set_version(KafkaVersion(broker_version))
+        if metadata_quorum == quorum.isolated_kraft:
+            for node in self.kafka.controller_quorum.nodes:
+                node.version = KafkaVersion(broker_version)
         self.kafka.security_protocol = "PLAINTEXT"
         self.kafka.interbroker_security_protocol = self.kafka.security_protocol
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py
index d52f9b6a944..e89fea80eee 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -17,8 +17,7 @@ from ducktape.tests.test import Test
 from ducktape.mark import matrix, parametrize
 from ducktape.mark.resource import cluster
 
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.performance import ProducerPerformanceService
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.version import DEV_BRANCH
@@ -77,10 +76,9 @@ class QuotaConfig(object):
                 self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA, QuotaConfig.LARGE_QUOTA, ['clients', None])
 
     def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args):
-        force_use_zk_connection = not kafka.all_nodes_configs_command_uses_bootstrap_server()
         node = kafka.nodes[0]
         cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \
-              (kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), producer_byte_rate, consumer_byte_rate)
+              (kafka.kafka_configs_cmd_with_optional_security_settings(node, False), producer_byte_rate, consumer_byte_rate)
         cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1])
         if len(entity_args) > 2:
             cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3])
@@ -108,8 +106,7 @@ class QuotaTest(Test):
         self.num_records = 50000
         self.record_size = 3000
 
-        self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=None,
                                   security_protocol='SSL', authorizer_class_name='',
                                   interbroker_security_protocol='SSL',
                                   topics={self.topic: {'partitions': 6, 'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
@@ -119,17 +116,14 @@ class QuotaTest(Test):
         self.num_producers = 1
         self.num_consumers = 2
 
-    def setUp(self):
-        self.zk.start()
-
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
 
     @cluster(num_nodes=5)
-    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False])
-    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
-    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1):
+    @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER, QuotaConfig.USER_CLIENT], override_quota=[True, False], metadata_quorum=[quorum.isolated_kraft])
+    @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2, metadata_quorum=quorum.isolated_kraft)
+    def test_quota(self, quota_type, override_quota=True, producer_num=1, consumer_num=1, metadata_quorum=quorum.isolated_kraft):
         self.kafka.start()
 
         self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)

Reply via email to