This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 150b0e8290c KAFKA-15578: Migrating other system tests to use the group 
coordinator (#14582)
150b0e8290c is described below

commit 150b0e8290cda57df668ba89f6b422719866de5a
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Nov 22 01:52:30 2023 -0800

    KAFKA-15578: Migrating other system tests to use the group coordinator 
(#14582)
    
    This patch converts a few more system tests to use the new group 
coordinator. The new coordinator is only exercised on KRaft clusters; ZooKeeper runs keep use_new_coordinator=False.
    
    Reviewers: David Jacot <[email protected]>
---
 .../tests/client/consumer_rolling_upgrade_test.py  |  11 +-
 tests/kafkatest/tests/client/consumer_test.py      | 133 ++++++++++++++---
 .../tests/connect/connect_distributed_test.py      | 161 ++++++++++++++++++---
 .../tests/core/consumer_group_command_test.py      |  28 +++-
 .../tests/core/fetch_from_follower_test.py         |  11 +-
 tests/kafkatest/tests/core/kraft_upgrade_test.py   |  24 ++-
 .../tests/core/reassign_partitions_test.py         |  17 ++-
 tests/kafkatest/tests/core/replica_scale_test.py   |  36 ++++-
 .../tests/core/replication_replica_failure_test.py |  11 +-
 tests/kafkatest/tests/core/security_test.py        |  39 ++++-
 tests/kafkatest/tests/core/snapshot_test.py        |  14 +-
 tests/kafkatest/tests/core/transactions_test.py    |  22 ++-
 .../streams/streams_broker_down_resilience_test.py |  16 +-
 .../tests/streams/streams_standby_replica_test.py  |   4 +-
 .../streams/streams_static_membership_test.py      |   4 +-
 15 files changed, 429 insertions(+), 102 deletions(-)

diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py 
b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 5beacf23c63..9805d3656ee 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -48,8 +48,15 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
             "Mismatched assignment: %s" % assignment
 
     @cluster(num_nodes=4)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def rolling_update_test(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def rolling_update_test(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         Verify rolling updates of partition assignment strategies works 
correctly. In this
         test, we use a rolling restart to change the group's assignment 
strategy from "range" 
diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index 3b535c97251..6dc1fb897b0 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -75,8 +75,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
         return consumer
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         Verify correct consumer behavior when the brokers are consecutively 
restarted.
 
@@ -126,8 +133,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
             (consumer.total_consumed(), consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.zk):
+    @matrix(
+        clean_shutdown=[True],
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean_shutdown=[True],
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]   
+    )
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Verify correct consumer behavior when the consumers in the group are 
consecutively restarted.
 
@@ -169,8 +187,23 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.current_position(partition), 
consumer.total_consumed())
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], static_membership=[True, False], 
bounce_mode=["all", "rolling"], num_bounces=[5], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_static_consumer_bounce(self, clean_shutdown, static_membership, 
bounce_mode, num_bounces, metadata_quorum=quorum.zk):
+    @matrix(
+        clean_shutdown=[True],
+        static_membership=[True, False],
+        bounce_mode=["all", "rolling"],
+        num_bounces=[5],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean_shutdown=[True],
+        static_membership=[True, False],
+        bounce_mode=["all", "rolling"],
+        num_bounces=[5],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_static_consumer_bounce(self, clean_shutdown, static_membership, 
bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Verify correct static consumer behavior when the consumers in the 
group are restarted. In order to make
         sure the behavior of static members are different from dynamic ones, 
we take both static and dynamic
@@ -231,8 +264,17 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.current_position(partition), 
consumer.total_consumed())
 
     @cluster(num_nodes=7)
-    @matrix(bounce_mode=["all", "rolling"], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, 
metadata_quorum=quorum.zk):
+    @matrix(
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Verify that the updated member.id(updated_member_id) caused by static 
member rejoin would be persisted. If not,
         after the brokers rolling bounce, the migrated group coordinator would 
load the stale persisted member.id and
@@ -262,8 +304,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
         self.rolling_bounce_brokers(consumer, num_bounces=1)
 
     @cluster(num_nodes=10)
-    @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_fencing_static_consumer(self, num_conflict_consumers, 
fencing_stage, metadata_quorum=quorum.zk):
+    @matrix(
+        num_conflict_consumers=[1, 2],
+        fencing_stage=["stable", "all"],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        num_conflict_consumers=[1, 2],
+        fencing_stage=["stable", "all"],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_fencing_static_consumer(self, num_conflict_consumers, 
fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Verify correct static consumer behavior when there are conflicting 
consumers with same group.instance.id.
 
@@ -315,8 +368,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
                        )
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True], enable_autocommit=[True, False], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk):
+    @matrix(
+        clean_shutdown=[True],
+        enable_autocommit=[True, False],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean_shutdown=[True],
+        enable_autocommit=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit)
@@ -362,8 +426,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.last_commit(partition), 
consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk):
+    @matrix(
+        clean_shutdown=[True, False],
+        enable_autocommit=[True, False],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean_shutdown=[True, False],
+        enable_autocommit=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit)
@@ -399,8 +474,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 (consumer.last_commit(partition), 
consumer.current_position(partition))
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_group_consumption(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_group_consumption(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         Verifies correct group rebalance behavior as consumers are started and 
stopped.
         In particular, this test verifies that the partition is readable after 
every
@@ -450,10 +532,21 @@ class AssignmentValidationTest(VerifiableConsumerTest):
         })
 
     @cluster(num_nodes=6)
-    
@matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-                                 
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                                 
"org.apache.kafka.clients.consumer.StickyAssignor"], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_valid_assignment(self, assignment_strategy, 
metadata_quorum=quorum.zk):
+    @matrix(
+        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                             
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
+                             
"org.apache.kafka.clients.consumer.StickyAssignor"], 
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+                             
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
+                             
"org.apache.kafka.clients.consumer.StickyAssignor"], 
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_valid_assignment(self, assignment_strategy, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Verify assignment strategy correctness: each partition is assigned to 
exactly
         one consumer instance.
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py 
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 6b2394fe659..145f29f51eb 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -171,8 +171,19 @@ class ConnectDistributedTest(Test):
         return self._task_has_state(task_id, status, 'RUNNING')
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_restart_failed_connector(self, exactly_once_source, 
connect_protocol, metadata_quorum):
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_restart_failed_connector(self, exactly_once_source, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source 
else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -194,8 +205,19 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING 
state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'exactly-once source', 'sink'], 
connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_restart_failed_task(self, connector_type, connect_protocol, 
metadata_quorum):
+    @matrix(
+        connector_type=['source', 'exactly-once source', 'sink'],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        connector_type=['source', 'exactly-once source', 'sink'],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_restart_failed_task(self, connector_type, connect_protocol, 
metadata_quorum, use_new_coordinator=False):
         self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 
'exactly-once source' else 'disabled'
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
@@ -220,8 +242,17 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING 
state")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_restart_connector_and_tasks_failed_connector(self, 
connect_protocol, metadata_quorum):
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_restart_connector_and_tasks_failed_connector(self, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
@@ -239,8 +270,19 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see connector transition to the RUNNING 
state")
 
     @cluster(num_nodes=5)
-    @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned', 
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_restart_connector_and_tasks_failed_task(self, connector_type, 
connect_protocol, metadata_quorum):
+    @matrix(
+        connector_type=['source', 'sink'],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        connector_type=['source', 'sink'],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_restart_connector_and_tasks_failed_task(self, connector_type, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services()
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
@@ -264,8 +306,19 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to see task transition to the RUNNING 
state")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_pause_and_resume_source(self, exactly_once_source, 
connect_protocol, metadata_quorum):
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_pause_and_resume_source(self, exactly_once_source, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
         """
         Verify that source connectors stop producing records when paused and 
begin again after
         being resumed.
@@ -306,8 +359,17 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to produce messages after resuming source 
connector")
 
     @cluster(num_nodes=5)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum):
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
         """
         Verify that sink connectors stop consuming records when paused and 
begin again after
         being resumed.
@@ -354,8 +416,19 @@ class ConnectDistributedTest(Test):
                    err_msg="Failed to consume messages after resuming sink 
connector")
 
     @cluster(num_nodes=5)
-    @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned', 
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_pause_state_persistent(self, exactly_once_source, 
connect_protocol, metadata_quorum):
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_pause_state_persistent(self, exactly_once_source, 
connect_protocol, metadata_quorum, use_new_coordinator=False):
         """
         Verify that paused state is preserved after a cluster restart.
         """
@@ -560,8 +633,21 @@ class ConnectDistributedTest(Test):
         )
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=[SecurityConfig.PLAINTEXT, 
SecurityConfig.SASL_SSL], exactly_once_source=[True, False], 
connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_file_source_and_sink(self, security_protocol, 
exactly_once_source, connect_protocol, metadata_quorum):
+    @matrix(
+        security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+        exactly_once_source=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+        exactly_once_source=[True, False], 
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_file_source_and_sink(self, security_protocol, 
exactly_once_source, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
         """
         Tests that a basic file connector works across clean rolling bounces. 
This validates that the connector is
         correctly created, tasks instantiated, and as nodes restart the work 
is rebalanced across nodes.
@@ -594,8 +680,19 @@ class ConnectDistributedTest(Test):
         wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST + 
self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never 
converged to the same state as the input file")
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 
'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_bounce(self, clean, connect_protocol, metadata_quorum):
+    @matrix(
+        clean=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_bounce(self, clean, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
         """
         Validates that source and sink tasks that run continuously and produce 
a predictable sequence of messages
         run correctly and deliver messages exactly once when Kafka Connect 
workers undergo clean rolling bounces,
@@ -714,8 +811,19 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 
'eager'], metadata_quorum=quorum.all_non_upgrade)
-    def test_exactly_once_source(self, clean, connect_protocol, 
metadata_quorum):
+    @matrix(
+        clean=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        clean=[True, False],
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_exactly_once_source(self, clean, connect_protocol, 
metadata_quorum, use_new_coordinator=False):
         """
         Validates that source tasks run correctly and deliver messages exactly 
once
         when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -811,8 +919,17 @@ class ConnectDistributedTest(Test):
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
     @cluster(num_nodes=6)
-    @matrix(connect_protocol=['sessioned', 'compatible', 'eager'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_transformations(self, connect_protocol, metadata_quorum):
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        connect_protocol=['sessioned', 'compatible', 'eager'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_transformations(self, connect_protocol, metadata_quorum, 
use_new_coordinator=False):
         self.CONNECT_PROTOCOL = connect_protocol
         self.setup_services(timestamp_type='CreateTime', 
include_filestream_connectors=True)
         self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py 
b/tests/kafkatest/tests/core/consumer_group_command_test.py
index f81eec8de8c..23406770548 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -82,7 +82,7 @@ class ConsumerGroupCommandTest(Test):
 
         if group:
             wait_until(lambda: 
re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group,
 node=kafka_node, command_config=command_config_file)), timeout_sec=10,
-                       err_msg="Timed out waiting to list expected consumer 
groups.")
+                       err_msg="Timed out waiting to describe expected 
consumer groups.")
         else:
             wait_until(lambda: "test-consumer-group" in 
self.kafka.list_consumer_groups(node=kafka_node, 
command_config=command_config_file), timeout_sec=10,
                        err_msg="Timed out waiting to list expected consumer 
groups.")
@@ -90,8 +90,17 @@ class ConsumerGroupCommandTest(Test):
         self.consumer.stop()
 
     @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_list_consumer_groups(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.zk):
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_list_consumer_groups(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Tests if ConsumerGroupCommand is listing correct consumer groups
         :return: None
@@ -99,8 +108,17 @@ class ConsumerGroupCommandTest(Test):
         self.setup_and_verify(security_protocol)
 
     @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_describe_consumer_group(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.zk):
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=['PLAINTEXT', 'SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_describe_consumer_group(self, security_protocol='PLAINTEXT', 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Tests if ConsumerGroupCommand is describing a consumer group correctly
         :return: None
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py 
b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index f720de11464..96f3d16cdd1 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -70,8 +70,15 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
         self.kafka.start()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         This test starts up brokers with "broker.rack" and 
"replica.selector.class" configurations set. The replica
         selector is set to the rack-aware implementation. One of the brokers 
has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py 
b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index 221e43f6baf..f6a29186956 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from kafkatest.services.console_consumer import ConsoleConsumer
@@ -109,22 +109,16 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
         assert self.kafka.check_protocol_errors(self)
 
     @cluster(num_nodes=5)
-    @parametrize(from_kafka_version=str(LATEST_3_1), 
metadata_quorum=combined_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_2), 
metadata_quorum=combined_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_3), 
metadata_quorum=combined_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_4), 
metadata_quorum=combined_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_5), 
metadata_quorum=combined_kraft)
-    @parametrize(from_kafka_version=str(DEV_BRANCH), 
metadata_quorum=combined_kraft)
-    def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum):
+    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], 
+            use_new_coordinator=[True, False], 
+            metadata_quorum=[combined_kraft])
+    def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False):
         self.run_upgrade(from_kafka_version)
 
     @cluster(num_nodes=8)
-    @parametrize(from_kafka_version=str(LATEST_3_1), 
metadata_quorum=isolated_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_2), 
metadata_quorum=isolated_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_3), 
metadata_quorum=isolated_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_4), 
metadata_quorum=isolated_kraft)
-    @parametrize(from_kafka_version=str(LATEST_3_5), 
metadata_quorum=isolated_kraft)
-    @parametrize(from_kafka_version=str(DEV_BRANCH), 
metadata_quorum=isolated_kraft)
-    def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum):
+    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], 
+            use_new_coordinator=[True, False], 
+            metadata_quorum=[isolated_kraft])
+    def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, 
use_new_coordinator=False):
         self.run_upgrade(from_kafka_version)
 
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py 
b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 82b11b15289..0e02c3535e7 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -132,10 +132,19 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
         time.sleep(6)
 
     @cluster(num_nodes=8)
-    @matrix(bounce_brokers=[True, False],
-            reassign_from_offset_zero=[True, False],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_reassign_partitions(self, bounce_brokers, 
reassign_from_offset_zero, metadata_quorum):
+    @matrix(
+        bounce_brokers=[True, False],
+        reassign_from_offset_zero=[True, False],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        bounce_brokers=[True, False],
+        reassign_from_offset_zero=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_reassign_partitions(self, bounce_brokers, 
reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False):
         """Reassign partitions tests.
         Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20, 
replication-factor=3,
         and min.insync.replicas=3
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py 
b/tests/kafkatest/tests/core/replica_scale_test.py
index f8d0c8379bd..22555d202d3 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -48,8 +48,22 @@ class ReplicaScaleTest(Test):
             self.zk.stop()
 
     @cluster(num_nodes=12)
-    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_produce_consume(self, topic_count, partition_count, 
replication_factor, metadata_quorum=quorum.zk):
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_produce_consume(self, topic_count, partition_count, 
replication_factor, 
+                             metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "replicas_produce_consume_%d" % i
@@ -103,8 +117,22 @@ class ReplicaScaleTest(Test):
         trogdor.stop()
 
     @cluster(num_nodes=12)
-    @matrix(topic_count=[50], partition_count=[34], replication_factor=[3], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_clean_bounce(self, topic_count, partition_count, 
replication_factor, metadata_quorum=quorum.zk):
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        topic_count=[50],
+        partition_count=[34],
+        replication_factor=[3],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_clean_bounce(self, topic_count, partition_count, 
replication_factor,
+                          metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         topics_create_start_time = time.time()
         for i in range(topic_count):
             topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py 
b/tests/kafkatest/tests/core/replication_replica_failure_test.py
index 42e1c0922f7..66cd1ecac1d 100644
--- a/tests/kafkatest/tests/core/replication_replica_failure_test.py
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -37,8 +37,15 @@ class ReplicationReplicaFailureTest(EndToEndTest):
         super(ReplicationReplicaFailureTest, 
self).__init__(test_context=test_context, topic=None)
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=quorum.all_non_upgrade)
-    def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         This test verifies that replication shrinks the ISR when a replica is 
not fetching anymore.
         It also verifies that replication provides simple durability 
guarantees by checking that data acked by
diff --git a/tests/kafkatest/tests/core/security_test.py 
b/tests/kafkatest/tests/core/security_test.py
index cee3a3740ba..8b7c05de8ba 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -58,9 +58,31 @@ class SecurityTest(EndToEndTest):
         return True
 
     @cluster(num_nodes=6)
-    @matrix(security_protocol=['PLAINTEXT'], 
interbroker_security_protocol=['SSL'], metadata_quorum=quorum.all_non_upgrade)
-    @matrix(security_protocol=['SSL'], 
interbroker_security_protocol=['PLAINTEXT'], 
metadata_quorum=quorum.all_non_upgrade)
-    def test_client_ssl_endpoint_validation_failure(self, security_protocol, 
interbroker_security_protocol, metadata_quorum=quorum.zk):
+    @matrix(
+        security_protocol=['PLAINTEXT'],
+        interbroker_security_protocol=['SSL'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=['PLAINTEXT'],
+        interbroker_security_protocol=['SSL'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    @matrix(
+        security_protocol=['SSL'],
+        interbroker_security_protocol=['PLAINTEXT'],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        security_protocol=['SSL'],
+        interbroker_security_protocol=['PLAINTEXT'],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_client_ssl_endpoint_validation_failure(self, security_protocol, 
interbroker_security_protocol, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
         """
         Test that invalid hostname in certificate results in connection 
failures.
         When security_protocol=SSL, client SSL handshakes are expected to fail 
due to hostname verification failure.
@@ -132,8 +154,15 @@ class SecurityTest(EndToEndTest):
         self.consumer.start()
 
     @cluster(num_nodes=2)
-    @matrix(metadata_quorum=[quorum.zk, quorum.isolated_kraft])
-    def test_quorum_ssl_endpoint_validation_failure(self, 
metadata_quorum=quorum.zk):
+    @matrix(
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True, False]
+    )
+    def test_quorum_ssl_endpoint_validation_failure(self, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
         """
         Test that invalid hostname in ZooKeeper or KRaft Controller results in 
broker inability to start.
         """
diff --git a/tests/kafkatest/tests/core/snapshot_test.py 
b/tests/kafkatest/tests/core/snapshot_test.py
index ff0cff1feae..5368b48e5c1 100644
--- a/tests/kafkatest/tests/core/snapshot_test.py
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -144,8 +144,11 @@ class TestSnapshots(ProduceConsumeValidateTest):
         self.validate()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=quorum.all_kraft)
-    def test_broker(self, metadata_quorum=quorum.combined_kraft):
+    @matrix(
+        metadata_quorum=quorum.all_kraft,
+        use_new_coordinator=[True, False]
+    )
+    def test_broker(self, metadata_quorum=quorum.combined_kraft, 
use_new_coordinator=False):
         """ Test the ability of a broker to consume metadata snapshots
         and to recover the cluster metadata state using them
 
@@ -204,8 +207,11 @@ class TestSnapshots(ProduceConsumeValidateTest):
         self.validate_success(broker_topic)
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=quorum.all_kraft)
-    def test_controller(self, metadata_quorum=quorum.combined_kraft):
+    @matrix(
+        metadata_quorum=quorum.all_kraft,
+        use_new_coordinator=[True, False]
+    )
+    def test_controller(self, metadata_quorum=quorum.combined_kraft, 
use_new_coordinator=False):
         """ Test the ability of controllers to consume metadata snapshots
         and to recover the cluster metadata state using them
 
diff --git a/tests/kafkatest/tests/core/transactions_test.py 
b/tests/kafkatest/tests/core/transactions_test.py
index 51ee971ef8f..b9b39f355e4 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -243,11 +243,23 @@ class TransactionsTest(Test):
         }
 
     @cluster(num_nodes=9)
-    @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"],
-            check_order=[True, False],
-            use_group_metadata=[True, False])
-    def test_transactions(self, failure_mode, bounce_target, check_order, 
use_group_metadata, metadata_quorum=quorum.all):
+    @matrix(
+        failure_mode=["hard_bounce", "clean_bounce"],
+        bounce_target=["brokers", "clients"],
+        check_order=[True, False],
+        use_group_metadata=[True, False],
+        metadata_quorum=[quorum.zk],
+        use_new_coordinator=[False]
+    )
+    @matrix(
+        failure_mode=["hard_bounce", "clean_bounce"],
+        bounce_target=["brokers", "clients"],
+        check_order=[True, False],
+        use_group_metadata=[True, False],
+        metadata_quorum=quorum.all_kraft,
+        use_new_coordinator=[True, False]
+    )
+    def test_transactions(self, failure_mode, bounce_target, check_order, 
use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
diff --git 
a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py 
b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 92e01b87c75..5057c3d2793 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,8 +45,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
             self.zk.start()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_streams_resilient_to_broker_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def test_streams_resilient_to_broker_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 
         # Broker should be down over 2x of retries * timeout ms
@@ -82,8 +82,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def test_streams_runs_with_broker_down_initially(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
         self.kafka.stop_node(node)
@@ -150,8 +150,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
@@ -229,8 +229,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()
 
     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def test_streams_should_failover_while_brokers_down(self, metadata_quorum, 
use_new_coordinator=False):
         self.kafka.start()
 
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py 
b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 382bbb7aaf3..78df8d3ebc3 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -47,8 +47,8 @@ class StreamsStandbyTask(BaseStreamsTest):
                                                  })
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_standby_tasks_rebalance(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def test_standby_tasks_rebalance(self, metadata_quorum, 
use_new_coordinator=False):
         # TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
         configs = self.get_configs(
             
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
 % (
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py 
b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 3bed575fa2b..2249f6cc345 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -55,8 +55,8 @@ class StreamsStaticMembershipTest(Test):
                                            acks=1)
 
     @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def 
test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, 
metadata_quorum):
+    @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
+    def 
test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, 
metadata_quorum, use_new_coordinator=False):
         if self.zookeeper:
             self.zookeeper.start()
         self.kafka.start()


Reply via email to