This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38aca3a0454 KAFKA-17917: Convert Kafka core system tests to use KRaft (#17847)
38aca3a0454 is described below

commit 38aca3a045474a9e52ae25bde28d8a013b04f92b
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Nov 21 15:40:49 2024 -0600

    KAFKA-17917: Convert Kafka core system tests to use KRaft (#17847)
    
    - Remove some unused ZooKeeper code
    
    - Migrate the group mode transactions, security rolling upgrade, and throttling tests to use KRaft
    
    - Add KRaft downgrade tests to kraft_upgrade_test.py
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 tests/kafkatest/tests/core/authorizer_test.py      | 12 +---
 .../core/compatibility_test_new_broker_test.py     |  7 +-
 tests/kafkatest/tests/core/consume_bench_test.py   | 20 ++----
 .../tests/core/consumer_group_command_test.py      | 15 ++---
 .../tests/core/controller_mutation_quota_test.py   | 17 +----
 .../kafkatest/tests/core/delegation_token_test.py  | 11 +---
 .../tests/core/fetch_from_follower_test.py         |  8 +--
 .../kafkatest/tests/core/get_offset_shell_test.py  | 10 +--
 .../tests/core/group_mode_transactions_test.py     | 28 +++-----
 tests/kafkatest/tests/core/kraft_upgrade_test.py   | 74 ++++++++++++++++++++--
 tests/kafkatest/tests/core/log_dir_failure_test.py |  8 +--
 tests/kafkatest/tests/core/produce_bench_test.py   | 13 ++--
 .../tests/core/reassign_partitions_test.py         | 13 +---
 tests/kafkatest/tests/core/replica_scale_test.py   | 12 +---
 .../tests/core/replication_replica_failure_test.py | 16 +----
 tests/kafkatest/tests/core/replication_test.py     | 14 +---
 .../kafkatest/tests/core/round_trip_fault_test.py  | 25 +++-----
 .../tests/core/security_rolling_upgrade_test.py    | 28 ++++----
 tests/kafkatest/tests/core/security_test.py        | 24 ++-----
 tests/kafkatest/tests/core/throttling_test.py      | 18 ++----
 tests/kafkatest/tests/core/transactions_test.py    | 25 ++------
 tests/kafkatest/tests/end_to_end.py                |  1 +
 22 files changed, 158 insertions(+), 241 deletions(-)
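
For context: every file below follows the same migration shape. The conditional
ZookeeperService setup is deleted, KafkaService is constructed with zk=None (in
KRaft mode the service provisions its own controller quorum), and each test is
parametrized over a KRaft metadata quorum instead of defaulting to quorum.zk.
A minimal sketch of the resulting test shape, assuming the kafkatest helpers
used throughout this diff (the class name, topic, and node counts here are
illustrative only, not part of this commit):

    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test

    from kafkatest.services.kafka import KafkaService, quorum

    class ExampleKRaftTest(Test):
        def __init__(self, test_context):
            super(ExampleKRaftTest, self).__init__(test_context)
            # No ZookeeperService and no "if self.zk" guards: with zk=None,
            # KafkaService runs its brokers against a KRaft controller quorum.
            self.kafka = KafkaService(test_context, num_nodes=3, zk=None,
                                      topics={"test_topic": {"partitions": 1,
                                                             "replication-factor": 1}})

        def setUp(self):
            self.kafka.start()

        @cluster(num_nodes=5)
        @matrix(metadata_quorum=quorum.all_non_upgrade)  # KRaft-only matrix
        def test_example(self, metadata_quorum):
            # metadata_quorum selects combined vs. isolated controllers.
            assert self.kafka.cluster_id() is not None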

diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py
index 1e7178f5f81..60c0612f356 100644
--- a/tests/kafkatest/tests/core/authorizer_test.py
+++ b/tests/kafkatest/tests/core/authorizer_test.py
@@ -19,7 +19,6 @@ from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 
 from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.security.kafka_acls import ACLs
 
 class AuthorizerTest(Test):
@@ -47,15 +46,8 @@ class AuthorizerTest(Test):
     def test_authorizer(self, metadata_quorum, authorizer_class):
         topics = {"test_topic": {"partitions": 1, "replication-factor": 1}}
 
-        if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER):
-            self.zk = None
-        else:
-            self.zk = ZookeeperService(self.test_context, num_nodes=1)
-            self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
-                                  topics=topics, controller_num_nodes_override=1,
-                                  allow_zk_with_kraft=True)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
+                                  topics=topics, controller_num_nodes_override=1)
 
         broker_security_protocol = "SSL"
         broker_principal = "User:CN=systemtest"
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index ec4a878f032..7a9d87c0968 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -18,7 +18,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.kafka import config_property
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
@@ -33,10 +32,6 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
 
     def setUp(self):
         self.topic = "test_topic"
-        self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None
-
-        if self.zk:
-            self.zk.start()
 
         # Producer and consumer
         self.producer_throughput = 10000
@@ -67,7 +62,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
     @matrix(producer_version=[str(LATEST_3_9)], consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     def test_compatibility(self, producer_version, consumer_version, compression_types, timestamp_type=None, metadata_quorum=quorum.zk):
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, version=DEV_BRANCH, topics={self.topic: {
                                                                     "partitions": 3,
                                                                     "replication-factor": 3,
                                                                     'configs': {"min.insync.replicas": 2}}},
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index c205604f8f6..c84f0dda59e 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -22,15 +22,13 @@ from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorklo
 from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
 from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
 
 
 class ConsumeBenchTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ConsumeBenchTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
         self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
         self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka)
         self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka)
@@ -42,15 +40,11 @@ class ConsumeBenchTest(Test):
 
     def setUp(self):
         self.trogdor.start()
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        if self.zk:
-            self.zk.stop()
 
     def produce_messages(self, topics, max_messages=10000):
         produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@@ -85,7 +79,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consume_bench(self, topics, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Runs a ConsumeBench workload to consume messages
         """
@@ -115,7 +109,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_single_partition(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Run a ConsumeBench against a single partition
         """
@@ -146,7 +140,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_multiple_consumers_random_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Runs multiple consumer groups to read messages from topics.
         Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -178,7 +172,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_two_consumers_specified_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Runs two consumers in the same consumer group to read messages from topics.
         Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -211,7 +205,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_multiple_consumers_random_group_partitions(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Runs multiple consumers to read messages from specific partitions.
         Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -244,7 +238,7 @@ class ConsumeBenchTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Runs multiple consumers in the same group to read messages from specific partitions.
         It is an invalid configuration to provide a consumer group and specific partitions.
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index 2df53e3093a..57af320574b 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -19,7 +19,6 @@ from ducktape.tests.test import Test
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum, consumer_group
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
@@ -40,23 +39,17 @@ class ConsumerGroupCommandTest(Test):
 
     def __init__(self, test_context):
         super(ConsumerGroupCommandTest, self).__init__(test_context)
-        self.num_zk = 1
         self.num_brokers = 1
         self.topics = {
             TOPIC: {'partitions': 1, 'replication-factor': 1}
         }
-        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
-
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
 
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
+            None, security_protocol=security_protocol,
             interbroker_security_protocol=interbroker_security_protocol, topics=self.topics,
-            controller_num_nodes_override=self.num_zk)
+            controller_num_nodes_override=self.num_brokers)
         self.kafka.start()
 
     def start_consumer(self, group_protocol=None):
@@ -102,7 +95,7 @@ class ConsumerGroupCommandTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Tests if ConsumerGroupCommand is listing correct consumer groups
         :return: None
@@ -121,7 +114,7 @@ class ConsumerGroupCommandTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Tests if ConsumerGroupCommand is describing a consumer group correctly
         :return: None
diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
index bf8a3b874ed..98f33deab1f 100644
--- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py
+++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
@@ -17,15 +17,7 @@ from ducktape.mark.resource import cluster
 from ducktape.mark import matrix
 from ducktape.tests.test import Test
 
-from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
-from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
-from kafkatest.services.trogdor.task_spec import TaskSpec
 from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
-
-import time
-
 
 class ControllerMutationQuotaTest(Test):
     """Tests throttled partition changes via the kafka-topics CLI as follows:
@@ -54,11 +46,10 @@ class ControllerMutationQuotaTest(Test):
     def __init__(self, test_context):
         super(ControllerMutationQuotaTest, self).__init__(test_context=test_context)
         self.test_context = test_context
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.window_num = 10
         self.window_size_seconds = 200 # must be long enough such that all CLI commands fit into it
 
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
             server_prop_overrides=[
                 ["quota.window.num", "%s" % self.window_num],
                 ["controller.quota.window.size.seconds", "%s" % self.window_size_seconds]
@@ -66,19 +57,15 @@ class ControllerMutationQuotaTest(Test):
             controller_num_nodes_override=1)
 
     def setUp(self):
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
         # Need to increase the timeout due to partition count
         self.kafka.stop()
-        if self.zk:
-            self.zk.stop()
 
     @cluster(num_nodes=2)
     @matrix(metadata_quorum=quorum.all_kraft)
-    def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
+    def test_controller_mutation_quota(self, metadata_quorum):
         self.partition_count = 10
         mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds)
 
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py
index 48555723098..4a1b1e83a9a 100644
--- a/tests/kafkatest/tests/core/delegation_token_test.py
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -18,7 +18,6 @@ from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import config_property, KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.delegation_tokens import DelegationTokens
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -35,8 +34,7 @@ class DelegationTokenTest(Test):
 
         self.test_context = test_context
         self.topic = "topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
                                   topics={self.topic: {"partitions": 1, "replication-factor": 1}},
                                   server_prop_overrides=[
                                       [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
@@ -66,11 +64,6 @@ client.id=console-consumer
         self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
         self.kafka.interbroker_sasl_mechanism = 'GSSAPI'
 
-
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
-
     def tearDown(self):
         self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
         self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
@@ -114,7 +107,7 @@ client.id=console-consumer
 
     @cluster(num_nodes=5)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_delegation_token_lifecycle(self, metadata_quorum=quorum.zk):
+    def test_delegation_token_lifecycle(self, metadata_quorum):
         self.kafka.start()
         self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)
 
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index a4c810116dd..8db6e6d3110 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -23,7 +23,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService, quorum, consumer_group
 from kafkatest.services.monitor.jmx import JmxTool
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 
@@ -37,10 +36,9 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         super(FetchFromFollowerTest, self).__init__(test_context=test_context)
         self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100)
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=3,
-                                  zk=self.zk,
+                                  zk=None,
                                   topics={
                                       self.topic: {
                                           "partitions": 1,
@@ -65,8 +63,6 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
 
     def setUp(self):
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
     @cluster(num_nodes=9)
@@ -79,7 +75,7 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consumer_preferred_read_replica(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica
         selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index b48185d15d2..c54dc565ff6 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -20,7 +20,6 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 
@@ -51,7 +50,6 @@ class GetOffsetShellTest(Test):
     """
     def __init__(self, test_context):
         super(GetOffsetShellTest, self).__init__(test_context)
-        self.num_zk = 1
         self.num_brokers = 1
         self.messages_received_count = 0
         self.topics = {
@@ -64,16 +62,10 @@ class GetOffsetShellTest(Test):
             TOPIC_TEST_TOPIC_PARTITIONS2: {'partitions': 2, 'replication-factor': REPLICATION_FACTOR}
         }
 
-        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
-
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
-
     def start_kafka(self, security_protocol, interbroker_security_protocol):
         self.kafka = KafkaService(
             self.test_context, self.num_brokers,
-            self.zk, security_protocol=security_protocol,
+            None, security_protocol=security_protocol,
             interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
         self.kafka.start()
 
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 1ffab0413c9..2db9c62b46b 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -61,14 +60,9 @@ class GroupModeTransactionsTest(Test):
         self.progress_timeout_sec = 60
         self.consumer_group = "grouped-transactions-test-consumer-group"
 
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk, controller_num_nodes_override=1)
-
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
+                                  zk=None, controller_num_nodes_override=1)
 
     def seed_messages(self, topic, num_seed_messages):
         seed_timeout_sec = 10000
@@ -98,16 +92,11 @@ class GroupModeTransactionsTest(Test):
             else:
                 self.kafka.stop_node(node, clean_shutdown = False)
                 gracePeriodSecs = 5
-                if self.zk:
-                    wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
-                               timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
-                               err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
-                else:
-                    brokerSessionTimeoutSecs = 18
-                    wait_until(lambda: not self.kafka.pids(node),
-                               timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
-                               err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
-                    time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
+                brokerSessionTimeoutSecs = 18
+                wait_until(lambda: not self.kafka.pids(node),
+                           timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                           err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+                time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
             self.kafka.await_no_under_replicated_partitions()
@@ -271,8 +260,9 @@ class GroupModeTransactionsTest(Test):
 
     @cluster(num_nodes=10)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"])
-    def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
+            bounce_target=["brokers", "clients"],
+            metadata_quorum=quorum.all_non_upgrade)
+    def test_transactions(self, failure_mode, bounce_target, metadata_quorum):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index dc0dc261d1c..604ffa01803 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -53,7 +53,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
             wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
                     backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
 
-    def perform_version_change(self, from_kafka_version):
+    def upgrade_to_dev_version(self, from_kafka_version, update_metadata_version):
         self.logger.info("Performing rolling upgrade.")
         for node in self.kafka.controller_quorum.nodes:
             self.logger.info("Stopping controller node %s" % node.account.hostname)
@@ -71,8 +71,28 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
             self.kafka.start_node(node)
             self.wait_until_rejoin()
             self.logger.info("Successfully restarted broker node %s" % node.account.hostname)
-        self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
-        self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
+        if update_metadata_version:
+            self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
+            self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
+
+    def downgrade_to_version(self, to_kafka_version):
+        self.logger.info("Performing rolling downgrade.")
+        for node in self.kafka.controller_quorum.nodes:
+            self.logger.info("Stopping controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.stop_node(node)
+            node.version = KafkaVersion(to_kafka_version)
+            self.logger.info("Restarting controller node %s" % node.account.hostname)
+            self.kafka.controller_quorum.start_node(node)
+            self.wait_until_rejoin()
+            self.logger.info("Successfully restarted controller node %s" % node.account.hostname)
+        for node in self.kafka.nodes:
+            self.logger.info("Stopping broker node %s" % node.account.hostname)
+            self.kafka.stop_node(node)
+            node.version = KafkaVersion(to_kafka_version)
+            self.logger.info("Restarting broker node %s" % node.account.hostname)
+            self.kafka.start_node(node)
+            self.wait_until_rejoin()
+            self.logger.info("Successfully restarted broker node %s" % node.account.hostname)
 
     def run_upgrade(self, from_kafka_version):
         """Test upgrade of Kafka broker cluster from various versions to the current version
@@ -102,7 +122,42 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
                                         self.topic, consumer_timeout_ms=30000,
                                         message_validator=is_int, version=KafkaVersion(from_kafka_version))
-        self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.upgrade_to_dev_version(from_kafka_version, True))
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.check_protocol_errors(self)
+
+    def run_upgrade_downgrade(self, starting_kafka_version):
+        """Test upgrade and downgrade of Kafka broker cluster from various versions to current version and back
+
+        - Start 3 node broker cluster on version 'starting_kafka_version'.
+        - Perform rolling upgrade but do not update metadata.version.
+        - Start producer and consumer in the background.
+        - Perform rolling downgrade.
+        - Finally, validate that every message acked by the producer was consumed by the consumer.
+        """
+        fromKafkaVersion = KafkaVersion(starting_kafka_version)
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=None,
+                                  version=fromKafkaVersion,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 2}}})
+        self.kafka.start()
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int,
+                                           compression_types=["none"],
+                                           version=KafkaVersion(starting_kafka_version))
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=KafkaVersion(starting_kafka_version))
+        self.upgrade_to_dev_version(starting_kafka_version, False)
+
+        self.run_produce_consume_validate(core_test_action=lambda: self.downgrade_to_version(starting_kafka_version))
         cluster_id = self.kafka.cluster_id()
         assert cluster_id is not None
         assert len(cluster_id) == 22
@@ -120,3 +175,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
     def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
         self.run_upgrade(from_kafka_version)
 
+    @cluster(num_nodes=5)
+    @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
+            metadata_quorum=[combined_kraft])
+    def test_combined_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+        self.run_upgrade_downgrade(from_kafka_version)
+
+    @cluster(num_nodes=8)
+    @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
+            metadata_quorum=[isolated_kraft])
+    def test_isolated_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+        self.run_upgrade_downgrade(from_kafka_version)
\ No newline at end of file
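
A note on the downgrade tests just added: the ordering is the load-bearing
detail. The rolling upgrade is performed with update_metadata_version=False,
so metadata.version stays at the starting release's level; software downgrades
are generally only safe in KRaft when metadata.version has not been finalized
at a newer level. A sketch of the flow, reusing the helpers defined in this
file (the standalone wrapper function is hypothetical):

    def upgrade_then_downgrade(test, starting_kafka_version):
        # Roll controllers, then brokers, onto the dev binaries, but keep
        # metadata.version at the starting release's level.
        test.upgrade_to_dev_version(starting_kafka_version, False)
        # Because metadata.version was never raised, the same rolling restart
        # can return every node to the starting binaries.
        test.downgrade_to_version(starting_kafka_version)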
diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py
index ba8390ceb1c..4bc453c517b 100644
--- a/tests/kafkatest/tests/core/log_dir_failure_test.py
+++ b/tests/kafkatest/tests/core/log_dir_failure_test.py
@@ -17,7 +17,6 @@ from ducktape.utils.util import wait_until
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.kafka import config_property
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
@@ -62,10 +61,9 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
 
         self.topic1 = "test_topic_1"
         self.topic2 = "test_topic_2"
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=3,
-                                  zk=self.zk,
+                                  zk=None,
                                   topics={
                                       self.topic1: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 1}},
                                       self.topic2: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 2}}
@@ -83,10 +81,6 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.num_producers = 1
         self.num_consumers = 1
 
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
-
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 5ac08d7b3f3..23a0304193c 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -21,14 +21,12 @@ from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
 from kafkatest.services.trogdor.task_spec import TaskSpec
 from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
 
 class ProduceBenchTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ProduceBenchTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
         self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
         self.trogdor = TrogdorService(context=self.test_context,
                                       client_services=[self.kafka, self.workload_service])
@@ -37,19 +35,15 @@ class ProduceBenchTest(Test):
 
     def setUp(self):
         self.trogdor.start()
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        if self.zk:
-            self.zk.stop()
 
     @cluster(num_nodes=8)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_bench(self, metadata_quorum=quorum.zk):
+    def test_produce_bench(self, metadata_quorum):
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
@@ -66,7 +60,8 @@ class ProduceBenchTest(Test):
         self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2))
 
     @cluster(num_nodes=8)
-    def test_produce_bench_transactions(self, metadata_quorum=quorum.zk):
+    @matrix(metadata_quorum=quorum.all_non_upgrade)
+    def test_produce_bench_transactions(self, metadata_quorum):
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         self.workload_service.bootstrap_servers,
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 697088dfa9c..dc7907630be 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -18,7 +18,6 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.kafka import config_property
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum, consumer_group, TopicPartition
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
@@ -35,19 +34,17 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
     """
 
     def __init__(self, test_context):
-        self.num_zk = 1
         """:type test_context: ducktape.tests.test.TestContext"""
         super(ReassignPartitionsTest, self).__init__(test_context=test_context)
 
         self.topic = "test_topic"
         self.num_partitions = 20
-        self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
         # We set the min.insync.replicas to match the replication factor because
         # it makes the test more stringent. If min.isr = 2 and
         # replication.factor=3, then the test would tolerate the failure of
         # reassignment for upto one replica per partition, which is not
         # desirable for this test in particular.
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk,
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=None,
                                   server_prop_overrides=[
                                       [config_property.LOG_ROLL_TIME_MS, "5000"],
                                       [config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"]
@@ -59,16 +56,12 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
                                           "min.insync.replicas": 3,
                                       }}
                                   },
-                                  controller_num_nodes_override=self.num_zk)
+                                  controller_num_nodes_override=1)
         self.timeout_sec = 60
         self.producer_throughput = 1000
         self.num_producers = 1
         self.num_consumers = 1
 
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
-
     def min_cluster_size(self):
         # Override this since we're adding services outside of the constructor
         return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -159,7 +152,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
     )
     def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """Reassign partitions tests.
-        Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
+        Setup: 1 controller, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
         and min.insync.replicas=3
 
             - Produce messages in the background
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py
index d3ddf87bbd8..cd7646a53d7 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -22,7 +22,6 @@ from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorklo
 from kafkatest.services.trogdor.task_spec import TaskSpec
 from kafkatest.services.kafka import KafkaService, quorum, consumer_group
 from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
 
 import time
 
@@ -31,12 +30,9 @@ class ReplicaScaleTest(Test):
     def __init__(self, test_context):
         super(ReplicaScaleTest, self).__init__(test_context=test_context)
         self.test_context = test_context
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk, controller_num_nodes_override=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=8, zk=None, controller_num_nodes_override=1)
 
     def setUp(self):
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
 
     def teardown(self):
@@ -44,8 +40,6 @@ class ReplicaScaleTest(Test):
         for node in self.kafka.nodes:
             self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60)
         self.kafka.stop()
-        if self.zk:
-            self.zk.stop()
 
     @cluster(num_nodes=12)
     @matrix(
@@ -64,7 +58,7 @@ class ReplicaScaleTest(Test):
         group_protocol=consumer_group.all_group_protocols
     )
     def test_produce_consume(self, topic_count, partition_count, replication_factor,
-                             metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+                             metadata_quorum, use_new_coordinator=False, group_protocol=None):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "replicas_produce_consume_%d" % i
@@ -127,7 +121,7 @@ class ReplicaScaleTest(Test):
         use_new_coordinator=[True, False]
     )
     def test_clean_bounce(self, topic_count, partition_count, replication_factor,
-                          metadata_quorum=quorum.zk, use_new_coordinator=False):
+                          metadata_quorum, use_new_coordinator=False):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py
index cc048fcfdec..6e644ce5a22 100644
--- a/tests/kafkatest/tests/core/replication_replica_failure_test.py
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -46,13 +46,13 @@ class ReplicationReplicaFailureTest(EndToEndTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_replication_with_replica_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
         It also verifies that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption.
 
-        Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
+        Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
           - Produce messages in the background
           - Consume messages in the background
           - Partition a follower
@@ -60,10 +60,6 @@ class ReplicationReplicaFailureTest(EndToEndTest):
           - Stop producing and finish consuming
           - Validate that every acked message was consumed
         """
-        self.create_zookeeper_if_necessary()
-        if self.zk:
-            self.zk.start()
-
         self.create_kafka(num_nodes=3,
                           server_prop_overrides=[["replica.lag.time.max.ms", "10000"]],
                           controller_num_nodes_override=1)
@@ -73,13 +69,7 @@ class ReplicationReplicaFailureTest(EndToEndTest):
                                       client_services=[self.kafka])
         self.trogdor.start()
 
-        # If ZK is used, the partition leader is put on the controller node
-        # to avoid partitioning the controller later on in the test.
-        if self.zk:
-            controller = self.kafka.controller()
-            assignment = [self.kafka.idx(controller)] + [self.kafka.idx(node) for node in self.kafka.nodes if node != controller]
-        else:
-            assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
+        assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
 
         self.topic = "test_topic"
         self.kafka.create_topic({"topic": self.topic,
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f219744aa93..c1e2e6df3ee 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -122,21 +122,16 @@ class ReplicationTest(EndToEndTest):
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT"],
-            enable_idempotence=[True])
+            enable_idempotence=[True],
+            metadata_quorum=quorum.all_non_upgrade)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SASL_SSL"],
             metadata_quorum=quorum.all_non_upgrade)
-    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
-            broker_type=["controller"],
-            security_protocol=["PLAINTEXT", "SASL_SSL"])
     @matrix(failure_mode=["hard_bounce"],
             broker_type=["leader"],
             security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"],
             metadata_quorum=quorum.all_non_upgrade)
-    @parametrize(failure_mode="hard_bounce",
-            broker_type="leader",
-            security_protocol="SASL_SSL", client_sasl_mechanism="SCRAM-SHA-256", interbroker_sasl_mechanism="SCRAM-SHA-512")
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             security_protocol=["PLAINTEXT"], broker_type=["leader"], compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"],
             metadata_quorum=quorum.all_non_upgrade)
@@ -148,7 +143,7 @@ class ReplicationTest(EndToEndTest):
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
 
-        Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
 
             - Produce messages in the background
             - Consume messages in the background
@@ -159,9 +154,6 @@ class ReplicationTest(EndToEndTest):
 
         if failure_mode == "controller" and metadata_quorum != quorum.zk:
             raise Exception("There is no controller broker when using KRaft metadata quorum")
-        self.create_zookeeper_if_necessary()
-        if self.zk:
-            self.zk.start()
 
         self.create_kafka(num_nodes=3,
                           security_protocol=security_protocol,
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index 212c7052400..24dd29f7f31 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -33,12 +33,9 @@ class RoundTripFaultTest(Test):
     def __init__(self, test_context):
         """:type test_context: ducktape.tests.test.TestContext"""
         super(RoundTripFaultTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
-        self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk)
+        self.kafka = KafkaService(test_context, num_nodes=4, zk=None)
         self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
-        if quorum.for_test(test_context) == quorum.zk:
-            trogdor_client_services = [self.zk, self.kafka, self.workload_service]
-        elif quorum.for_test(test_context) == quorum.isolated_kraft:
+        if quorum.for_test(test_context) == quorum.isolated_kraft:
             trogdor_client_services = [self.kafka.controller_quorum, self.kafka, self.workload_service]
         else: #co-located case, which we currently don't test but handle here for completeness in case we do test it
             trogdor_client_services = [self.kafka, self.workload_service]
@@ -56,34 +53,28 @@ class RoundTripFaultTest(Test):
                                      active_topics=active_topics)
 
     def setUp(self):
-        if self.zk:
-            self.zk.start()
         self.kafka.start()
         self.trogdor.start()
 
     def teardown(self):
         self.trogdor.stop()
         self.kafka.stop()
-        if self.zk:
-            self.zk.stop()
 
     def remote_quorum_nodes(self):
-        if quorum.for_test(self.test_context) == quorum.zk:
-            return self.zk.nodes
-        elif quorum.for_test(self.test_context) == quorum.isolated_kraft:
+        if quorum.for_test(self.test_context) == quorum.isolated_kraft:
             return self.kafka.controller_quorum.nodes
         else: # co-located case, which we currently don't test but handle here for completeness in case we do test it
             return []
 
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_round_trip_workload(self, metadata_quorum=quorum.zk):
+    def test_round_trip_workload(self, metadata_quorum):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         workload1.wait_for_done(timeout_sec=600)
 
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk):
+    def test_round_trip_workload_with_broker_partition(self, metadata_quorum):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         part1 = [self.kafka.nodes[0]]
@@ -97,7 +88,7 @@ class RoundTripFaultTest(Test):
 
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk):
+    def test_produce_consume_with_broker_pause(self, metadata_quorum):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]],
@@ -110,7 +101,7 @@ class RoundTripFaultTest(Test):
 
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk):
+    def test_produce_consume_with_client_partition(self, metadata_quorum):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         part1 = [self.workload_service.nodes[0]]
@@ -123,7 +114,7 @@ class RoundTripFaultTest(Test):
 
     @cluster(num_nodes=9)
     @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk):
+    def test_produce_consume_with_latency(self, metadata_quorum):
         workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
         time.sleep(2)
         spec = DegradedNetworkFaultSpec(0, 60000)
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index a6e7ca53cb9..96b2ba55194 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from ducktape.mark import matrix
+from ducktape.mark import parametrize
 from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
@@ -39,12 +39,11 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.producer_throughput = 100
         self.num_producers = 1
         self.num_consumers = 1
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, topics={self.topic: {
             "partitions": 3,
             "replication-factor": 3,
-            'configs': {"min.insync.replicas": 2}}})
-        self.zk.start()
+            'configs': {"min.insync.replicas": 2}}},
+            controller_num_nodes_override=1)
 
     def create_producer_and_consumer(self):
         self.producer = VerifiableProducer(
@@ -69,7 +68,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
 
     def add_sasl_mechanism(self, new_client_sasl_mechanism):
         self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
-        self.kafka.start_minikdc_if_necessary()
         self.bounce()
 
     def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
@@ -99,10 +97,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.bounce()
 
     @cluster(num_nodes=8)
-    @matrix(client_protocol=[SecurityConfig.SSL])
+    @matrix(client_protocol=[SecurityConfig.SSL], metadata_quorum=[quorum.isolated_kraft])
     @cluster(num_nodes=9)
-    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL])
-    def test_rolling_upgrade_phase_one(self, client_protocol):
+    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL], metadata_quorum=[quorum.isolated_kraft])
+    def test_rolling_upgrade_phase_one(self, client_protocol, metadata_quorum):
         """
         Start with a PLAINTEXT cluster, open a SECURED port via a rolling upgrade, ensuring we can produce
         and consume throughout over PLAINTEXT. Finally, check we can produce and consume over the new secured port.
@@ -123,8 +121,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=9)
-    @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
-    def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
+    @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN], metadata_quorum=[quorum.isolated_kraft])
+    def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism, metadata_quorum):
         """
         Start with a SASL/GSSAPI cluster, add a new SASL mechanism via a rolling upgrade, ensuring we can produce
         and consume throughout over SASL/GSSAPI. Finally, check we can produce and consume using the new mechanism.
@@ -147,7 +145,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=9)
-    def test_enable_separate_interbroker_listener(self):
+    @parametrize(metadata_quorum=quorum.isolated_kraft)
+    def test_enable_separate_interbroker_listener(self, metadata_quorum):
         """
         Start with a cluster that has a single PLAINTEXT listener.
         Start producing/consuming on PLAINTEXT port.
@@ -164,7 +163,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
                                           SecurityConfig.SASL_MECHANISM_PLAIN)
 
     @cluster(num_nodes=9)
-    def test_disable_separate_interbroker_listener(self):
+    @parametrize(metadata_quorum=quorum.isolated_kraft)
+    def test_disable_separate_interbroker_listener(self, metadata_quorum):
         """
         Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker).
         Start producer and consumer on SSL listener.
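All of these upgrade tests lean on the same rolling-bounce pattern: change the
broker configuration, then restart brokers one at a time so that
replication-factor 3 with min.insync.replicas=2 keeps the topic writable
throughout. A sketch of that loop, assuming only the stop_node/start_node
methods KafkaService already exposes (the real sequencing lives in the test's
bounce() helper):

    import time

    def rolling_bounce(kafka, settle_sec=5):
        """Restart brokers one at a time; with RF=3 and min.insync.replicas=2,
        producers and consumers keep working during each single-broker bounce."""
        for node in kafka.nodes:
            kafka.stop_node(node)    # clean shutdown of one broker
            kafka.start_node(node)   # broker rejoins with the new config
            time.sleep(settle_sec)   # let the ISR recover before the next bounce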
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 33f3c72d751..a92b1ad6b3e 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -84,7 +84,7 @@ class SecurityTest(EndToEndTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Test that invalid hostname in certificate results in connection failures.
         When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
@@ -99,10 +99,6 @@ class SecurityTest(EndToEndTest):
         SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
                                                   valid_hostname=True)
 
-        self.create_zookeeper_if_necessary()
-        if self.zk:
-            self.zk.start()
-
         self.create_kafka(security_protocol=security_protocol,
                           interbroker_security_protocol=interbroker_security_protocol)
         if self.kafka.quorum_info.using_kraft and interbroker_security_protocol == 'SSL':
@@ -165,26 +161,17 @@ class SecurityTest(EndToEndTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         """
         Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
         """
-        # Start ZooKeeper/KRaft Controller with valid hostnames in the certs' SANs
+        # Start KRaft Controller with valid hostnames in the certs' SANs
         # so that we can start Kafka
         SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
                                                   valid_hostname=True)
 
-        self.create_zookeeper_if_necessary(num_nodes=1,
-                                           zk_client_port = False,
-                                           zk_client_secure_port = True,
-                                           zk_tls_encrypt_only = True,
-                                           )
-        if self.zk:
-            self.zk.start()
-
         self.create_kafka(num_nodes=1,
                          interbroker_security_protocol='SSL', # also sets the broker-to-kraft-controller security protocol for the KRaft case
-                          zk_client_secure=True, # ignored if we aren't using ZooKeeper
                           )
         self.kafka.start()
 
@@ -194,10 +181,7 @@ class SecurityTest(EndToEndTest):
         self.kafka.stop_node(self.kafka.nodes[0])
 
         SecurityConfig.ssl_stores.valid_hostname = False
-        if quorum.for_test(self.test_context) == quorum.zk:
-            self.kafka.zk.restart_cluster()
-        else:
-            self.kafka.isolated_controller_quorum.restart_cluster()
+        self.kafka.isolated_controller_quorum.restart_cluster()
 
         try:
             self.kafka.start_node(self.kafka.nodes[0], timeout_sec=30)
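The key assertion in both SSL tests is inverted: once valid_hostname is flipped
to False and the controller quorum is restarted, starting the broker is
expected to fail. A sketch of that expected-failure shape, with hypothetical
names standing in for the test's self.kafka.start_node call:

    def assert_start_fails(start_node, node, timeout_sec=30):
        """Fail the test if the broker starts; with a bad hostname in the
        controller's certificate, the SSL handshake should never complete."""
        try:
            start_node(node, timeout_sec=timeout_sec)
        except Exception:
            return  # expected: startup times out against the controller
        raise AssertionError("broker started despite invalid certificate hostname")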
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 1a0c1c6ec09..e53dd46dbb6 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -20,8 +20,7 @@ from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 
 from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -46,7 +45,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         super(ThrottlingTest, self).__init__(test_context=test_context)
 
         self.topic = "test_topic"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
         # Because we are starting the producer/consumer/validate cycle _after_
         # seeding the cluster with big data (to test throttling), we need to
         # start the consumer from the end of the stream. Further, we need to
@@ -58,7 +56,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         self.num_partitions = 3
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk,
+                                  zk=None,
                                   topics={
                                       self.topic: {
                                           "partitions": self.num_partitions,
@@ -67,7 +65,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                                               "segment.bytes": 64 * 1024 * 1024
                                           }
                                       }
-                                  })
+                                  },
+                                  controller_num_nodes_override=1)
         self.producer_throughput = 1000
         self.timeout_sec = 400
         self.num_records = 2000
@@ -78,9 +77,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
         self.num_consumers = 1
         self.throttle = 4 * 1024 * 1024  # 4 MB/s
 
-    def setUp(self):
-        self.zk.start()
-
     def min_cluster_size(self):
         # Override this since we're adding services outside of the constructor
         return super(ThrottlingTest, self).min_cluster_size() +\
@@ -139,9 +135,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
                 time_taken))
 
     @cluster(num_nodes=10)
-    @parametrize(bounce_brokers=True)
-    @parametrize(bounce_brokers=False)
-    def test_throttled_reassignment(self, bounce_brokers):
+    @parametrize(bounce_brokers=True, metadata_quorum=quorum.isolated_kraft)
+    @parametrize(bounce_brokers=False, metadata_quorum=quorum.isolated_kraft)
+    def test_throttled_reassignment(self, bounce_brokers, metadata_quorum):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
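For a sense of the timing this test depends on: with the 4 MB/s throttle
configured above, a reassignment that moves N bytes cannot complete in fewer
than N / throttle seconds, which is what makes the throttle observable. A
back-of-the-envelope check (the 1 GiB figure is an assumption for
illustration, not a value taken from the test):

    throttle = 4 * 1024 * 1024      # bytes/sec, matching self.throttle above
    data_moved = 1 * 1024 ** 3      # assume 1 GiB of replica data to move

    min_seconds = data_moved / throttle
    print("throttled move takes at least %.0f s" % min_seconds)  # ~256 s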
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 11cd1c24ae3..5fbd987a97c 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService, quorum, consumer_group
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -59,16 +58,11 @@ class TransactionsTest(Test):
         self.progress_timeout_sec = 60
         self.consumer_group = "transactions-test-consumer-group"
 
-        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk,
+                                  zk=None,
                                   controller_num_nodes_override=1)
 
-    def setUp(self):
-        if self.zk:
-            self.zk.start()
-
     def seed_messages(self, topic, num_seed_messages):
         seed_timeout_sec = 10000
         seed_producer = VerifiableProducer(context=self.test_context,
@@ -96,16 +90,11 @@ class TransactionsTest(Test):
             else:
                 self.kafka.stop_node(node, clean_shutdown = False)
                 gracePeriodSecs = 5
-                if self.zk:
-                    wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
-                               timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
-                               err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
-                else:
-                    brokerSessionTimeoutSecs = 18
-                    wait_until(lambda: not self.kafka.pids(node),
-                               timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
-                               err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
-                    time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
+                brokerSessionTimeoutSecs = 18
+                wait_until(lambda: not self.kafka.pids(node),
+                           timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+                           err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+                time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
                 self.kafka.start_node(node)
 
             self.kafka.await_no_under_replicated_partitions()
@@ -234,7 +223,7 @@ class TransactionsTest(Test):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
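The simplified hard-bounce path keeps wait_until as its only synchronization
primitive: poll until the killed broker's process disappears, then sleep out
the broker session timeout before restarting. A self-contained sketch of what
such a poll loop does (ducktape's real wait_until also accepts a backoff
interval and raises its own timeout error):

    import time

    def wait_until(condition, timeout_sec, err_msg="", poll_sec=0.5):
        """Poll condition() until it returns truthy or timeout_sec elapses."""
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            if condition():
                return
            time.sleep(poll_sec)
        raise TimeoutError(err_msg)

    # Usage mirroring the hunk above, with kafka and node assumed in scope:
    # wait_until(lambda: not kafka.pids(node), timeout_sec=18 + 5,
    #            err_msg="Failed to see timely disappearance of process")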
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index 3b9cf9f8c3c..ed2ecca87fd 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -36,6 +36,7 @@ class EndToEndTest(Test):
 
     def __init__(self, test_context, topic="test_topic", topic_config=DEFAULT_TOPIC_CONFIG):
         super(EndToEndTest, self).__init__(test_context=test_context)
+        self.zk = None
         self.topic = topic
         self.topic_config = topic_config
         self.records_consumed = []
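The one-line end_to_end.py change makes self.zk exist unconditionally, so any
code path that still probes the attribute sees None rather than raising
AttributeError; exactly which paths those are is an assumption here, not shown
in the hunk. In miniature (EndToEndSketch and stop_all are hypothetical
illustrations, not EndToEndTest's API):

    class EndToEndSketch:
        def __init__(self):
            self.zk = None  # always defined, even though KRaft needs no ZooKeeper

        def stop_all(self):
            if self.zk:        # guard stays valid without hasattr() checks
                self.zk.stop()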
