This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5ec9dffa818 KAFKA-17916: removing ZK from connect ducktape tests 
(#17689)
5ec9dffa818 is described below

commit 5ec9dffa8187b1cc68039d0bcf57c8f30694a793
Author: kevin-wu24 <[email protected]>
AuthorDate: Tue Nov 5 16:33:17 2024 -0600

    KAFKA-17916: removing ZK from connect ducktape tests (#17689)
    
    Migrates existing connect tests that were using Zookeeper to use KRaft
    instead, and cleans up some dead ZK code. For broker compatibility tests,
    tests for versions 2.1-2.3 still need to use ZK.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../tests/connect/connect_distributed_test.py      | 44 +++++++++-------------
 .../tests/connect/connect_plugin_discovery_test.py |  6 ++-
 tests/kafkatest/tests/connect/connect_rest_test.py |  2 +-
 tests/kafkatest/tests/connect/connect_test.py      | 21 ++++-------
 4 files changed, 30 insertions(+), 43 deletions(-)

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py 
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 3066ac16831..64a80d2483e 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -449,7 +449,8 @@ class ConnectDistributedTest(Test):
                        err_msg="Failed to see connector startup in PAUSED 
state")
 
     @cluster(num_nodes=5)
-    def test_dynamic_logging(self):
+    @parametrize(metadata_quorum=quorum.isolated_kraft)
+    def test_dynamic_logging(self, metadata_quorum):
         """
         Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
         """
@@ -981,32 +982,23 @@ class ConnectDistributedTest(Test):
             assert obj['payload'][ts_fieldname] == ts
 
     @cluster(num_nodes=5)
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='sessioned')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible')
-    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager')
-    def test_broker_compatibility(self, broker_version, auto_create_topics, 
exactly_once_source, connect_protocol):
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned', 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=True, connect_protocol='sessioned', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='sessioned', 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible', 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='compatible', 
metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager', 
metadata_quorum=quorum.isolated_kraft)
+    @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+    @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, 
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+    def test_broker_compatibility(self, broker_version, auto_create_topics, 
exactly_once_source, connect_protocol, metadata_quorum):
         """
-        Verify that Connect will start up with various broker versions with 
various configurations. 
-        When Connect distributed starts up, it either creates internal topics 
(v0.10.1.0 and after) 
+        Verify that Connect will start up with various broker versions with 
various configurations.
+        When Connect distributed starts up, it either creates internal topics 
(v0.10.1.0 and after)
         or relies upon the broker to auto-create the topics (v0.10.0.x and 
before).
         """
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source 
else 'disabled'
diff --git a/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py 
b/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
index 9d349b0afb2..e66f54c16e5 100644
--- a/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
+++ b/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
@@ -18,6 +18,7 @@ from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.directory_layout.kafka_path import CONNECT_FILE_JAR
 from kafkatest.version import LATEST_3_5
 from kafkatest.services.connect import ConnectStandaloneService, 
ConnectServiceBase
+from kafkatest.services.kafka import quorum
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
@@ -41,8 +42,9 @@ class ConnectPluginDiscoveryTest(KafkaTest):
 
     @cluster(num_nodes=3)
     @matrix(plugin_discovery=['only_scan', 'hybrid_warn', 'hybrid_fail', 
'service_load'],
-            command=[('sync-manifests', '--dry-run'), ('sync-manifests',)])
-    def test_plugin_discovery_migration(self, plugin_discovery, command):
+            command=[('sync-manifests', '--dry-run'), ('sync-manifests',)],
+            metadata_quorum=[quorum.isolated_kraft])
+    def test_plugin_discovery_migration(self, plugin_discovery, command, 
metadata_quorum):
         # Template parameters
         self.PLUGIN_DISCOVERY = plugin_discovery
 
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py 
b/tests/kafkatest/tests/connect/connect_rest_test.py
index da3e5be534b..1bd340f25c7 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -73,7 +73,7 @@ class ConnectRestApiTest(KafkaTest):
     CONNECT_PROTOCOL="compatible"
 
     def __init__(self, test_context):
-        super(ConnectRestApiTest, self).__init__(test_context, num_zk=1, 
num_brokers=1, topics={
+        super(ConnectRestApiTest, self).__init__(test_context, num_zk=0, 
num_brokers=1, topics={
             'test': {'partitions': 1, 'replication-factor': 1}
         })
 
diff --git a/tests/kafkatest/tests/connect/connect_test.py 
b/tests/kafkatest/tests/connect/connect_test.py
index ad850c568b0..692d5a8ee7e 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -57,19 +57,16 @@ class ConnectStandaloneFileTest(Test):
 
     def __init__(self, test_context):
         super(ConnectStandaloneFileTest, self).__init__(test_context)
-        self.num_zk = 1
         self.num_brokers = 1
         self.topics = {
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk) if 
quorum.for_test(test_context) == quorum.zk else None
-
     @cluster(num_nodes=5)
-    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", 
schemas=True)
-    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", 
schemas=False)
-    @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", 
schemas=None)
-    @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", 
schemas=True, metadata_quorum=quorum.isolated_kraft)
+    @parametrize(converter="org.apache.kafka.connect.json.JsonConverter", 
schemas=False, metadata_quorum=quorum.isolated_kraft)
+    @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", 
schemas=None, metadata_quorum=quorum.isolated_kraft)
+    @parametrize(security_protocol=SecurityConfig.PLAINTEXT, 
metadata_quorum=quorum.isolated_kraft)
     @cluster(num_nodes=6)
     @matrix(security_protocol=[SecurityConfig.SASL_SSL], 
metadata_quorum=quorum.all_non_upgrade)
     def test_file_source_and_sink(self, 
converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, 
security_protocol='PLAINTEXT',
@@ -87,9 +84,9 @@ class ConnectStandaloneFileTest(Test):
             self.override_value_converter = converter
         self.schemas = schemas
 
-        self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+        self.kafka = KafkaService(self.test_context, self.num_brokers, None,
                                   security_protocol=security_protocol, 
interbroker_security_protocol=security_protocol,
-                                  topics=self.topics, 
controller_num_nodes_override=self.num_zk)
+                                  topics=self.topics)
 
         self.source = ConnectStandaloneService(self.test_context, self.kafka, 
[self.INPUT_FILE, self.OFFSETS_FILE],
                                                
include_filestream_connectors=True)
@@ -98,8 +95,6 @@ class ConnectStandaloneFileTest(Test):
         self.consumer_validator = ConsoleConsumer(self.test_context, 1, 
self.kafka, self.TOPIC_TEST,
                                                   consumer_timeout_ms=10000)
 
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
         self.source.set_configs(lambda node: 
self.render("connect-standalone.properties", node=node), 
[self.render("connect-file-source.properties")])
@@ -141,7 +136,7 @@ class ConnectStandaloneFileTest(Test):
     @parametrize(error_tolerance=ErrorTolerance.NONE, 
metadata_quorum=quorum.isolated_kraft)
     @parametrize(error_tolerance=ErrorTolerance.ALL, 
metadata_quorum=quorum.isolated_kraft)
     def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
-        self.kafka = KafkaService(self.test_context, self.num_brokers, 
self.zk, topics=self.topics)
+        self.kafka = KafkaService(self.test_context, self.num_brokers, None, 
topics=self.topics)
 
         # set config props
         self.override_error_tolerance_props = error_tolerance
@@ -171,8 +166,6 @@ class ConnectStandaloneFileTest(Test):
         self.sink = ConnectStandaloneService(self.test_context, self.kafka, 
[self.OUTPUT_FILE, self.OFFSETS_FILE],
                                              
include_filestream_connectors=True)
 
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
         self.override_key_converter = 
"org.apache.kafka.connect.storage.StringConverter"

Reply via email to