This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14ea1cf61a1 KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)
14ea1cf61a1 is described below

commit 14ea1cf61a10a8310a8a364cec0bbfbe67dbe51f
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Tue Apr 29 18:11:46 2025 +0200

    KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)
    
    Enable KIP-1071 in the next system test.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../tests/streams/streams_broker_bounce_test.py    | 36 ++++++++++++----------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index dc61e5fa37c..e9cd1315cf5 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -149,10 +149,10 @@ class StreamsBrokerBounceTest(Test):
         return True
 
         
-    def setup_system(self, start_processor=True, num_threads=3):
+    def setup_system(self, start_processor=True, num_threads=3, group_protocol='classic'):
         # Setup phase
-
-        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics)
+        use_streams_groups = True if group_protocol == 'streams' else False
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics, use_streams_groups=use_streams_groups)
         self.kafka.start()
 
         # allow some time for topics to be created
@@ -162,7 +162,7 @@ class StreamsBrokerBounceTest(Test):
 
         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", group_protocol=group_protocol, num_threads = num_threads)
 
         self.driver.start()
 
@@ -207,15 +207,16 @@ class StreamsBrokerBounceTest(Test):
             broker_type=["leader"],
             num_threads=[1, 3],
             sleep_time_secs=[120],
-            metadata_quorum=[quorum.combined_kraft])
-    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum):
+            metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum, group_protocol):
         """
         Start a smoke test client, then kill one particular broker and ensure data is still received
         Record if records are delivered.
         We also add a single thread stream client to make sure we could get all partitions reassigned in
         next generation so to verify the partition lost is correctly triggered.
         """
-        self.setup_system(num_threads=num_threads)
+        self.setup_system(num_threads=num_threads, group_protocol=group_protocol)
 
         # Sleep to allow test to run for a bit
         time.sleep(sleep_time_secs)
@@ -230,14 +231,15 @@ class StreamsBrokerBounceTest(Test):
     @matrix(failure_mode=["clean_shutdown"],
             broker_type=["controller"],
             sleep_time_secs=[0],
-            metadata_quorum=[quorum.combined_kraft])
-    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum):
+            metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum, group_protocol):
         """
         Start a smoke test client, then kill one particular broker immediately before streams stats
         Streams should throw an exception since it cannot create topics with the desired
         replication factor of 3
         """
-        self.setup_system(start_processor=False)
+        self.setup_system(start_processor=False, group_protocol=group_protocol)
 
         # Sleep to allow test to run for a bit
         time.sleep(sleep_time_secs)
@@ -252,13 +254,14 @@ class StreamsBrokerBounceTest(Test):
     @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
             num_failures=[2],
-            metadata_quorum=[quorum.isolated_kraft])
-    def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum):
+            metadata_quorum=[quorum.isolated_kraft],
+            group_protocol=["classic", "streams"])
+    def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received
         Record if records are delivered
         """
-        self.setup_system() 
+        self.setup_system(group_protocol=group_protocol)
 
         # Sleep to allow test to run for a bit
         time.sleep(120)
@@ -271,8 +274,9 @@ class StreamsBrokerBounceTest(Test):
     @cluster(num_nodes=10)
     @matrix(failure_mode=["clean_bounce", "hard_bounce"],
             num_failures=[3],
-            metadata_quorum=[quorum.isolated_kraft])
-    def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum):
+            metadata_quorum=[quorum.isolated_kraft],
+            group_protocol=["classic", "streams"])
+    def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol):
         """
         Start a smoke test client, then kill a few brokers and ensure data is still received
         Record if records are delivered
@@ -284,7 +288,7 @@ class StreamsBrokerBounceTest(Test):
         self.topics['__consumer_offsets'] = { 'partitions': 50, 'replication-factor': self.replication,
                                               'configs': {"min.insync.replicas": 1} }
 
-        self.setup_system()
+        self.setup_system(group_protocol=group_protocol)
 
         # Sleep to allow test to run for a bit
         time.sleep(120)

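Note on the change above: the diff threads a new group_protocol test-matrix dimension through setup_system, enabling KIP-1071 streams groups on the brokers when the value is "streams" and passing the chosen protocol on to the Streams client. Below is a minimal, self-contained sketch of that wiring pattern; the class names and keyword arguments are illustrative stand-ins, not the actual kafkatest API.

    # Minimal sketch of the group_protocol -> use_streams_groups wiring shown in
    # the diff above. FakeKafkaService and FakeJobRunner are hypothetical stand-ins
    # for the kafkatest service classes; only the mapping logic mirrors the change.

    class FakeKafkaService:
        def __init__(self, use_streams_groups=False):
            # KIP-1071 streams groups must be enabled on the broker side.
            self.use_streams_groups = use_streams_groups

    class FakeJobRunner:
        def __init__(self, group_protocol="classic"):
            # The Streams client picks the rebalance protocol: "classic" or "streams".
            self.group_protocol = group_protocol

    def setup_system(group_protocol="classic"):
        use_streams_groups = group_protocol == "streams"
        kafka = FakeKafkaService(use_streams_groups=use_streams_groups)
        processor = FakeJobRunner(group_protocol=group_protocol)
        return kafka, processor

    if __name__ == "__main__":
        # The test matrix runs each bounce test once per protocol.
        for protocol in ("classic", "streams"):
            kafka, processor = setup_system(protocol)
            print(protocol, kafka.use_streams_groups, processor.group_protocol)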