This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36c131ef4a2 KAFKA-17609:[1/4] Changes needed to convert system tests 
to use KRaft and remove ZK  (#17275)
36c131ef4a2 is described below

commit 36c131ef4a22c9e917c43c4661005614c4c6f4b7
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 5 11:23:33 2024 -0500

    KAFKA-17609:[1/4] Changes needed to convert system tests to use KRaft and 
remove ZK  (#17275)
    
    This is part one of a multi-PR effort to convert Kafka Streams system tests 
to KRaft. I decided to break down the changes into multiple PRs to reduce the 
review load.
    
    Reviewers: Matthias Sax <[email protected]>
---
 tests/kafkatest/services/streams.py                | 15 ++--------
 tests/kafkatest/tests/streams/base_streams_test.py |  4 +--
 .../streams/streams_application_upgrade_test.py    | 16 ++++-------
 .../tests/streams/streams_broker_bounce_test.py    | 32 ++++++++--------------
 .../streams/streams_broker_down_resilience_test.py |  8 +++---
 5 files changed, 26 insertions(+), 49 deletions(-)

diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index 696e9f58f7f..3848fea686d 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -22,7 +22,6 @@ from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
 
 STATE_DIR = "state.dir"
 
@@ -627,11 +626,6 @@ class 
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
 
     def start_cmd(self, node):
         args = self.args.copy()
-
-        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or 
self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
-            args['zk'] = self.kafka.zk.connect_setting()
-        else:
-            args['zk'] = ""
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -642,7 +636,7 @@ class 
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
 
         cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true 
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
-              " %(kafka_run_class)s %(streams_class_name)s %(zk)s 
%(config_file)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" 
% args
 
         self.logger.info("Executing: " + cmd)
@@ -732,11 +726,6 @@ class 
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
 
     def start_cmd(self, node):
         args = self.args.copy()
-
-        if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or 
self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
-            args['zk'] = self.kafka.zk.connect_setting()
-        else:
-            args['zk'] = ""
         args['config_file'] = self.CONFIG_FILE
         args['stdout'] = self.STDOUT_FILE
         args['stderr'] = self.STDERR_FILE
@@ -747,7 +736,7 @@ class 
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
 
         cmd = "( export 
KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true 
UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
-              " %(kafka_run_class)s %(streams_class_name)s %(zk)s 
%(config_file)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(config_file)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" 
% args
 
         self.logger.info("Executing: " + cmd)
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py 
b/tests/kafkatest/tests/streams/base_streams_test.py
index a7c7a7b10d7..f0cd4ba89e6 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -27,8 +27,8 @@ class BaseStreamsTest(KafkaTest):
     Extends KafkaTest which manages setting up Kafka Cluster and Zookeeper
     see tests/kafkatest/tests/kafka_test.py for more info
     """
-    def __init__(self, test_context,  topics, num_zk=1, num_brokers=3):
-        super(BaseStreamsTest, self).__init__(test_context, num_zk, 
num_brokers, topics)
+    def __init__(self, test_context,  topics, num_controllers=1, 
num_brokers=3):
+        super(BaseStreamsTest, self).__init__(test_context, num_controllers, 
num_brokers, topics)
 
     def get_consumer(self, client_id, topic, num_messages):
         return VerifiableConsumer(self.test_context,
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index f728470d863..0ad84edd509 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -18,9 +18,8 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
 from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
   LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, 
LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_VERSION, KafkaVersion
 
@@ -56,9 +55,9 @@ class StreamsUpgradeTest(Test):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-    @cluster(num_nodes=6)
-    @matrix(from_version=smoke_test_versions, bounce_type=["full"])
-    def test_app_upgrade(self, from_version, bounce_type):
+    @cluster(num_nodes=9)
+    @matrix(from_version=smoke_test_versions, bounce_type=["full"], 
metadata_quorum=[quorum.combined_kraft])
+    def test_app_upgrade(self, from_version, bounce_type, metadata_quorum):
         """
         Starts 3 KafkaStreams instances with <old_version>, and upgrades 
one-by-one to <new_version>
         """
@@ -68,10 +67,7 @@ class StreamsUpgradeTest(Test):
         if from_version == to_version:
             return
 
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, 
topics={
+        self.kafka = KafkaService(self.test_context, num_nodes=3, zk=None, 
topics={
             'echo' : { 'partitions': 5, 'replication-factor': 1 },
             'data' : { 'partitions': 5, 'replication-factor': 1 },
             'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -86,7 +82,7 @@ class StreamsUpgradeTest(Test):
             'avg' : { 'partitions': 5, 'replication-factor': 1 },
             'wcnt' : { 'partitions': 5, 'replication-factor': 1 },
             'tagg' : { 'partitions': 5, 'replication-factor': 1 }
-        })
+        }, controller_num_nodes_override=1)
         self.kafka.start()
 
         self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py 
b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index c00b7ac3e14..dc61e5fa37c 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -19,7 +19,6 @@ from ducktape.mark.resource import cluster
 from ducktape.mark import matrix
 from ducktape.mark import ignore
 from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
 import time
 import signal
@@ -152,16 +151,8 @@ class StreamsBrokerBounceTest(Test):
         
     def setup_system(self, start_processor=True, num_threads=3):
         # Setup phase
-        self.zk = (
-            ZookeeperService(self.test_context, 1)
-            if quorum.for_test(self.test_context) == quorum.zk
-            else None
-        )
-        if self.zk:
-            self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, 
num_nodes=self.replication, zk=self.zk, topics=self.topics,
-                                  controller_num_nodes_override=1)
+
+        self.kafka = KafkaService(self.test_context, 
num_nodes=self.replication, zk=None, topics=self.topics)
         self.kafka.start()
 
         # allow some time for topics to be created
@@ -216,7 +207,7 @@ class StreamsBrokerBounceTest(Test):
             broker_type=["leader"],
             num_threads=[1, 3],
             sleep_time_secs=[120],
-            metadata_quorum=[quorum.isolated_kraft])
+            metadata_quorum=[quorum.combined_kraft])
     def test_broker_type_bounce(self, failure_mode, broker_type, 
sleep_time_secs, num_threads, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker and ensure 
data is still received
@@ -238,8 +229,9 @@ class StreamsBrokerBounceTest(Test):
     @cluster(num_nodes=7)
     @matrix(failure_mode=["clean_shutdown"],
             broker_type=["controller"],
-            sleep_time_secs=[0])
-    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, 
sleep_time_secs):
+            sleep_time_secs=[0],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, 
sleep_time_secs, metadata_quorum):
         """
         Start a smoke test client, then kill one particular broker immediately 
before streams stats
         Streams should throw an exception since it cannot create topics with 
the desired
@@ -257,11 +249,11 @@ class StreamsBrokerBounceTest(Test):
 
         return self.collect_results(sleep_time_secs)
 
-    @cluster(num_nodes=7)
+    @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
             num_failures=[2],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_many_brokers_bounce(self, failure_mode, num_failures, 
metadata_quorum=quorum.zk):
+            metadata_quorum=[quorum.isolated_kraft])
+    def test_many_brokers_bounce(self, failure_mode, num_failures, 
metadata_quorum):
         """
         Start a smoke test client, then kill a few brokers and ensure data is 
still received
         Record if records are delivered
@@ -276,11 +268,11 @@ class StreamsBrokerBounceTest(Test):
 
         return self.collect_results(120)
 
-    @cluster(num_nodes=7)
+    @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_bounce", "hard_bounce"],
             num_failures=[3],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_all_brokers_bounce(self, failure_mode, num_failures, 
metadata_quorum=quorum.zk):
+            metadata_quorum=[quorum.isolated_kraft])
+    def test_all_brokers_bounce(self, failure_mode, num_failures, 
metadata_quorum):
         """
         Start a smoke test client, then kill a few brokers and ensure data is 
still received
         Record if records are delivered
diff --git 
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py 
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 9eeeea3c98e..d0ecd083017 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,7 +45,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             self.zk.start()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], 
use_new_coordinator=[True, False])
     def test_streams_resilient_to_broker_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 
@@ -82,7 +82,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], 
use_new_coordinator=[True, False])
     def test_streams_runs_with_broker_down_initially(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
@@ -150,7 +150,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], 
use_new_coordinator=[True, False])
     def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 
@@ -229,7 +229,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    @matrix(metadata_quorum=[quorum.combined_kraft], 
use_new_coordinator=[True, False])
     def test_streams_should_failover_while_brokers_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 

Reply via email to