This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36c131ef4a2 KAFKA-17609:[1/4] Changes needed to convert system tests
to use KRaft and remove ZK (#17275)
36c131ef4a2 is described below
commit 36c131ef4a22c9e917c43c4661005614c4c6f4b7
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 5 11:23:33 2024 -0500
KAFKA-17609:[1/4] Changes needed to convert system tests to use KRaft and
remove ZK (#17275)
This is part one of a multi-PR effort to convert Kafka Streams system tests
to KRaft. I decided to break down the changes into multiple PRs to reduce the
review load.
Reviewers: Matthias Sax <[email protected]>
---
tests/kafkatest/services/streams.py | 15 ++--------
tests/kafkatest/tests/streams/base_streams_test.py | 4 +--
.../streams/streams_application_upgrade_test.py | 16 ++++-------
.../tests/streams/streams_broker_bounce_test.py | 32 ++++++++--------------
.../streams/streams_broker_down_resilience_test.py | 8 +++---
5 files changed, 26 insertions(+), 49 deletions(-)
diff --git a/tests/kafkatest/services/streams.py
b/tests/kafkatest/services/streams.py
index 696e9f58f7f..3848fea686d 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -22,7 +22,6 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import KafkaConfig
from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
STATE_DIR = "state.dir"
@@ -627,11 +626,6 @@ class
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def start_cmd(self, node):
args = self.args.copy()
-
- if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or
self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
- args['zk'] = self.kafka.zk.connect_setting()
- else:
- args['zk'] = ""
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
@@ -642,7 +636,7 @@ class
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
cmd = "( export
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
- " %(kafka_run_class)s %(streams_class_name)s %(zk)s
%(config_file)s " \
+ " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s"
% args
self.logger.info("Executing: " + cmd)
@@ -732,11 +726,6 @@ class
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
def start_cmd(self, node):
args = self.args.copy()
-
- if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or
self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
- args['zk'] = self.kafka.zk.connect_setting()
- else:
- args['zk'] = ""
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
@@ -747,7 +736,7 @@ class
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
cmd = "( export
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
- " %(kafka_run_class)s %(streams_class_name)s %(zk)s
%(config_file)s " \
+ " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s"
% args
self.logger.info("Executing: " + cmd)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py
b/tests/kafkatest/tests/streams/base_streams_test.py
index a7c7a7b10d7..f0cd4ba89e6 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -27,8 +27,8 @@ class BaseStreamsTest(KafkaTest):
Extends KafkaTest which manages setting up Kafka Cluster and Zookeeper
see tests/kafkatest/tests/kafka_test.py for more info
"""
- def __init__(self, test_context, topics, num_zk=1, num_brokers=3):
- super(BaseStreamsTest, self).__init__(test_context, num_zk,
num_brokers, topics)
+ def __init__(self, test_context, topics, num_controllers=1,
num_brokers=3):
+ super(BaseStreamsTest, self).__init__(test_context, num_controllers,
num_brokers, topics)
def get_consumer(self, client_id, topic, num_messages):
return VerifiableConsumer(self.test_context,
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index f728470d863..0ad84edd509 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -18,9 +18,8 @@ from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService,
StreamsSmokeTestJobRunnerService
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5,
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_VERSION, KafkaVersion
@@ -56,9 +55,9 @@ class StreamsUpgradeTest(Test):
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)
- @cluster(num_nodes=6)
- @matrix(from_version=smoke_test_versions, bounce_type=["full"])
- def test_app_upgrade(self, from_version, bounce_type):
+ @cluster(num_nodes=9)
+ @matrix(from_version=smoke_test_versions, bounce_type=["full"],
metadata_quorum=[quorum.combined_kraft])
+ def test_app_upgrade(self, from_version, bounce_type, metadata_quorum):
"""
Starts 3 KafkaStreams instances with <old_version>, and upgrades
one-by-one to <new_version>
"""
@@ -68,10 +67,7 @@ class StreamsUpgradeTest(Test):
if from_version == to_version:
return
- self.zk = ZookeeperService(self.test_context, num_nodes=1)
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics={
+ self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None,
topics={
'echo' : { 'partitions': 5, 'replication-factor': 1 },
'data' : { 'partitions': 5, 'replication-factor': 1 },
'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -86,7 +82,7 @@ class StreamsUpgradeTest(Test):
'avg' : { 'partitions': 5, 'replication-factor': 1 },
'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
'tagg' : { 'partitions': 5, 'replication-factor': 1 }
- })
+ }, controller_num_nodes_override=1)
self.kafka.start()
self.driver = StreamsSmokeTestDriverService(self.test_context,
self.kafka)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index c00b7ac3e14..dc61e5fa37c 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -19,7 +19,6 @@ from ducktape.mark.resource import cluster
from ducktape.mark import matrix
from ducktape.mark import ignore
from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService,
StreamsSmokeTestJobRunnerService
import time
import signal
@@ -152,16 +151,8 @@ class StreamsBrokerBounceTest(Test):
def setup_system(self, start_processor=True, num_threads=3):
# Setup phase
- self.zk = (
- ZookeeperService(self.test_context, 1)
- if quorum.for_test(self.test_context) == quorum.zk
- else None
- )
- if self.zk:
- self.zk.start()
-
- self.kafka = KafkaService(self.test_context,
num_nodes=self.replication, zk=self.zk, topics=self.topics,
- controller_num_nodes_override=1)
+
+ self.kafka = KafkaService(self.test_context,
num_nodes=self.replication, zk=None, topics=self.topics)
self.kafka.start()
# allow some time for topics to be created
@@ -216,7 +207,7 @@ class StreamsBrokerBounceTest(Test):
broker_type=["leader"],
num_threads=[1, 3],
sleep_time_secs=[120],
- metadata_quorum=[quorum.isolated_kraft])
+ metadata_quorum=[quorum.combined_kraft])
def test_broker_type_bounce(self, failure_mode, broker_type,
sleep_time_secs, num_threads, metadata_quorum):
"""
Start a smoke test client, then kill one particular broker and ensure
data is still received
@@ -238,8 +229,9 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown"],
broker_type=["controller"],
- sleep_time_secs=[0])
- def test_broker_type_bounce_at_start(self, failure_mode, broker_type,
sleep_time_secs):
+ sleep_time_secs=[0],
+ metadata_quorum=[quorum.combined_kraft])
+ def test_broker_type_bounce_at_start(self, failure_mode, broker_type,
sleep_time_secs, metadata_quorum):
"""
Start a smoke test client, then kill one particular broker immediately
before streams stats
Streams should throw an exception since it cannot create topics with
the desired
@@ -257,11 +249,11 @@ class StreamsBrokerBounceTest(Test):
return self.collect_results(sleep_time_secs)
- @cluster(num_nodes=7)
+ @cluster(num_nodes=10)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
num_failures=[2],
- metadata_quorum=quorum.all_non_upgrade)
- def test_many_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum=quorum.zk):
+ metadata_quorum=[quorum.isolated_kraft])
+ def test_many_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum):
"""
Start a smoke test client, then kill a few brokers and ensure data is
still received
Record if records are delivered
@@ -276,11 +268,11 @@ class StreamsBrokerBounceTest(Test):
return self.collect_results(120)
- @cluster(num_nodes=7)
+ @cluster(num_nodes=10)
@matrix(failure_mode=["clean_bounce", "hard_bounce"],
num_failures=[3],
- metadata_quorum=quorum.all_non_upgrade)
- def test_all_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum=quorum.zk):
+ metadata_quorum=[quorum.isolated_kraft])
+ def test_all_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum):
"""
Start a smoke test client, then kill a few brokers and ensure data is
still received
Record if records are delivered
diff --git
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 9eeeea3c98e..d0ecd083017 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,7 +45,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.zk.start()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False])
+ @matrix(metadata_quorum=[quorum.combined_kraft],
use_new_coordinator=[True, False])
def test_streams_resilient_to_broker_down(self, metadata_quorum,
use_new_coordinator=False):
self.kafka.start()
@@ -82,7 +82,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False])
+ @matrix(metadata_quorum=[quorum.combined_kraft],
use_new_coordinator=[True, False])
def test_streams_runs_with_broker_down_initially(self, metadata_quorum,
use_new_coordinator=False):
self.kafka.start()
node = self.kafka.leader(self.inputTopic)
@@ -150,7 +150,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False])
+ @matrix(metadata_quorum=[quorum.combined_kraft],
use_new_coordinator=[True, False])
def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum,
use_new_coordinator=False):
self.kafka.start()
@@ -229,7 +229,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True, False])
+ @matrix(metadata_quorum=[quorum.combined_kraft],
use_new_coordinator=[True, False])
def test_streams_should_failover_while_brokers_down(self, metadata_quorum,
use_new_coordinator=False):
self.kafka.start()