This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e95e91a0623 KAFKA-16275: Update kraft_upgrade_test.py to support 
KIP-848’s group protocol config (#15626)
e95e91a0623 is described below

commit e95e91a0623b3f84438e4ebc8e77d1e37979ef62
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Wed Apr 3 10:12:51 2024 -0700

    KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group 
protocol config (#15626)
    
    Added a new optional group_protocol parameter to the test methods, then 
passed that down to the methods involved.
    
    Unfortunately, because the new consumer can only be used with the new 
coordinator, this required a new @matrix block instead of adding the 
group_protocol=["classic", "consumer"] to the existing blocks 😢
    
    Reviewers: Walker Carlson <wcarl...@apache.org>
---
 tests/kafkatest/tests/core/kraft_upgrade_test.py | 29 ++++++++++++++++--------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py 
b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index 3f3c4a81b10..2d271b9e061 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -17,12 +17,12 @@ from ducktape.mark import parametrize, matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, consumer_group
 from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
-from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, 
LATEST_3_5, \
+from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, 
LATEST_3_5, LATEST_3_7, \
     DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
 
 #
@@ -74,7 +74,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
         self.logger.info("Changing metadata.version to %s" % 
LATEST_STABLE_METADATA_VERSION)
         self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
 
-    def run_upgrade(self, from_kafka_version):
+    def run_upgrade(self, from_kafka_version, group_protocol):
         """Test upgrade of Kafka broker cluster from various versions to the 
current version
 
         from_kafka_version is a Kafka version to upgrade from.
@@ -101,7 +101,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
                                            
version=KafkaVersion(from_kafka_version))
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
                                         self.topic, new_consumer=True, 
consumer_timeout_ms=30000,
-                                        message_validator=is_int, 
version=KafkaVersion(from_kafka_version))
+                                        message_validator=is_int, 
version=KafkaVersion(from_kafka_version),
+                                        
consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
         self.run_produce_consume_validate(core_test_action=lambda: 
self.perform_version_change(from_kafka_version))
         cluster_id = self.kafka.cluster_id()
         assert cluster_id is not None
@@ -112,13 +113,21 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
     @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], 
             use_new_coordinator=[True, False], 
             metadata_quorum=[combined_kraft])
-    def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False):
-        self.run_upgrade(from_kafka_version)
+    @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
+            use_new_coordinator=[True],
+            metadata_quorum=[combined_kraft],
+            group_protocol=consumer_group.all_group_protocols)
+    def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False, group_protocol=None):
+        self.run_upgrade(from_kafka_version, group_protocol)
 
     @cluster(num_nodes=8)
-    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], 
-            use_new_coordinator=[True, False], 
+    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
+            use_new_coordinator=[True, False],
             metadata_quorum=[isolated_kraft])
-    def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False):
-        self.run_upgrade(from_kafka_version)
+    @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
+            use_new_coordinator=[True],
+            metadata_quorum=[isolated_kraft],
+            group_protocol=consumer_group.all_group_protocols)
+    def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False, group_protocol=None):
+        self.run_upgrade(from_kafka_version, group_protocol)
 

Reply via email to