This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 804bf596a08 KAFKA-18329; [1/3] Delete old group coordinator (KIP-848) (#19243)
804bf596a08 is described below
commit 804bf596a0841521d2a351dfa6643cbf85529978
Author: David Jacot <[email protected]>
AuthorDate: Thu Mar 20 17:20:19 2025 +0100
KAFKA-18329; [1/3] Delete old group coordinator (KIP-848) (#19243)
This patch is the first in a series of patches to remove the old group
coordinator. With the release of Apache Kafka 4.0, the so-called new
group coordinator is now the default and the only available option.
This patch updates the system tests so that they no longer run with the
old group coordinator. It also removes the ability to use the old group
coordinator.
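For illustration, a minimal before/after sketch of the decorator change
applied throughout this patch (the test name is hypothetical; @matrix,
quorum, and consumer_group are the kafkatest helpers touched below):

    # Before: each test ran twice, once per coordinator implementation.
    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[False])
    @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True],
            group_protocol=consumer_group.all_group_protocols)
    def test_example(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
        ...

    # After: a single matrix; the KIP-848 coordinator is always enabled.
    @matrix(metadata_quorum=[quorum.isolated_kraft],
            group_protocol=consumer_group.all_group_protocols)
    def test_example(self, metadata_quorum, group_protocol=None):
        ...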
Reviewers: Lianet Magrans <[email protected]>
---
tests/kafkatest/services/kafka/config_property.py | 4 -
tests/kafkatest/services/kafka/kafka.py | 15 ----
.../tests/client/consumer_rolling_upgrade_test.py | 5 +-
tests/kafkatest/tests/client/consumer_test.py | 81 +++------------------
tests/kafkatest/tests/client/truncation_test.py | 4 +-
.../tests/connect/connect_distributed_test.py | 85 ++++------------------
tests/kafkatest/tests/core/consume_bench_test.py | 46 ++----------
.../tests/core/consumer_group_command_test.py | 16 +---
.../tests/core/fetch_from_follower_test.py | 7 +-
.../tests/core/reassign_partitions_test.py | 9 +--
tests/kafkatest/tests/core/replica_scale_test.py | 15 +---
.../tests/core/replication_replica_failure_test.py | 7 +-
tests/kafkatest/tests/core/security_test.py | 23 +-----
tests/kafkatest/tests/core/snapshot_test.py | 14 +---
.../tests/core/transactions_mixed_versions_test.py | 3 +-
tests/kafkatest/tests/core/transactions_test.py | 12 +--
.../tests/core/transactions_upgrade_test.py | 3 +-
tests/kafkatest/tests/core/upgrade_test.py | 8 +-
.../streams/streams_broker_down_resilience_test.py | 16 ++--
.../tests/streams/streams_standby_replica_test.py | 4 +-
.../streams/streams_static_membership_test.py | 4 +-
21 files changed, 64 insertions(+), 317 deletions(-)
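In practice, test code now constructs the service without the removed
keyword. A hedged sketch (argument values are placeholders, not taken
from the patch):

    from kafkatest.services.kafka import KafkaService

    # The use_new_coordinator keyword was removed from KafkaService, so
    # the service is constructed without it; passing it now raises a
    # TypeError for an unexpected keyword argument.
    kafka = KafkaService(test_context, num_nodes=3, zk=None)
    kafka.start()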
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 295c235bd2e..049af5a10a9 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -74,7 +74,6 @@
DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
-NEW_GROUP_COORDINATOR_ENABLE="group.coordinator.new.enable"
GROUP_COORDINATOR_REBALANCE_PROTOCOLS="group.coordinator.rebalance.protocols"
CONSUMER_GROUP_MIGRATION_POLICY = "group.consumer.migration.policy"
@@ -209,9 +208,6 @@ From KafkaConfig.scala
val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
val SSLSecureRandomImplementationProp = SSLConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
-
- /** New group coordinator configs */
- val NewGroupCoordinatorEnableProp = "group.coordinator.new.enable"
"""
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 88f24b14424..a55844548ba 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -203,7 +203,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
quorum_info_provider=None,
- use_new_coordinator=None,
consumer_group_migration_policy=None,
dynamicRaftQuorum=False,
use_transactions_v2=False,
@@ -268,7 +267,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param int controller_num_nodes_override: the number of controller nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
- :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
:param consumer_group_migration_policy: The config that enables converting the non-empty classic group using the consumer embedded protocol to the non-empty consumer group using the consumer group protocol and vice versa.
:param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
:param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
@@ -286,15 +284,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.isolated_controller_quorum = None # will define below if necessary
self.dynamicRaftQuorum = False
- # Set use_new_coordinator based on context and arguments.
- # If not specified, the default config is used.
- if use_new_coordinator is None:
- arg_name = 'use_new_coordinator'
- if context.injected_args is not None:
- use_new_coordinator = context.injected_args.get(arg_name)
- if use_new_coordinator is None:
- use_new_coordinator = context.globals.get(arg_name)
-
# Set use_share_groups based on context and arguments.
# If not specified, the default config is used.
if use_share_groups is None:
@@ -305,7 +294,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
use_share_groups = context.globals.get(arg_name)
# Assign the determined value.
- self.use_new_coordinator = use_new_coordinator
self.use_transactions_v2 = use_transactions_v2
self.use_share_groups = use_share_groups
@@ -779,9 +767,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
else:
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
- if self.use_new_coordinator is not None:
- override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = str(self.use_new_coordinator)
-
if self.consumer_group_migration_policy is not None:
override_configs[config_property.CONSUMER_GROUP_MIGRATION_POLICY] = str(self.consumer_group_migration_policy)
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 69254149254..19a3cdcde52 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -49,10 +49,9 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
@cluster(num_nodes=4)
@matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ metadata_quorum=[quorum.isolated_kraft]
)
- def rolling_update_test(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
+ def rolling_update_test(self, metadata_quorum=quorum.zk):
"""
Verify rolling updates of partition assignment strategies works correctly. In this
test, we use a rolling restart to change the group's assignment strategy from "range"
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 5e5c5ff308d..c6ecfc371a6 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -77,14 +77,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
@@ -134,16 +129,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
clean_shutdown=[True],
bounce_mode=["all", "rolling"],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- clean_shutdown=[True],
- bounce_mode=["all", "rolling"],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verify correct consumer behavior when the consumers in the group are consecutively restarted.
@@ -191,18 +179,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
bounce_mode=["all", "rolling"],
num_bounces=[5],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- clean_shutdown=[True],
- static_membership=[True, False],
- bounce_mode=["all", "rolling"],
- num_bounces=[5],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=[consumer_group.classic_group_protocol]
)
- def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verify correct static consumer behavior when the consumers in the group are restarted. In order to make
sure the behavior of static members are different from dynamic ones, we take both static and dynamic
@@ -267,15 +246,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
@matrix(
bounce_mode=["all", "rolling"],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- bounce_mode=["all", "rolling"],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not,
after the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and
@@ -308,16 +281,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
num_conflict_consumers=[1, 2],
fencing_stage=["stable", "all"],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- num_conflict_consumers=[1, 2],
- fencing_stage=["stable", "all"],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
@@ -392,16 +358,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
clean_shutdown=[True],
enable_autocommit=[True, False],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- clean_shutdown=[True],
- enable_autocommit=[True, False],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
partition = TopicPartition(self.TOPIC, 0)
consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -450,16 +409,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
clean_shutdown=[True, False],
enable_autocommit=[True, False],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- clean_shutdown=[True, False],
- enable_autocommit=[True, False],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
partition = TopicPartition(self.TOPIC, 0)
consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -498,14 +450,9 @@ class OffsetValidationTest(VerifiableConsumerTest):
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Verifies correct group rebalance behavior as consumers are started and stopped.
In particular, this test verifies that the partition is readable after every
@@ -561,24 +508,14 @@ class AssignmentValidationTest(VerifiableConsumerTest):
"org.apache.kafka.clients.consumer.StickyAssignor",
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
-
"org.apache.kafka.clients.consumer.StickyAssignor",
-
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=[consumer_group.classic_group_protocol]
)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=[consumer_group.consumer_group_protocol],
group_remote_assignor=consumer_group.all_remote_assignors
)
- def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
+ def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, group_protocol=None, group_remote_assignor=None):
"""
Verify assignment strategy correctness: each partition is assigned to exactly
one consumer instance.
diff --git a/tests/kafkatest/tests/client/truncation_test.py b/tests/kafkatest/tests/client/truncation_test.py
index 34575f33ba2..57d977f048c 100644
--- a/tests/kafkatest/tests/client/truncation_test.py
+++ b/tests/kafkatest/tests/client/truncation_test.py
@@ -53,8 +53,8 @@ class TruncationTest(VerifiableConsumerTest):
return consumer
@cluster(num_nodes=7)
- @matrix(metadata_quorum=quorum.all_non_upgrade, use_new_coordinator=[True])
- def test_offset_truncate(self, metadata_quorum, use_new_coordinator):
+ @matrix(metadata_quorum=quorum.all_non_upgrade)
+ def test_offset_truncate(self, metadata_quorum):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index f00ae31cbce..553ab59f254 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -176,16 +176,9 @@ class ConnectDistributedTest(Test):
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- exactly_once_source=[True, False],
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_restart_failed_connector(self, exactly_once_source, connect_protocol, metadata_quorum, group_protocol=None):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -211,16 +204,9 @@ class ConnectDistributedTest(Test):
connector_type=['source', 'exactly-once source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- connector_type=['source', 'exactly-once source', 'sink'],
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_restart_failed_task(self, connector_type, connect_protocol, metadata_quorum, group_protocol=None):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type == 'exactly-once source' else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -248,15 +234,9 @@ class ConnectDistributedTest(Test):
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_restart_connector_and_tasks_failed_connector(self, connect_protocol, metadata_quorum, group_protocol=None):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -278,16 +258,9 @@ class ConnectDistributedTest(Test):
connector_type=['source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- connector_type=['source', 'sink'],
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_restart_connector_and_tasks_failed_task(self, connector_type, connect_protocol, metadata_quorum, group_protocol=None):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
@@ -314,10 +287,9 @@ class ConnectDistributedTest(Test):
@matrix(
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ metadata_quorum=[quorum.isolated_kraft]
)
- def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False):
+ def test_pause_and_resume_source(self, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that source connectors stop producing records when paused and begin again after
being resumed.
@@ -361,15 +333,9 @@ class ConnectDistributedTest(Test):
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum, group_protocol=None):
"""
Verify that sink connectors stop consuming records when paused and begin again after
being resumed.
@@ -419,10 +385,9 @@ class ConnectDistributedTest(Test):
@matrix(
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ metadata_quorum=[quorum.isolated_kraft]
)
- def test_pause_state_persistent(self, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False):
+ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that paused state is preserved after a cluster restart.
"""
@@ -637,22 +602,14 @@ class ConnectDistributedTest(Test):
)
@cluster(num_nodes=6)
- @matrix(
- security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
- exactly_once_source=[True, False],
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
@matrix(
security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_file_source_and_sink(self, security_protocol, exactly_once_source, connect_protocol, metadata_quorum, group_protocol=None):
"""
Tests that a basic file connector works across clean rolling bounces. This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work is rebalanced across nodes.
@@ -692,16 +649,9 @@ class ConnectDistributedTest(Test):
clean=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- clean=[True, False],
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_bounce(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_bounce(self, clean, connect_protocol, metadata_quorum, group_protocol=None):
"""
Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
run correctly and deliver messages exactly once when Kafka Connect workers undergo clean rolling bounces,
@@ -824,10 +774,9 @@ class ConnectDistributedTest(Test):
@matrix(
clean=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ metadata_quorum=[quorum.isolated_kraft]
)
- def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum, use_new_coordinator=False):
+ def test_exactly_once_source(self, clean, connect_protocol, metadata_quorum):
"""
Validates that source tasks run correctly and deliver messages exactly once
when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -926,15 +875,9 @@ class ConnectDistributedTest(Test):
@matrix(
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- connect_protocol=['sessioned', 'compatible', 'eager'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_transformations(self, connect_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_transformations(self, connect_protocol, metadata_quorum, group_protocol=None):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services(timestamp_type='CreateTime', include_filestream_connectors=True)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py
index c84f0dda59e..d42bc800cc1 100644
--- a/tests/kafkatest/tests/core/consume_bench_test.py
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -68,18 +68,9 @@ class ConsumeBenchTest(Test):
["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- topics=[
- ["consume_bench_topic[0-5]"], # topic subscription
- ["consume_bench_topic[0-5]:[0-4]"] # manual topic assignment
- ],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consume_bench(self, topics, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_consume_bench(self, topics, metadata_quorum, group_protocol=None):
"""
Runs a ConsumeBench workload to consume messages
"""
@@ -102,14 +93,9 @@ class ConsumeBenchTest(Test):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_single_partition(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_single_partition(self, metadata_quorum, group_protocol=None):
"""
Run a ConsumeBench against a single partition
"""
@@ -133,14 +119,9 @@ class ConsumeBenchTest(Test):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_random_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_random_group_topics(self, metadata_quorum, group_protocol=None):
"""
Runs multiple consumers group to read messages from topics.
Since a consumerGroup isn't specified, each consumer should read from all topics independently
@@ -165,14 +146,9 @@ class ConsumeBenchTest(Test):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_two_consumers_specified_group_topics(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_two_consumers_specified_group_topics(self, metadata_quorum, group_protocol=None):
"""
Runs two consumers in the same consumer group to read messages from topics.
Since a consumerGroup is specified, each consumer should dynamically get assigned a partition from group
@@ -198,14 +174,9 @@ class ConsumeBenchTest(Test):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_random_group_partitions(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_random_group_partitions(self, metadata_quorum, group_protocol=None):
"""
Runs multiple consumers in to read messages from specific partitions.
Since a consumerGroup isn't specified, each consumer will get assigned a random group
@@ -231,14 +202,9 @@ class ConsumeBenchTest(Test):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum, group_protocol=None):
"""
Runs multiple consumers in the same group to read messages from specific partitions.
It is an invalid configuration to provide a consumer group and specific partitions.
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py
index 57af320574b..445aef32265 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -87,15 +87,9 @@ class ConsumerGroupCommandTest(Test):
@matrix(
security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- security_protocol=['PLAINTEXT', 'SSL'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Tests if ConsumerGroupCommand is listing correct consumer groups
:return: None
@@ -106,15 +100,9 @@ class ConsumerGroupCommandTest(Test):
@matrix(
security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- security_protocol=['PLAINTEXT', 'SSL'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.isolated_kraft, group_protocol=None):
"""
Tests if ConsumerGroupCommand is describing a consumer group correctly
:return: None
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index 8db6e6d3110..3a8b74a0789 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -68,14 +68,9 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
@cluster(num_nodes=9)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_consumer_preferred_read_replica(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_consumer_preferred_read_replica(self, metadata_quorum, group_protocol=None):
"""
This test starts up brokers with "broker.rack" and
"replica.selector.class" configurations set. The replica
selector is set to the rack-aware implementation. One of the brokers
has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 6c66b626c1a..72fb3d86f9f 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -142,16 +142,9 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
bounce_brokers=[True, False],
reassign_from_offset_zero=[True, False],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- bounce_brokers=[True, False],
- reassign_from_offset_zero=[True, False],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_reassign_partitions(self, bounce_brokers, reassign_from_offset_zero, metadata_quorum, group_protocol=None):
"""Reassign partitions tests.
Setup: 1 controller, 4 kafka nodes, 1 topic with partitions=20, replication-factor=3,
and min.insync.replicas=3
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py
index cd7646a53d7..025dc7867b4 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -47,18 +47,10 @@ class ReplicaScaleTest(Test):
partition_count=[34],
replication_factor=[3],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- topic_count=[50],
- partition_count=[34],
- replication_factor=[3],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
def test_produce_consume(self, topic_count, partition_count, replication_factor,
- metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ metadata_quorum, group_protocol=None):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "replicas_produce_consume_%d" % i
@@ -117,11 +109,10 @@ class ReplicaScaleTest(Test):
topic_count=[50],
partition_count=[34],
replication_factor=[3],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True, False]
+ metadata_quorum=[quorum.isolated_kraft]
)
def test_clean_bounce(self, topic_count, partition_count, replication_factor,
- metadata_quorum, use_new_coordinator=False):
+ metadata_quorum):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py b/tests/kafkatest/tests/core/replication_replica_failure_test.py
index 6e644ce5a22..82a7fb9e0d4 100644
--- a/tests/kafkatest/tests/core/replication_replica_failure_test.py
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -39,14 +39,9 @@ class ReplicationReplicaFailureTest(EndToEndTest):
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_replication_with_replica_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_replication_with_replica_failure(self, metadata_quorum, group_protocol=None):
"""
This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
It also verifies that replication provides simple durability guarantees by checking that data acked by
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index a92b1ad6b3e..0e487eff2dd 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -62,29 +62,15 @@ class SecurityTest(EndToEndTest):
security_protocol=['PLAINTEXT'],
interbroker_security_protocol=['SSL'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- security_protocol=['PLAINTEXT'],
- interbroker_security_protocol=['SSL'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
@matrix(
security_protocol=['SSL'],
interbroker_security_protocol=['PLAINTEXT'],
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- security_protocol=['SSL'],
- interbroker_security_protocol=['PLAINTEXT'],
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum, group_protocol=None):
"""
Test that invalid hostname in certificate results in connection failures.
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
@@ -154,14 +140,9 @@ class SecurityTest(EndToEndTest):
@cluster(num_nodes=2)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=[quorum.isolated_kraft],
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum, use_new_coordinator=False, group_protocol=None):
+ def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum, group_protocol=None):
"""
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
"""
diff --git a/tests/kafkatest/tests/core/snapshot_test.py b/tests/kafkatest/tests/core/snapshot_test.py
index 25ffd4227e3..30ff270c8a5 100644
--- a/tests/kafkatest/tests/core/snapshot_test.py
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -148,14 +148,9 @@ class TestSnapshots(ProduceConsumeValidateTest):
@cluster(num_nodes=9)
@matrix(
metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_broker(self, metadata_quorum=quorum.combined_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_broker(self, metadata_quorum=quorum.combined_kraft, group_protocol=None):
""" Test the ability of a broker to consume metadata snapshots
and to recover the cluster metadata state using them
@@ -216,14 +211,9 @@ class TestSnapshots(ProduceConsumeValidateTest):
@cluster(num_nodes=9)
@matrix(
metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[False]
- )
- @matrix(
- metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols
)
- def test_controller(self, metadata_quorum=quorum.combined_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_controller(self, metadata_quorum=quorum.combined_kraft, group_protocol=None):
""" Test the ability of controllers to consume metadata snapshots
and to recover the cluster metadata state using them
diff --git a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
index 77c4a0adf77..65747036802 100644
--- a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
+++ b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py
@@ -181,10 +181,9 @@ class TransactionsMixedVersionsTest(Test):
@matrix(
old_kafka_version=[str(LATEST_4_0), str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
metadata_quorum=[isolated_kraft],
- use_new_coordinator=[False],
group_protocol=[None]
)
- def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
oldKafkaVersion = KafkaVersion(old_kafka_version)
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index e329b3f7ac7..154ccd917eb 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -207,20 +207,10 @@ class TransactionsTest(Test):
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[False],
- use_transactions_v2=[True, False]
- )
- @matrix(
- failure_mode=["hard_bounce", "clean_bounce"],
- bounce_target=["brokers", "clients"],
- check_order=[True, False],
- use_group_metadata=[True, False],
- metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[True],
group_protocol=consumer_group.all_group_protocols,
use_transactions_v2=[True, False]
)
- def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, use_new_coordinator=False, group_protocol=None, use_transactions_v2=False):
+ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum, group_protocol=None, use_transactions_v2=False):
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
zk=None,
diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py
index d1dff452c5b..495e91e430b 100644
--- a/tests/kafkatest/tests/core/transactions_upgrade_test.py
+++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py
@@ -211,10 +211,9 @@ class TransactionsUpgradeTest(Test):
@matrix(
from_kafka_version=[str(LATEST_4_0), str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3)],
metadata_quorum=[isolated_kraft],
- use_new_coordinator=[False],
group_protocol=[None]
)
- def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
+ def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
fromKafkaVersion = KafkaVersion(from_kafka_version)
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py
index a63e6c1fb89..4bd188d2c17 100644
--- a/tests/kafkatest/tests/core/upgrade_test.py
+++ b/tests/kafkatest/tests/core/upgrade_test.py
@@ -165,23 +165,23 @@ class TestUpgrade(ProduceConsumeValidateTest):
@cluster(num_nodes=5)
@matrix(from_kafka_version=[str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(DEV_BRANCH)],
metadata_quorum=[combined_kraft])
- def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade(from_kafka_version)
@cluster(num_nodes=8)
@matrix(from_kafka_version=[str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(DEV_BRANCH)],
metadata_quorum=[isolated_kraft])
- def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade(from_kafka_version)
@cluster(num_nodes=5)
@matrix(from_kafka_version=[str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(DEV_BRANCH)],
metadata_quorum=[combined_kraft])
- def test_combined_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ def test_combined_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade_downgrade(from_kafka_version)
@cluster(num_nodes=8)
@matrix(from_kafka_version=[str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(DEV_BRANCH)],
metadata_quorum=[isolated_kraft])
- def test_isolated_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
+ def test_isolated_mode_upgrade_downgrade(self, from_kafka_version, metadata_quorum):
self.run_upgrade_downgrade(from_kafka_version)
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index d0ecd083017..98e543a36ca 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,8 +45,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.zk.start()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
- def test_streams_resilient_to_broker_down(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.combined_kraft])
+ def test_streams_resilient_to_broker_down(self, metadata_quorum):
self.kafka.start()
# Broker should be down over 2x of retries * timeout ms
@@ -82,8 +82,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
- def test_streams_runs_with_broker_down_initially(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.combined_kraft])
+ def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
self.kafka.start()
node = self.kafka.leader(self.inputTopic)
self.kafka.stop_node(node)
@@ -150,8 +150,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
- def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.combined_kraft])
+ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
@@ -229,8 +229,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.combined_kraft], use_new_coordinator=[True, False])
- def test_streams_should_failover_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.combined_kraft])
+ def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index d9804408df8..44cb12e5cd8 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -47,8 +47,8 @@ class StreamsStandbyTask(BaseStreamsTest):
})
@cluster(num_nodes=10)
- @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
- def test_standby_tasks_rebalance(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.isolated_kraft])
+ def test_standby_tasks_rebalance(self, metadata_quorum):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 3a2f41d3c34..96091e9fe8b 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -49,8 +49,8 @@ class StreamsStaticMembershipTest(Test):
acks=1)
@cluster(num_nodes=8)
- @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
- def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False):
+ @matrix(metadata_quorum=[quorum.isolated_kraft])
+ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum):
self.kafka.start()
numThreads = 3