This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5ec9dffa818 KAFKA-17916: removing ZK from connect ducktape tests (#17689)
5ec9dffa818 is described below
commit 5ec9dffa8187b1cc68039d0bcf57c8f30694a793
Author: kevin-wu24 <[email protected]>
AuthorDate: Tue Nov 5 16:33:17 2024 -0600
KAFKA-17916: removing ZK from connect ducktape tests (#17689)
Migrates the existing Connect ducktape tests that were using ZooKeeper to
use KRaft instead, and cleans up some dead ZK code. In the broker
compatibility test, the cases for broker versions 2.1-2.3 still need to use
ZK, and the cases for broker versions 1.0, 1.1 and 2.0 are removed.
Reviewers: Colin P. McCabe <[email protected]>
---
.../tests/connect/connect_distributed_test.py | 44 +++++++++-------------
.../tests/connect/connect_plugin_discovery_test.py | 6 ++-
tests/kafkatest/tests/connect/connect_rest_test.py | 2 +-
tests/kafkatest/tests/connect/connect_test.py | 21 ++++-------
4 files changed, 30 insertions(+), 43 deletions(-)
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 3066ac16831..64a80d2483e 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -449,7 +449,8 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector startup in PAUSED
state")
@cluster(num_nodes=5)
- def test_dynamic_logging(self):
+ @parametrize(metadata_quorum=quorum.isolated_kraft)
+ def test_dynamic_logging(self, metadata_quorum):
"""
Test out the REST API for dynamically adjusting logging levels, on
both a single-worker and cluster-wide basis.
"""
@@ -981,32 +982,23 @@ class ConnectDistributedTest(Test):
assert obj['payload'][ts_fieldname] == ts
@cluster(num_nodes=5)
- @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned')
- @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='sessioned')
- @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible')
- @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager')
- def test_broker_compatibility(self, broker_version, auto_create_topics,
exactly_once_source, connect_protocol):
+ @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned',
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=True, connect_protocol='sessioned',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='sessioned',
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible',
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='compatible',
metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager',
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+ @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False,
exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
+ def test_broker_compatibility(self, broker_version, auto_create_topics,
exactly_once_source, connect_protocol, metadata_quorum):
"""
- Verify that Connect will start up with various broker versions with
various configurations.
- When Connect distributed starts up, it either creates internal topics
(v0.10.1.0 and after)
+ Verify that Connect will start up with various broker versions with
various configurations.
+ When Connect distributed starts up, it either creates internal topics
(v0.10.1.0 and after)
or relies upon the broker to auto-create the topics (v0.10.0.x and
before).
"""
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source
else 'disabled'
diff --git a/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
b/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
index 9d349b0afb2..e66f54c16e5 100644
--- a/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
+++ b/tests/kafkatest/tests/connect/connect_plugin_discovery_test.py
@@ -18,6 +18,7 @@ from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.directory_layout.kafka_path import CONNECT_FILE_JAR
from kafkatest.version import LATEST_3_5
from kafkatest.services.connect import ConnectStandaloneService,
ConnectServiceBase
+from kafkatest.services.kafka import quorum
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
@@ -41,8 +42,9 @@ class ConnectPluginDiscoveryTest(KafkaTest):
@cluster(num_nodes=3)
@matrix(plugin_discovery=['only_scan', 'hybrid_warn', 'hybrid_fail',
'service_load'],
- command=[('sync-manifests', '--dry-run'), ('sync-manifests',)])
- def test_plugin_discovery_migration(self, plugin_discovery, command):
+ command=[('sync-manifests', '--dry-run'), ('sync-manifests',)],
+ metadata_quorum=[quorum.isolated_kraft])
+ def test_plugin_discovery_migration(self, plugin_discovery, command,
metadata_quorum):
# Template parameters
self.PLUGIN_DISCOVERY = plugin_discovery
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py
b/tests/kafkatest/tests/connect/connect_rest_test.py
index da3e5be534b..1bd340f25c7 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -73,7 +73,7 @@ class ConnectRestApiTest(KafkaTest):
CONNECT_PROTOCOL="compatible"
def __init__(self, test_context):
- super(ConnectRestApiTest, self).__init__(test_context, num_zk=1,
num_brokers=1, topics={
+ super(ConnectRestApiTest, self).__init__(test_context, num_zk=0,
num_brokers=1, topics={
'test': {'partitions': 1, 'replication-factor': 1}
})
diff --git a/tests/kafkatest/tests/connect/connect_test.py
b/tests/kafkatest/tests/connect/connect_test.py
index ad850c568b0..692d5a8ee7e 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -57,19 +57,16 @@ class ConnectStandaloneFileTest(Test):
def __init__(self, test_context):
super(ConnectStandaloneFileTest, self).__init__(test_context)
- self.num_zk = 1
self.num_brokers = 1
self.topics = {
'test' : { 'partitions': 1, 'replication-factor': 1 }
}
- self.zk = ZookeeperService(test_context, self.num_zk) if
quorum.for_test(test_context) == quorum.zk else None
-
@cluster(num_nodes=5)
- @parametrize(converter="org.apache.kafka.connect.json.JsonConverter",
schemas=True)
- @parametrize(converter="org.apache.kafka.connect.json.JsonConverter",
schemas=False)
- @parametrize(converter="org.apache.kafka.connect.storage.StringConverter",
schemas=None)
- @parametrize(security_protocol=SecurityConfig.PLAINTEXT)
+ @parametrize(converter="org.apache.kafka.connect.json.JsonConverter",
schemas=True, metadata_quorum=quorum.isolated_kraft)
+ @parametrize(converter="org.apache.kafka.connect.json.JsonConverter",
schemas=False, metadata_quorum=quorum.isolated_kraft)
+ @parametrize(converter="org.apache.kafka.connect.storage.StringConverter",
schemas=None, metadata_quorum=quorum.isolated_kraft)
+ @parametrize(security_protocol=SecurityConfig.PLAINTEXT,
metadata_quorum=quorum.isolated_kraft)
@cluster(num_nodes=6)
@matrix(security_protocol=[SecurityConfig.SASL_SSL],
metadata_quorum=quorum.all_non_upgrade)
def test_file_source_and_sink(self,
converter="org.apache.kafka.connect.json.JsonConverter", schemas=True,
security_protocol='PLAINTEXT',
@@ -87,9 +84,9 @@ class ConnectStandaloneFileTest(Test):
self.override_value_converter = converter
self.schemas = schemas
- self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
+ self.kafka = KafkaService(self.test_context, self.num_brokers, None,
security_protocol=security_protocol,
interbroker_security_protocol=security_protocol,
- topics=self.topics,
controller_num_nodes_override=self.num_zk)
+ topics=self.topics)
self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE],
include_filestream_connectors=True)
@@ -98,8 +95,6 @@ class ConnectStandaloneFileTest(Test):
self.consumer_validator = ConsoleConsumer(self.test_context, 1,
self.kafka, self.TOPIC_TEST,
consumer_timeout_ms=10000)
- if self.zk:
- self.zk.start()
self.kafka.start()
self.source.set_configs(lambda node:
self.render("connect-standalone.properties", node=node),
[self.render("connect-file-source.properties")])
@@ -141,7 +136,7 @@ class ConnectStandaloneFileTest(Test):
@parametrize(error_tolerance=ErrorTolerance.NONE,
metadata_quorum=quorum.isolated_kraft)
@parametrize(error_tolerance=ErrorTolerance.ALL,
metadata_quorum=quorum.isolated_kraft)
def test_skip_and_log_to_dlq(self, error_tolerance, metadata_quorum):
- self.kafka = KafkaService(self.test_context, self.num_brokers,
self.zk, topics=self.topics)
+ self.kafka = KafkaService(self.test_context, self.num_brokers, None,
topics=self.topics)
# set config props
self.override_error_tolerance_props = error_tolerance
@@ -171,8 +166,6 @@ class ConnectStandaloneFileTest(Test):
self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE],
include_filestream_connectors=True)
- if self.zk:
- self.zk.start()
self.kafka.start()
self.override_key_converter =
"org.apache.kafka.connect.storage.StringConverter"