This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 14ea1cf61a1 KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)
14ea1cf61a1 is described below

commit 14ea1cf61a10a8310a8a364cec0bbfbe67dbe51f
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Tue Apr 29 18:11:46 2025 +0200

    KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)

    Enable KIP-1071 in the next system test.

    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../tests/streams/streams_broker_bounce_test.py | 36 ++++++++++++----------
 1 file changed, 20 insertions(+), 16 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index dc61e5fa37c..e9cd1315cf5 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -149,10 +149,10 @@ class StreamsBrokerBounceTest(Test):

         return True

-    def setup_system(self, start_processor=True, num_threads=3):
+    def setup_system(self, start_processor=True, num_threads=3, group_protocol='classic'):
         # Setup phase
-
-        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics)
+        use_streams_groups = True if group_protocol == 'streams' else False
+        self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics, use_streams_groups=use_streams_groups)
         self.kafka.start()

         # allow some time for topics to be created
@@ -162,7 +162,7 @@ class StreamsBrokerBounceTest(Test):

         # Start test harness
         self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)
+        self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", group_protocol=group_protocol, num_threads = num_threads)

         self.driver.start()

@@ -207,15 +207,16 
@@ class StreamsBrokerBounceTest(Test): broker_type=["leader"], num_threads=[1, 3], sleep_time_secs=[120], - metadata_quorum=[quorum.combined_kraft]) - def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum): + metadata_quorum=[quorum.combined_kraft], + group_protocol=["classic", "streams"]) + def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum, group_protocol): """ Start a smoke test client, then kill one particular broker and ensure data is still received Record if records are delivered. We also add a single thread stream client to make sure we could get all partitions reassigned in next generation so to verify the partition lost is correctly triggered. """ - self.setup_system(num_threads=num_threads) + self.setup_system(num_threads=num_threads, group_protocol=group_protocol) # Sleep to allow test to run for a bit time.sleep(sleep_time_secs) @@ -230,14 +231,15 @@ class StreamsBrokerBounceTest(Test): @matrix(failure_mode=["clean_shutdown"], broker_type=["controller"], sleep_time_secs=[0], - metadata_quorum=[quorum.combined_kraft]) - def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum): + metadata_quorum=[quorum.combined_kraft], + group_protocol=["classic", "streams"]) + def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum, group_protocol): """ Start a smoke test client, then kill one particular broker immediately before streams stats Streams should throw an exception since it cannot create topics with the desired replication factor of 3 """ - self.setup_system(start_processor=False) + self.setup_system(start_processor=False, group_protocol=group_protocol) # Sleep to allow test to run for a bit time.sleep(sleep_time_secs) @@ -252,13 +254,14 @@ class StreamsBrokerBounceTest(Test): @cluster(num_nodes=10) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", 
"clean_bounce", "hard_bounce"], num_failures=[2], - metadata_quorum=[quorum.isolated_kraft]) - def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum): + metadata_quorum=[quorum.isolated_kraft], + group_protocol=["classic", "streams"]) + def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol): """ Start a smoke test client, then kill a few brokers and ensure data is still received Record if records are delivered """ - self.setup_system() + self.setup_system(group_protocol=group_protocol) # Sleep to allow test to run for a bit time.sleep(120) @@ -271,8 +274,9 @@ class StreamsBrokerBounceTest(Test): @cluster(num_nodes=10) @matrix(failure_mode=["clean_bounce", "hard_bounce"], num_failures=[3], - metadata_quorum=[quorum.isolated_kraft]) - def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum): + metadata_quorum=[quorum.isolated_kraft], + group_protocol=["classic", "streams"]) + def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol): """ Start a smoke test client, then kill a few brokers and ensure data is still received Record if records are delivered @@ -284,7 +288,7 @@ class StreamsBrokerBounceTest(Test): self.topics['__consumer_offsets'] = { 'partitions': 50, 'replication-factor': self.replication, 'configs': {"min.insync.replicas": 1} } - self.setup_system() + self.setup_system(group_protocol=group_protocol) # Sleep to allow test to run for a bit time.sleep(120)