This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 403ee54cc2c KAFKA-18277 Convert network_degrade_test to Kraft mode (#18247)
403ee54cc2c is described below

commit 403ee54cc2c85392e70e32cb3a829f7dcfc22da9
Author: TaiJuWu <[email protected]>
AuthorDate: Fri Jan 10 01:49:13 2025 +0800

    KAFKA-18277 Convert network_degrade_test to Kraft mode (#18247)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 tests/kafkatest/tests/core/network_degrade_test.py | 49 +++++++++++-----------
 1 file changed, 25 insertions(+), 24 deletions(-)

diff --git a/tests/kafkatest/tests/core/network_degrade_test.py b/tests/kafkatest/tests/core/network_degrade_test.py
index 68cce856528..1c55d9b7e06 100644
--- a/tests/kafkatest/tests/core/network_degrade_test.py
+++ b/tests/kafkatest/tests/core/network_degrade_test.py
@@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
 
 from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
 from kafkatest.services.trogdor.trogdor import TrogdorService
-from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService, quorum
 
 
 class NetworkDegradeTest(Test):
@@ -34,36 +34,37 @@ class NetworkDegradeTest(Test):
 
     def __init__(self, test_context):
         super(NetworkDegradeTest, self).__init__(test_context)
-        self.zk = ZookeeperService(test_context, num_nodes=3)
-        self.trogdor = TrogdorService(context=self.test_context, client_services=[self.zk])
+        self.kafka = KafkaService(test_context, num_nodes=2, zk=None, controller_num_nodes_override=2)
+        self.trogdor = TrogdorService(context=self.test_context, client_services=[self.kafka.controller_quorum])
 
     def setUp(self):
-        self.zk.start()
+        self.kafka.start()
         self.trogdor.start()
 
     def teardown(self):
         self.trogdor.stop()
-        self.zk.stop()
+        self.kafka.stop()
 
-    @cluster(num_nodes=5)
-    @parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0)
-    @parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000)
-    def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit):
+    @cluster(num_nodes=3)
+    @parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0, metadata_quorum=quorum.combined_kraft)
+    @parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000, metadata_quorum=quorum.combined_kraft)
+    def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit, metadata_quorum=quorum.combined_kraft):
         spec = DegradedNetworkFaultSpec(0, 10000)
-        for node in self.zk.nodes:
+        for node in self.kafka.controller_quorum.nodes:
             spec.add_node_spec(node.name, device_name, latency_ms, rate_limit_kbit)
 
         latency = self.trogdor.create_task(task_name, spec)
 
-        zk0 = self.zk.nodes[0]
-        zk1 = self.zk.nodes[1]
+        quorum0 = self.kafka.controller_quorum.nodes[0]
+        quorum1 = self.kafka.controller_quorum.nodes[1]
+        
 
         # Capture the ping times from the ping stdout
         # 64 bytes from ducker01 (172.24.0.2): icmp_seq=1 ttl=64 time=0.325 ms
         r = re.compile(r".*time=(?P<time>[\d.]+)\sms.*")
 
         times = []
-        for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % zk1.account.hostname):
+        for line in quorum0.account.ssh_capture("ping -i 1 -c 20 %s" % quorum1.account.hostname):
             self.logger.debug("Ping output: %s" % line)
             m = r.match(line)
             if m is not None and m.group("time"):
@@ -86,15 +87,15 @@ class NetworkDegradeTest(Test):
         assert len(slow_times) > 5, "Expected to see more slow ping times (lower than %d)" % low_time_ms
         assert len(fast_times) > 5, "Expected to see more fast ping times (higher than %d)" % high_time_ms
 
-    @cluster(num_nodes=5)
-    @parametrize(task_name="rate-1000", device_name="eth0", latency_ms=0, rate_limit_kbit=1000000)
-    @parametrize(task_name="rate-1000-latency-50", device_name="eth0", latency_ms=50, rate_limit_kbit=1000000)
-    def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit):
-        zk0 = self.zk.nodes[0]
-        zk1 = self.zk.nodes[1]
+    @cluster(num_nodes=3)
+    @parametrize(task_name="rate-1000", device_name="eth0", latency_ms=0, rate_limit_kbit=1000000, metadata_quorum=quorum.combined_kraft)
+    @parametrize(task_name="rate-1000-latency-50", device_name="eth0", latency_ms=50, rate_limit_kbit=1000000, metadata_quorum=quorum.combined_kraft)
+    def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit, metadata_quorum=quorum.combined_kraft):
+        quorum0 = self.kafka.controller_quorum.nodes[0]
+        quorum1 = self.kafka.controller_quorum.nodes[1]
 
         spec = DegradedNetworkFaultSpec(0, 60000)
-        spec.add_node_spec(zk0.name, device_name, latency_ms, rate_limit_kbit)
+        spec.add_node_spec(quorum0.name, device_name, latency_ms, rate_limit_kbit)
 
         # start the task and wait
         rate_limit = self.trogdor.create_task(task_name, spec)
@@ -102,8 +103,8 @@ class NetworkDegradeTest(Test):
                    timeout_sec=10,
                    err_msg="%s failed to start within 10 seconds." % rate_limit)
 
-        # Run iperf server on zk1, iperf client on zk0
-        iperf_server = zk1.account.ssh_capture("iperf -s")
+        # Run iperf server on quorum1, iperf client on quorum0
+        iperf_server = quorum1.account.ssh_capture("iperf -s")
 
         # Wait until iperf server is listening before starting the client
         for line in iperf_server:
@@ -117,7 +118,7 @@ class NetworkDegradeTest(Test):
         r = re.compile(r"^.*\s(?P<rate>[\d.]+)\sKbits/sec$")
 
         measured_rates = []
-        for line in zk0.account.ssh_capture("iperf -i 1 -t 20 -f k -c %s" % zk1.account.hostname):
+        for line in quorum0.account.ssh_capture("iperf -i 1 -t 20 -f k -c %s" % quorum1.account.hostname):
             self.logger.info("iperf output %s" % line)
             m = r.match(line)
             if m is not None:
@@ -126,7 +127,7 @@ class NetworkDegradeTest(Test):
                 self.logger.info("Parsed rate of %d kbit/s from iperf" % measured_rate)
 
         # kill iperf server and consume the stdout to ensure clean exit
-        zk1.account.kill_process("iperf")
+        quorum1.account.kill_process("iperf")
         for _ in iperf_server:
             continue
 

Reply via email to