This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38aca3a0454 KAFKA-17917: Convert Kafka core system tests to use KRaft (#17847)
38aca3a0454 is described below
commit 38aca3a045474a9e52ae25bde28d8a013b04f92b
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Nov 21 15:40:49 2024 -0600
KAFKA-17917: Convert Kafka core system tests to use KRaft (#17847)
- Remove some unused Zookeeper code
- Migrate group mode transactions, security rolling upgrade, and throttling tests to using KRaft
- Add KRaft downgrade tests to kraft_upgrade_test.py
Reviewers: Colin P. McCabe <[email protected]>
---
tests/kafkatest/tests/core/authorizer_test.py | 12 +---
.../core/compatibility_test_new_broker_test.py | 7 +-
tests/kafkatest/tests/core/consume_bench_test.py | 20 ++----
.../tests/core/consumer_group_command_test.py | 15 ++---
.../tests/core/controller_mutation_quota_test.py | 17 +----
.../kafkatest/tests/core/delegation_token_test.py | 11 +---
.../tests/core/fetch_from_follower_test.py | 8 +--
.../kafkatest/tests/core/get_offset_shell_test.py | 10 +--
.../tests/core/group_mode_transactions_test.py | 28 +++-----
tests/kafkatest/tests/core/kraft_upgrade_test.py | 74 ++++++++++++++++++++--
tests/kafkatest/tests/core/log_dir_failure_test.py | 8 +--
tests/kafkatest/tests/core/produce_bench_test.py | 13 ++--
.../tests/core/reassign_partitions_test.py | 13 +---
tests/kafkatest/tests/core/replica_scale_test.py | 12 +---
.../tests/core/replication_replica_failure_test.py | 16 +----
tests/kafkatest/tests/core/replication_test.py | 14 +---
.../kafkatest/tests/core/round_trip_fault_test.py | 25 +++-----
.../tests/core/security_rolling_upgrade_test.py | 28 ++++----
tests/kafkatest/tests/core/security_test.py | 24 ++-----
tests/kafkatest/tests/core/throttling_test.py | 18 ++----
tests/kafkatest/tests/core/transactions_test.py | 25 ++------
tests/kafkatest/tests/end_to_end.py | 1 +
22 files changed, 158 insertions(+), 241 deletions(-)
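The recurring pattern across the files below is the same: drop the ZookeeperService dependency, pass zk=None to KafkaService, and make metadata_quorum a required KRaft matrix parameter instead of defaulting it to quorum.zk. A minimal sketch of that shape follows; KafkaService, quorum, cluster, and matrix are the real kafkatest/ducktape imports used in the patch, while ExampleKRaftTest and "example_topic" are hypothetical names used only for illustration.

    # Minimal sketch of the KRaft-only test shape (not part of the patch).
    from ducktape.mark import matrix
    from ducktape.mark.resource import cluster
    from ducktape.tests.test import Test

    from kafkatest.services.kafka import KafkaService, quorum


    class ExampleKRaftTest(Test):
        def __init__(self, test_context):
            super(ExampleKRaftTest, self).__init__(test_context)
            # No ZookeeperService: zk=None plus a KRaft controller quorum.
            self.kafka = KafkaService(test_context, num_nodes=1, zk=None,
                                      topics={"example_topic": {"partitions": 1, "replication-factor": 1}},
                                      controller_num_nodes_override=1)

        def setUp(self):
            # setUp no longer needs to start ZooKeeper conditionally.
            self.kafka.start()

        @cluster(num_nodes=2)
        @matrix(metadata_quorum=quorum.all_kraft)
        def test_example(self, metadata_quorum):
            # metadata_quorum is a required matrix parameter rather than defaulting to quorum.zk.
            pass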
diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py
index 1e7178f5f81..60c0612f356 100644
--- a/tests/kafkatest/tests/core/authorizer_test.py
+++ b/tests/kafkatest/tests/core/authorizer_test.py
@@ -19,7 +19,6 @@ from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.security.kafka_acls import ACLs
class AuthorizerTest(Test):
@@ -47,15 +46,8 @@ class AuthorizerTest(Test):
def test_authorizer(self, metadata_quorum, authorizer_class):
topics = {"test_topic": {"partitions": 1, "replication-factor": 1}}
- if (authorizer_class == KafkaService.KRAFT_ACL_AUTHORIZER):
- self.zk = None
- else:
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
- topics=topics, controller_num_nodes_override=1,
- allow_zk_with_kraft=True)
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
+ topics=topics, controller_num_nodes_override=1)
broker_security_protocol = "SSL"
broker_principal = "User:CN=systemtest"
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index ec4a878f032..7a9d87c0968 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -18,7 +18,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.kafka import config_property
from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
@@ -33,10 +32,6 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
def setUp(self):
self.topic = "test_topic"
- self.zk = ZookeeperService(self.test_context, num_nodes=1) if quorum.for_test(self.test_context) == quorum.zk else None
-
- if self.zk:
- self.zk.start()
# Producer and consumer
self.producer_throughput = 10000
@@ -67,7 +62,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@matrix(producer_version=[str(LATEST_3_9)], consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
def test_compatibility(self, producer_version, consumer_version, compression_types, timestamp_type=None, metadata_quorum=quorum.zk):
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, version=DEV_BRANCH, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
'configs': {"min.insync.replicas": 2}}},
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index c205604f8f6..c84f0dda59e 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -22,15 +22,13 @@ from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorklo
from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
class ConsumeBenchTest(Test):
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ConsumeBenchTest, self).__init__(test_context)
- self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka)
self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka)
@@ -42,15 +40,11 @@ class ConsumeBenchTest(Test):
def setUp(self):
self.trogdor.start()
- if self.zk:
- self.zk.start()
self.kafka.start()
def teardown(self):
self.trogdor.stop()
self.kafka.stop()
- if self.zk:
- self.zk.stop()
def produce_messages(self, topics, max_messages=10000):
produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
@@ -85,7 +79,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consume_bench(self, topics, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_consume_bench(self, topics, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs a ConsumeBench workload to consume messages
"""
@@ -115,7 +109,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_single_partition(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_single_partition(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Run a ConsumeBench against a single partition
"""
@@ -146,7 +140,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_random_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_random_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs multiple consumers group to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -178,7 +172,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_two_consumers_specified_group_topics(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_two_consumers_specified_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs two consumers in the same consumer group to read messages from topics.
Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -211,7 +205,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_random_group_partitions(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs multiple consumers in to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -244,7 +238,7 @@ class ConsumeBenchTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Runs multiple consumers in the same group to read messages from specific partitions.
It is an invalid configuration to provide a consumer group and specific partitions.
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index 2df53e3093a..57af320574b 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -19,7 +19,6 @@ from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
@@ -40,23 +39,17 @@ class ConsumerGroupCommandTest(Test):
def __init__(self, test_context):
super(ConsumerGroupCommandTest, self).__init__(test_context)
- self.num_zk = 1
self.num_brokers = 1
self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1}
}
- self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
-
- def setUp(self):
- if self.zk:
- self.zk.start()
def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
+ None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol,
topics=self.topics,
- controller_num_nodes_override=self.num_zk)
+ controller_num_nodes_override=self.num_brokers)
self.kafka.start()
def start_consumer(self, group_protocol=None):
@@ -102,7 +95,7 @@ class ConsumerGroupCommandTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Tests if ConsumerGroupCommand is listing correct consumer groups
:return: None
@@ -121,7 +114,7 @@ class ConsumerGroupCommandTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
"""
Tests if ConsumerGroupCommand is describing a consumer group correctly
:return: None
diff --git a/tests/kafkatest/tests/core/controller_mutation_quota_test.py b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
index bf8a3b874ed..98f33deab1f 100644
--- a/tests/kafkatest/tests/core/controller_mutation_quota_test.py
+++ b/tests/kafkatest/tests/core/controller_mutation_quota_test.py
@@ -17,15 +17,7 @@ from ducktape.mark.resource import cluster
from ducktape.mark import matrix
from ducktape.tests.test import Test
-from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
-from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
-from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
-
-import time
-
class ControllerMutationQuotaTest(Test):
"""Tests throttled partition changes via the kafka-topics CLI as follows:
@@ -54,11 +46,10 @@ class ControllerMutationQuotaTest(Test):
def __init__(self, test_context):
super(ControllerMutationQuotaTest, self).__init__(test_context=test_context)
self.test_context = test_context
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.window_num = 10
self.window_size_seconds = 200 # must be long enough such that all CLI commands fit into it
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
server_prop_overrides=[
["quota.window.num", "%s" % self.window_num],
["controller.quota.window.size.seconds", "%s" % self.window_size_seconds]
@@ -66,19 +57,15 @@ class ControllerMutationQuotaTest(Test):
controller_num_nodes_override=1)
def setUp(self):
- if self.zk:
- self.zk.start()
self.kafka.start()
def teardown(self):
# Need to increase the timeout due to partition count
self.kafka.stop()
- if self.zk:
- self.zk.stop()
@cluster(num_nodes=2)
@matrix(metadata_quorum=quorum.all_kraft)
- def test_controller_mutation_quota(self, metadata_quorum=quorum.zk):
+ def test_controller_mutation_quota(self, metadata_quorum):
self.partition_count = 10
mutation_rate = 3 * self.partition_count / (self.window_num * self.window_size_seconds)
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py
index 48555723098..4a1b1e83a9a 100644
--- a/tests/kafkatest/tests/core/delegation_token_test.py
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -18,7 +18,6 @@ from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property, KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.delegation_tokens import DelegationTokens
from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -35,8 +34,7 @@ class DelegationTokenTest(Test):
self.test_context = test_context
self.topic = "topic"
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
+ self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None,
topics={self.topic: {"partitions": 1, "replication-factor": 1}},
server_prop_overrides=[
[config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
@@ -66,11 +64,6 @@ client.id=console-consumer
self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
self.kafka.interbroker_sasl_mechanism = 'GSSAPI'
-
- def setUp(self):
- if self.zk:
- self.zk.start()
-
def tearDown(self):
self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
@@ -114,7 +107,7 @@ client.id=console-consumer
@cluster(num_nodes=5)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_delegation_token_lifecycle(self, metadata_quorum=quorum.zk):
+ def test_delegation_token_lifecycle(self, metadata_quorum):
self.kafka.start()
self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index a4c810116dd..8db6e6d3110 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -23,7 +23,6 @@ from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.monitor.jmx import JmxTool
from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
@@ -37,10 +36,9 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
super(FetchFromFollowerTest, self).__init__(test_context=test_context)
self.jmx_tool = JmxTool(test_context, jmx_poll_ms=100)
self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context,
num_nodes=3,
- zk=self.zk,
+ zk=None,
topics={
self.topic: {
"partitions": 1,
@@ -65,8 +63,6 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
return super(FetchFromFollowerTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
def setUp(self):
- if self.zk:
- self.zk.start()
self.kafka.start()
@cluster(num_nodes=9)
@@ -79,7 +75,7 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_consumer_preferred_read_replica(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
This test starts up brokers with "broker.rack" and "replica.selector.class" configurations set. The replica
selector is set to the rack-aware implementation. One of the brokers has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/get_offset_shell_test.py b/tests/kafkatest/tests/core/get_offset_shell_test.py
index b48185d15d2..c54dc565ff6 100644
--- a/tests/kafkatest/tests/core/get_offset_shell_test.py
+++ b/tests/kafkatest/tests/core/get_offset_shell_test.py
@@ -20,7 +20,6 @@ from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_consumer import ConsoleConsumer
@@ -51,7 +50,6 @@ class GetOffsetShellTest(Test):
"""
def __init__(self, test_context):
super(GetOffsetShellTest, self).__init__(test_context)
- self.num_zk = 1
self.num_brokers = 1
self.messages_received_count = 0
self.topics = {
@@ -64,16 +62,10 @@ class GetOffsetShellTest(Test):
TOPIC_TEST_TOPIC_PARTITIONS2: {'partitions': 2, 'replication-factor': REPLICATION_FACTOR}
}
- self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
-
- def setUp(self):
- if self.zk:
- self.zk.start()
-
def start_kafka(self, security_protocol, interbroker_security_protocol):
self.kafka = KafkaService(
self.test_context, self.num_brokers,
- self.zk, security_protocol=security_protocol,
+ None, security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol,
topics=self.topics)
self.kafka.start()
diff --git a/tests/kafkatest/tests/core/group_mode_transactions_test.py b/tests/kafkatest/tests/core/group_mode_transactions_test.py
index 1ffab0413c9..2db9c62b46b 100644
--- a/tests/kafkatest/tests/core/group_mode_transactions_test.py
+++ b/tests/kafkatest/tests/core/group_mode_transactions_test.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -61,14 +60,9 @@ class GroupModeTransactionsTest(Test):
self.progress_timeout_sec = 60
self.consumer_group = "grouped-transactions-test-consumer-group"
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context,
num_nodes=self.num_brokers,
- zk=self.zk, controller_num_nodes_override=1)
-
- def setUp(self):
- if self.zk:
- self.zk.start()
+ zk=None, controller_num_nodes_override=1)
def seed_messages(self, topic, num_seed_messages):
seed_timeout_sec = 10000
@@ -98,16 +92,11 @@ class GroupModeTransactionsTest(Test):
else:
self.kafka.stop_node(node, clean_shutdown = False)
gracePeriodSecs = 5
- if self.zk:
- wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
- timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
- err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
- else:
- brokerSessionTimeoutSecs = 18
- wait_until(lambda: not self.kafka.pids(node),
- timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
- err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
- time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
+ brokerSessionTimeoutSecs = 18
+ wait_until(lambda: not self.kafka.pids(node),
+ timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+ err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+ time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions()
@@ -271,8 +260,9 @@ class GroupModeTransactionsTest(Test):
@cluster(num_nodes=10)
@matrix(failure_mode=["hard_bounce", "clean_bounce"],
- bounce_target=["brokers", "clients"])
- def test_transactions(self, failure_mode, bounce_target, metadata_quorum=quorum.zk):
+ bounce_target=["brokers", "clients"],
+ metadata_quorum=quorum.all_non_upgrade)
+ def test_transactions(self, failure_mode, bounce_target, metadata_quorum):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index dc0dc261d1c..604ffa01803 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -53,7 +53,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, partition)) == self.replication_factor, timeout_sec=60,
backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
- def perform_version_change(self, from_kafka_version):
+ def upgrade_to_dev_version(self, from_kafka_version, update_metadata_version):
self.logger.info("Performing rolling upgrade.")
for node in self.kafka.controller_quorum.nodes:
self.logger.info("Stopping controller node %s" % node.account.hostname)
@@ -71,8 +71,28 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
self.kafka.start_node(node)
self.wait_until_rejoin()
self.logger.info("Successfully restarted broker node %s" %
node.account.hostname)
- self.logger.info("Changing metadata.version to %s" %
LATEST_STABLE_METADATA_VERSION)
- self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
+ if update_metadata_version:
+ self.logger.info("Changing metadata.version to %s" %
LATEST_STABLE_METADATA_VERSION)
+ self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
+
+ def downgrade_to_version(self, to_kafka_version):
+ self.logger.info("Performing rolling downgrade.")
+ for node in self.kafka.controller_quorum.nodes:
+ self.logger.info("Stopping controller node %s" %
node.account.hostname)
+ self.kafka.controller_quorum.stop_node(node)
+ node.version = KafkaVersion(to_kafka_version)
+ self.logger.info("Restarting controller node %s" %
node.account.hostname)
+ self.kafka.controller_quorum.start_node(node)
+ self.wait_until_rejoin()
+ self.logger.info("Successfully restarted controller node %s" %
node.account.hostname)
+ for node in self.kafka.nodes:
+ self.logger.info("Stopping broker node %s" % node.account.hostname)
+ self.kafka.stop_node(node)
+ node.version = KafkaVersion(to_kafka_version)
+ self.logger.info("Restarting broker node %s" %
node.account.hostname)
+ self.kafka.start_node(node)
+ self.wait_until_rejoin()
+ self.logger.info("Successfully restarted broker node %s" %
node.account.hostname)
def run_upgrade(self, from_kafka_version):
"""Test upgrade of Kafka broker cluster from various versions to the
current version
@@ -102,7 +122,42 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, consumer_timeout_ms=30000, message_validator=is_int,
version=KafkaVersion(from_kafka_version))
- self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
+
+ self.run_produce_consume_validate(core_test_action=lambda: self.upgrade_to_dev_version(from_kafka_version, True))
+ cluster_id = self.kafka.cluster_id()
+ assert cluster_id is not None
+ assert len(cluster_id) == 22
+ assert self.kafka.check_protocol_errors(self)
+
+ def run_upgrade_downgrade(self, starting_kafka_version):
+ """Test upgrade and downgrade of Kafka broker cluster from various
versions to current version and back
+
+ - Start 3 node broker cluster on version 'starting_kafka_version'.
+ - Perform rolling upgrade but do not update metadata.version.
+ - Start producer and consumer in the background.
+ - Perform rolling downgrade.
+ - Finally, validate that every message acked by the producer was
consumed by the consumer.
+ """
+ fromKafkaVersion = KafkaVersion(starting_kafka_version)
+ self.kafka = KafkaService(self.test_context,
+ num_nodes=3,
+ zk=None,
+ version=fromKafkaVersion,
+ topics={self.topic: {"partitions":
self.partitions,
+ "replication-factor":
self.replication_factor,
+ 'configs':
{"min.insync.replicas": 2}}})
+ self.kafka.start()
+ self.producer = VerifiableProducer(self.test_context,
self.num_producers, self.kafka,
+ self.topic,
throughput=self.producer_throughput,
+ message_validator=is_int,
+ compression_types=["none"],
+
version=KafkaVersion(starting_kafka_version))
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
+ self.topic, consumer_timeout_ms=30000,
+ message_validator=is_int,
version=KafkaVersion(starting_kafka_version))
+ self.upgrade_to_dev_version(starting_kafka_version, False)
+
+ self.run_produce_consume_validate(core_test_action=lambda:
self.downgrade_to_version(starting_kafka_version))
cluster_id = self.kafka.cluster_id()
assert cluster_id is not None
assert len(cluster_id) == 22
@@ -120,3 +175,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
self.run_upgrade(from_kafka_version)
+ @cluster(num_nodes=5)
+ @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
+ metadata_quorum=[combined_kraft])
+ def test_combined_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ self.run_upgrade_downgrade(from_kafka_version)
+
+ @cluster(num_nodes=8)
+ @matrix(from_kafka_version=[str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
+ metadata_quorum=[isolated_kraft])
+ def test_isolated_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ self.run_upgrade_downgrade(from_kafka_version)
\ No newline at end of file
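The new upgrade/downgrade coverage composes the two helpers added above. A condensed sketch of that flow follows, assuming the method names from this file; the starting version string is only illustrative.

    # Condensed sketch of the run_upgrade_downgrade flow (helper names from the diff above;
    # "3.9.0" is only an illustrative starting version).
    def run_upgrade_downgrade_sketch(self, starting_kafka_version="3.9.0"):
        # Roll controllers and brokers forward to DEV_BRANCH, but leave metadata.version
        # untouched so the cluster can still be rolled back.
        self.upgrade_to_dev_version(starting_kafka_version, False)
        # Validate produce/consume while every node is rolled back to the starting version.
        self.run_produce_consume_validate(
            core_test_action=lambda: self.downgrade_to_version(starting_kafka_version))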
diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py
index ba8390ceb1c..4bc453c517b 100644
--- a/tests/kafkatest/tests/core/log_dir_failure_test.py
+++ b/tests/kafkatest/tests/core/log_dir_failure_test.py
@@ -17,7 +17,6 @@ from ducktape.utils.util import wait_until
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.kafka import config_property
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
@@ -62,10 +61,9 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
self.topic1 = "test_topic_1"
self.topic2 = "test_topic_2"
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context,
num_nodes=3,
- zk=self.zk,
+ zk=None,
topics={
self.topic1: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 1}},
self.topic2: {"partitions": 1, "replication-factor": 3, "configs": {"min.insync.replicas": 2}}
@@ -83,10 +81,6 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
self.num_producers = 1
self.num_consumers = 1
- def setUp(self):
- if self.zk:
- self.zk.start()
-
def min_cluster_size(self):
"""Override this since we're adding services outside of the
constructor"""
return super(LogDirFailureTest, self).min_cluster_size() +
self.num_producers * 2 + self.num_consumers * 2
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py
index 5ac08d7b3f3..23a0304193c 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -21,14 +21,12 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
class ProduceBenchTest(Test):
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ProduceBenchTest, self).__init__(test_context)
- self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+ self.kafka = KafkaService(test_context, num_nodes=3, zk=None)
self.workload_service = ProduceBenchWorkloadService(test_context, self.kafka)
self.trogdor = TrogdorService(context=self.test_context,
client_services=[self.kafka, self.workload_service])
@@ -37,19 +35,15 @@ class ProduceBenchTest(Test):
def setUp(self):
self.trogdor.start()
- if self.zk:
- self.zk.start()
self.kafka.start()
def teardown(self):
self.trogdor.stop()
self.kafka.stop()
- if self.zk:
- self.zk.stop()
@cluster(num_nodes=8)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_produce_bench(self, metadata_quorum=quorum.zk):
+ def test_produce_bench(self, metadata_quorum):
spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.producer_node,
self.workload_service.bootstrap_servers,
@@ -66,7 +60,8 @@ class ProduceBenchTest(Test):
self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True,
indent=2))
@cluster(num_nodes=8)
- def test_produce_bench_transactions(self, metadata_quorum=quorum.zk):
+ @matrix(metadata_quorum=quorum.all_non_upgrade)
+ def test_produce_bench_transactions(self, metadata_quorum):
spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
self.workload_service.producer_node,
self.workload_service.bootstrap_servers,
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 697088dfa9c..dc7907630be 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -18,7 +18,6 @@ from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum, consumer_group, TopicPartition
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
@@ -35,19 +34,17 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
"""
def __init__(self, test_context):
- self.num_zk = 1
""":type test_context: ducktape.tests.test.TestContext"""
super(ReassignPartitionsTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.num_partitions = 20
- self.zk = ZookeeperService(test_context, self.num_zk) if quorum.for_test(test_context) == quorum.zk else None
# We set the min.insync.replicas to match the replication factor because
# it makes the test more stringent. If min.isr = 2 and
# replication.factor=3, then the test would tolerate the failure of
# reassignment for upto one replica per partition, which is not
# desirable for this test in particular.
- self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk,
+ self.kafka = KafkaService(test_context, num_nodes=4, zk=None,
server_prop_overrides=[
[config_property.LOG_ROLL_TIME_MS, "5000"],
[config_property.LOG_RETENTION_CHECK_INTERVAL_MS, "5000"]
@@ -59,16 +56,12 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
"min.insync.replicas": 3,
}}
},
- controller_num_nodes_override=self.num_zk)
+ controller_num_nodes_override=1)
self.timeout_sec = 60
self.producer_throughput = 1000
self.num_producers = 1
self.num_consumers = 1
- def setUp(self):
- if self.zk:
- self.zk.start()
-
def min_cluster_size(self):
# Override this since we're adding services outside of the constructor
return super(ReassignPartitionsTest, self).min_cluster_size() + self.num_producers + self.num_consumers
@@ -159,7 +152,7 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
)
def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""Reassign partitions tests.
- Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
+ Setup: 1 controller, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
and min.insync.replicas=3
- Produce messages in the background
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py
index d3ddf87bbd8..cd7646a53d7 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -22,7 +22,6 @@ from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorklo
from kafkatest.services.trogdor.task_spec import TaskSpec
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
import time
@@ -31,12 +30,9 @@ class ReplicaScaleTest(Test):
def __init__(self, test_context):
super(ReplicaScaleTest, self).__init__(test_context=test_context)
self.test_context = test_context
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(self.test_context, num_nodes=8, zk=self.zk, controller_num_nodes_override=1)
+ self.kafka = KafkaService(self.test_context, num_nodes=8, zk=None, controller_num_nodes_override=1)
def setUp(self):
- if self.zk:
- self.zk.start()
self.kafka.start()
def teardown(self):
@@ -44,8 +40,6 @@ class ReplicaScaleTest(Test):
for node in self.kafka.nodes:
self.kafka.stop_node(node, clean_shutdown=False, timeout_sec=60)
self.kafka.stop()
- if self.zk:
- self.zk.stop()
@cluster(num_nodes=12)
@matrix(
@@ -64,7 +58,7 @@ class ReplicaScaleTest(Test):
group_protocol=consumer_group.all_group_protocols
)
def test_produce_consume(self, topic_count, partition_count, replication_factor,
- metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ metadata_quorum, use_new_coordinator=False, group_protocol=None):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "replicas_produce_consume_%d" % i
@@ -127,7 +121,7 @@ class ReplicaScaleTest(Test):
use_new_coordinator=[True, False]
)
def test_clean_bounce(self, topic_count, partition_count, replication_factor,
- metadata_quorum=quorum.zk, use_new_coordinator=False):
+ metadata_quorum, use_new_coordinator=False):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py
index cc048fcfdec..6e644ce5a22 100644
--- a/tests/kafkatest/tests/core/replication_replica_failure_test.py
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -46,13 +46,13 @@ class ReplicationReplicaFailureTest(EndToEndTest):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_replication_with_replica_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
It also verifies that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption.
- Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
+ Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=1, replication-factor=3, and min.insync.replicas=2
- Produce messages in the background
- Consume messages in the background
- Partition a follower
@@ -60,10 +60,6 @@ class ReplicationReplicaFailureTest(EndToEndTest):
- Stop producing and finish consuming
- Validate that every acked message was consumed
"""
- self.create_zookeeper_if_necessary()
- if self.zk:
- self.zk.start()
-
self.create_kafka(num_nodes=3,
server_prop_overrides=[["replica.lag.time.max.ms",
"10000"]],
controller_num_nodes_override=1)
@@ -73,13 +69,7 @@ class ReplicationReplicaFailureTest(EndToEndTest):
client_services=[self.kafka])
self.trogdor.start()
- # If ZK is used, the partition leader is put on the controller node
- # to avoid partitioning the controller later on in the test.
- if self.zk:
- controller = self.kafka.controller()
- assignment = [self.kafka.idx(controller)] + [self.kafka.idx(node) for node in self.kafka.nodes if node != controller]
- else:
- assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
+ assignment = [self.kafka.idx(node) for node in self.kafka.nodes]
self.topic = "test_topic"
self.kafka.create_topic({"topic": self.topic,
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index f219744aa93..c1e2e6df3ee 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -122,21 +122,16 @@ class ReplicationTest(EndToEndTest):
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
broker_type=["leader"],
security_protocol=["PLAINTEXT"],
- enable_idempotence=[True])
+ enable_idempotence=[True],
+ metadata_quorum=quorum.all_non_upgrade)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
broker_type=["leader"],
security_protocol=["PLAINTEXT", "SASL_SSL"],
metadata_quorum=quorum.all_non_upgrade)
- @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
- broker_type=["controller"],
- security_protocol=["PLAINTEXT", "SASL_SSL"])
@matrix(failure_mode=["hard_bounce"],
broker_type=["leader"],
security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"],
interbroker_sasl_mechanism=["PLAIN", "GSSAPI"],
metadata_quorum=quorum.all_non_upgrade)
- @parametrize(failure_mode="hard_bounce",
- broker_type="leader",
- security_protocol="SASL_SSL",
client_sasl_mechanism="SCRAM-SHA-256",
interbroker_sasl_mechanism="SCRAM-SHA-512")
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
security_protocol=["PLAINTEXT"], broker_type=["leader"],
compression_type=["gzip"], tls_version=["TLSv1.2", "TLSv1.3"],
metadata_quorum=quorum.all_non_upgrade)
@@ -148,7 +143,7 @@ class ReplicationTest(EndToEndTest):
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios.
- Setup: 1 zk/KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+ Setup: 1 KRaft controller, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
- Produce messages in the background
- Consume messages in the background
@@ -159,9 +154,6 @@ class ReplicationTest(EndToEndTest):
if failure_mode == "controller" and metadata_quorum != quorum.zk:
raise Exception("There is no controller broker when using KRaft
metadata quorum")
- self.create_zookeeper_if_necessary()
- if self.zk:
- self.zk.start()
self.create_kafka(num_nodes=3,
security_protocol=security_protocol,
diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py
index 212c7052400..24dd29f7f31 100644
--- a/tests/kafkatest/tests/core/round_trip_fault_test.py
+++ b/tests/kafkatest/tests/core/round_trip_fault_test.py
@@ -33,12 +33,9 @@ class RoundTripFaultTest(Test):
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(RoundTripFaultTest, self).__init__(test_context)
- self.zk = ZookeeperService(test_context, num_nodes=3) if quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(test_context, num_nodes=4, zk=self.zk)
+ self.kafka = KafkaService(test_context, num_nodes=4, zk=None)
self.workload_service = RoundTripWorkloadService(test_context, self.kafka)
- if quorum.for_test(test_context) == quorum.zk:
- trogdor_client_services = [self.zk, self.kafka, self.workload_service]
- elif quorum.for_test(test_context) == quorum.isolated_kraft:
+ if quorum.for_test(test_context) == quorum.isolated_kraft:
trogdor_client_services = [self.kafka.controller_quorum, self.kafka, self.workload_service]
else: #co-located case, which we currently don't test but handle here for completeness in case we do test it
trogdor_client_services = [self.kafka, self.workload_service]
@@ -56,34 +53,28 @@ class RoundTripFaultTest(Test):
active_topics=active_topics)
def setUp(self):
- if self.zk:
- self.zk.start()
self.kafka.start()
self.trogdor.start()
def teardown(self):
self.trogdor.stop()
self.kafka.stop()
- if self.zk:
- self.zk.stop()
def remote_quorum_nodes(self):
- if quorum.for_test(self.test_context) == quorum.zk:
- return self.zk.nodes
- elif quorum.for_test(self.test_context) == quorum.isolated_kraft:
+ if quorum.for_test(self.test_context) == quorum.isolated_kraft:
return self.kafka.controller_quorum.nodes
else: # co-located case, which we currently don't test but handle here for completeness in case we do test it
return []
@cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_round_trip_workload(self, metadata_quorum=quorum.zk):
+ def test_round_trip_workload(self, metadata_quorum):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
workload1.wait_for_done(timeout_sec=600)
@cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk):
+ def test_round_trip_workload_with_broker_partition(self, metadata_quorum):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
time.sleep(2)
part1 = [self.kafka.nodes[0]]
@@ -97,7 +88,7 @@ class RoundTripFaultTest(Test):
@cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk):
+ def test_produce_consume_with_broker_pause(self, metadata_quorum):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
time.sleep(2)
stop1_spec = ProcessStopFaultSpec(0, TaskSpec.MAX_DURATION_MS, [self.kafka.nodes[0]],
@@ -110,7 +101,7 @@ class RoundTripFaultTest(Test):
@cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk):
+ def test_produce_consume_with_client_partition(self, metadata_quorum):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
time.sleep(2)
part1 = [self.workload_service.nodes[0]]
@@ -123,7 +114,7 @@ class RoundTripFaultTest(Test):
@cluster(num_nodes=9)
@matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk):
+ def test_produce_consume_with_latency(self, metadata_quorum):
workload1 = self.trogdor.create_task("workload1", self.round_trip_spec)
time.sleep(2)
spec = DegradedNetworkFaultSpec(0, 60000)
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index a6e7ca53cb9..96b2ba55194 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -13,13 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils import is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from ducktape.mark import matrix
+from ducktape.mark import parametrize
from ducktape.mark.resource import cluster
from kafkatest.services.security.kafka_acls import ACLs
import time
@@ -39,12 +39,11 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.producer_throughput = 100
self.num_producers = 1
self.num_consumers = 1
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
- 'configs': {"min.insync.replicas": 2}}})
- self.zk.start()
+ 'configs': {"min.insync.replicas": 2}}},
+ controller_num_nodes_override=1)
def create_producer_and_consumer(self):
self.producer = VerifiableProducer(
@@ -69,7 +68,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
def add_sasl_mechanism(self, new_client_sasl_mechanism):
self.kafka.client_sasl_mechanism = new_client_sasl_mechanism
- self.kafka.start_minikdc_if_necessary()
self.bounce()
def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
@@ -99,10 +97,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.bounce()
@cluster(num_nodes=8)
- @matrix(client_protocol=[SecurityConfig.SSL])
+ @matrix(client_protocol=[SecurityConfig.SSL], metadata_quorum=[quorum.isolated_kraft])
@cluster(num_nodes=9)
- @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL])
- def test_rolling_upgrade_phase_one(self, client_protocol):
+ @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL], metadata_quorum=[quorum.isolated_kraft])
+ def test_rolling_upgrade_phase_one(self, client_protocol, metadata_quorum):
"""
Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
@@ -123,8 +121,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=9)
- @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
- def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism):
+ @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN], metadata_quorum=[quorum.isolated_kraft])
+ def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism, metadata_quorum):
"""
Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce
and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism.
@@ -147,7 +145,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.run_produce_consume_validate(lambda: time.sleep(1))
@cluster(num_nodes=9)
- def test_enable_separate_interbroker_listener(self):
+ @parametrize(metadata_quorum=quorum.isolated_kraft)
+ def test_enable_separate_interbroker_listener(self, metadata_quorum):
"""
Start with a cluster that has a single PLAINTEXT listener.
Start producing/consuming on PLAINTEXT port.
@@ -164,7 +163,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
SecurityConfig.SASL_MECHANISM_PLAIN)
@cluster(num_nodes=9)
- def test_disable_separate_interbroker_listener(self):
+ @parametrize(metadata_quorum=quorum.isolated_kraft)
+ def test_disable_separate_interbroker_listener(self, metadata_quorum):
"""
Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker).
Start producer and consumer on SSL listener.
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 33f3c72d751..a92b1ad6b3e 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -84,7 +84,7 @@ class SecurityTest(EndToEndTest):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Test that invalid hostname in certificate results in connection failures.
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
@@ -99,10 +99,6 @@ class SecurityTest(EndToEndTest):
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
valid_hostname=True)
- self.create_zookeeper_if_necessary()
- if self.zk:
- self.zk.start()
-
self.create_kafka(security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol)
if self.kafka.quorum_info.using_kraft and interbroker_security_protocol == 'SSL':
@@ -165,26 +161,17 @@ class SecurityTest(EndToEndTest):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
"""
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
"""
- # Start ZooKeeper/KRaft Controller with valid hostnames in the certs' SANs
+ # Start KRaft Controller with valid hostnames in the certs' SANs
# so that we can start Kafka
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
valid_hostname=True)
- self.create_zookeeper_if_necessary(num_nodes=1,
- zk_client_port = False,
- zk_client_secure_port = True,
- zk_tls_encrypt_only = True,
- )
- if self.zk:
- self.zk.start()
-
self.create_kafka(num_nodes=1,
interbroker_security_protocol='SSL', # also sets the broker-to-kraft-controller security protocol for the KRaft case
- zk_client_secure=True, # ignored if we aren't using ZooKeeper
)
self.kafka.start()
@@ -194,10 +181,7 @@ class SecurityTest(EndToEndTest):
self.kafka.stop_node(self.kafka.nodes[0])
SecurityConfig.ssl_stores.valid_hostname = False
- if quorum.for_test(self.test_context) == quorum.zk:
- self.kafka.zk.restart_cluster()
- else:
- self.kafka.isolated_controller_quorum.restart_cluster()
+ self.kafka.isolated_controller_quorum.restart_cluster()
try:
self.kafka.start_node(self.kafka.nodes[0], timeout_sec=30)
diff --git a/tests/kafkatest/tests/core/throttling_test.py b/tests/kafkatest/tests/core/throttling_test.py
index 1a0c1c6ec09..e53dd46dbb6 100644
--- a/tests/kafkatest/tests/core/throttling_test.py
+++ b/tests/kafkatest/tests/core/throttling_test.py
@@ -20,8 +20,7 @@ from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.performance import ProducerPerformanceService
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -46,7 +45,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
super(ThrottlingTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
- self.zk = ZookeeperService(test_context, num_nodes=1)
# Because we are starting the producer/consumer/validate cycle _after_
# seeding the cluster with big data (to test throttling), we need to
# Start the consumer from the end of the stream. further, we need to
@@ -58,7 +56,7 @@ class ThrottlingTest(ProduceConsumeValidateTest):
self.num_partitions = 3
self.kafka = KafkaService(test_context,
num_nodes=self.num_brokers,
- zk=self.zk,
+ zk=None,
topics={
self.topic: {
"partitions": self.num_partitions,
@@ -67,7 +65,8 @@ class ThrottlingTest(ProduceConsumeValidateTest):
"segment.bytes": 64 * 1024 * 1024
}
}
- })
+ },
+ controller_num_nodes_override=1)
self.producer_throughput = 1000
self.timeout_sec = 400
self.num_records = 2000
@@ -78,9 +77,6 @@ class ThrottlingTest(ProduceConsumeValidateTest):
self.num_consumers = 1
self.throttle = 4 * 1024 * 1024 # 4 MB/s
- def setUp(self):
- self.zk.start()
-
def min_cluster_size(self):
# Override this since we're adding services outside of the constructor
return super(ThrottlingTest, self).min_cluster_size() +\
@@ -139,9 +135,9 @@ class ThrottlingTest(ProduceConsumeValidateTest):
time_taken))
@cluster(num_nodes=10)
- @parametrize(bounce_brokers=True)
- @parametrize(bounce_brokers=False)
- def test_throttled_reassignment(self, bounce_brokers):
+ @parametrize(bounce_brokers=True, metadata_quorum=quorum.isolated_kraft)
+ @parametrize(bounce_brokers=False, metadata_quorum=quorum.isolated_kraft)
+ def test_throttled_reassignment(self, bounce_brokers, metadata_quorum):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 11cd1c24ae3..5fbd987a97c 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
@@ -59,16 +58,11 @@ class TransactionsTest(Test):
self.progress_timeout_sec = 60
self.consumer_group = "transactions-test-consumer-group"
- self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
self.kafka = KafkaService(test_context,
num_nodes=self.num_brokers,
- zk=self.zk,
+ zk=None,
controller_num_nodes_override=1)
- def setUp(self):
- if self.zk:
- self.zk.start()
-
def seed_messages(self, topic, num_seed_messages):
seed_timeout_sec = 10000
seed_producer = VerifiableProducer(context=self.test_context,
@@ -96,16 +90,11 @@ class TransactionsTest(Test):
else:
self.kafka.stop_node(node, clean_shutdown = False)
gracePeriodSecs = 5
- if self.zk:
- wait_until(lambda: not self.kafka.pids(node) and not self.kafka.is_registered(node),
- timeout_sec=self.kafka.zk_session_timeout + gracePeriodSecs,
- err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(node.account))
- else:
- brokerSessionTimeoutSecs = 18
- wait_until(lambda: not self.kafka.pids(node),
- timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
- err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
- time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
+ brokerSessionTimeoutSecs = 18
+ wait_until(lambda: not self.kafka.pids(node),
+ timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
+ err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
+ time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)
self.kafka.start_node(node)
self.kafka.await_no_under_replicated_partitions()
@@ -234,7 +223,7 @@ class TransactionsTest(Test):
use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/end_to_end.py b/tests/kafkatest/tests/end_to_end.py
index 3b9cf9f8c3c..ed2ecca87fd 100644
--- a/tests/kafkatest/tests/end_to_end.py
+++ b/tests/kafkatest/tests/end_to_end.py
@@ -36,6 +36,7 @@ class EndToEndTest(Test):
def __init__(self, test_context, topic="test_topic",
topic_config=DEFAULT_TOPIC_CONFIG):
super(EndToEndTest, self).__init__(test_context=test_context)
+ self.zk = None
self.topic = topic
self.topic_config = topic_config
self.records_consumed = []