This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 150b0e8290c KAFKA-15578: Migrating other system tests to use the group
coordinator (#14582)
150b0e8290c is described below
commit 150b0e8290cda57df668ba89f6b422719866de5a
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Nov 22 01:52:30 2023 -0800
KAFKA-15578: Migrating other system tests to use the group coordinator
(#14582)
This patch converts a few more system tests to using the new group
coordinator. This is only applied to KRaft clusters.
Reviewers: David Jacot <[email protected]>
---
.../tests/client/consumer_rolling_upgrade_test.py | 11 +-
tests/kafkatest/tests/client/consumer_test.py | 133 ++++++++++++++---
.../tests/connect/connect_distributed_test.py | 161 ++++++++++++++++++---
.../tests/core/consumer_group_command_test.py | 28 +++-
.../tests/core/fetch_from_follower_test.py | 11 +-
tests/kafkatest/tests/core/kraft_upgrade_test.py | 24 ++-
.../tests/core/reassign_partitions_test.py | 17 ++-
tests/kafkatest/tests/core/replica_scale_test.py | 36 ++++-
.../tests/core/replication_replica_failure_test.py | 11 +-
tests/kafkatest/tests/core/security_test.py | 39 ++++-
tests/kafkatest/tests/core/snapshot_test.py | 14 +-
tests/kafkatest/tests/core/transactions_test.py | 22 ++-
.../streams/streams_broker_down_resilience_test.py | 16 +-
.../tests/streams/streams_standby_replica_test.py | 4 +-
.../streams/streams_static_membership_test.py | 4 +-
15 files changed, 429 insertions(+), 102 deletions(-)
diff --git a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
index 5beacf23c63..9805d3656ee 100644
--- a/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py
@@ -48,8 +48,15 @@ class ConsumerRollingUpgradeTest(VerifiableConsumerTest):
"Mismatched assignment: %s" % assignment
@cluster(num_nodes=4)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def rolling_update_test(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def rolling_update_test(self, metadata_quorum=quorum.zk,
use_new_coordinator=False):
"""
Verify rolling updates of partition assignment strategies works
correctly. In this
test, we use a rolling restart to change the group's assignment
strategy from "range"
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index 3b535c97251..6dc1fb897b0 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -75,8 +75,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
return consumer
@cluster(num_nodes=7)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk,
use_new_coordinator=False):
"""
Verify correct consumer behavior when the brokers are consecutively
restarted.
@@ -126,8 +133,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
(consumer.total_consumed(), consumer.current_position(partition))
@cluster(num_nodes=7)
- @matrix(clean_shutdown=[True], bounce_mode=["all", "rolling"],
metadata_quorum=quorum.all_non_upgrade)
- def test_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.zk):
+ @matrix(
+ clean_shutdown=[True],
+ bounce_mode=["all", "rolling"],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean_shutdown=[True],
+ bounce_mode=["all", "rolling"],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Verify correct consumer behavior when the consumers in the group are
consecutively restarted.
@@ -169,8 +187,23 @@ class OffsetValidationTest(VerifiableConsumerTest):
(consumer.current_position(partition),
consumer.total_consumed())
@cluster(num_nodes=7)
- @matrix(clean_shutdown=[True], static_membership=[True, False],
bounce_mode=["all", "rolling"], num_bounces=[5],
metadata_quorum=quorum.all_non_upgrade)
- def test_static_consumer_bounce(self, clean_shutdown, static_membership,
bounce_mode, num_bounces, metadata_quorum=quorum.zk):
+ @matrix(
+ clean_shutdown=[True],
+ static_membership=[True, False],
+ bounce_mode=["all", "rolling"],
+ num_bounces=[5],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean_shutdown=[True],
+ static_membership=[True, False],
+ bounce_mode=["all", "rolling"],
+ num_bounces=[5],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_static_consumer_bounce(self, clean_shutdown, static_membership,
bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Verify correct static consumer behavior when the consumers in the
group are restarted. In order to make
sure the behavior of static members are different from dynamic ones,
we take both static and dynamic
@@ -231,8 +264,17 @@ class OffsetValidationTest(VerifiableConsumerTest):
(consumer.current_position(partition),
consumer.total_consumed())
@cluster(num_nodes=7)
- @matrix(bounce_mode=["all", "rolling"],
metadata_quorum=quorum.all_non_upgrade)
- def test_static_consumer_persisted_after_rejoin(self, bounce_mode,
metadata_quorum=quorum.zk):
+ @matrix(
+ bounce_mode=["all", "rolling"],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ bounce_mode=["all", "rolling"],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_static_consumer_persisted_after_rejoin(self, bounce_mode,
metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Verify that the updated member.id(updated_member_id) caused by static
member rejoin would be persisted. If not,
after the brokers rolling bounce, the migrated group coordinator would
load the stale persisted member.id and
@@ -262,8 +304,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.rolling_bounce_brokers(consumer, num_bounces=1)
@cluster(num_nodes=10)
- @matrix(num_conflict_consumers=[1, 2], fencing_stage=["stable", "all"],
metadata_quorum=quorum.all_non_upgrade)
- def test_fencing_static_consumer(self, num_conflict_consumers,
fencing_stage, metadata_quorum=quorum.zk):
+ @matrix(
+ num_conflict_consumers=[1, 2],
+ fencing_stage=["stable", "all"],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ num_conflict_consumers=[1, 2],
+ fencing_stage=["stable", "all"],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_fencing_static_consumer(self, num_conflict_consumers,
fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Verify correct static consumer behavior when there are conflicting
consumers with same group.instance.id.
@@ -315,8 +368,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
)
@cluster(num_nodes=7)
- @matrix(clean_shutdown=[True], enable_autocommit=[True, False],
metadata_quorum=quorum.all_non_upgrade)
- def test_consumer_failure(self, clean_shutdown, enable_autocommit,
metadata_quorum=quorum.zk):
+ @matrix(
+ clean_shutdown=[True],
+ enable_autocommit=[True, False],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean_shutdown=[True],
+ enable_autocommit=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_consumer_failure(self, clean_shutdown, enable_autocommit,
metadata_quorum=quorum.zk, use_new_coordinator=False):
partition = TopicPartition(self.TOPIC, 0)
consumer = self.setup_consumer(self.TOPIC,
enable_autocommit=enable_autocommit)
@@ -362,8 +426,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
(consumer.last_commit(partition),
consumer.current_position(partition))
@cluster(num_nodes=7)
- @matrix(clean_shutdown=[True, False], enable_autocommit=[True, False],
metadata_quorum=quorum.all_non_upgrade)
- def test_broker_failure(self, clean_shutdown, enable_autocommit,
metadata_quorum=quorum.zk):
+ @matrix(
+ clean_shutdown=[True, False],
+ enable_autocommit=[True, False],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean_shutdown=[True, False],
+ enable_autocommit=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_broker_failure(self, clean_shutdown, enable_autocommit,
metadata_quorum=quorum.zk, use_new_coordinator=False):
partition = TopicPartition(self.TOPIC, 0)
consumer = self.setup_consumer(self.TOPIC,
enable_autocommit=enable_autocommit)
@@ -399,8 +474,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
(consumer.last_commit(partition),
consumer.current_position(partition))
@cluster(num_nodes=7)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_group_consumption(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_group_consumption(self, metadata_quorum=quorum.zk,
use_new_coordinator=False):
"""
Verifies correct group rebalance behavior as consumers are started and
stopped.
In particular, this test verifies that the partition is readable after
every
@@ -450,10 +532,21 @@ class AssignmentValidationTest(VerifiableConsumerTest):
})
@cluster(num_nodes=6)
-
@matrix(assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
-
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
-
"org.apache.kafka.clients.consumer.StickyAssignor"],
metadata_quorum=quorum.all_non_upgrade)
- def test_valid_assignment(self, assignment_strategy,
metadata_quorum=quorum.zk):
+ @matrix(
+ assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
+
"org.apache.kafka.clients.consumer.StickyAssignor"],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
+
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
+
"org.apache.kafka.clients.consumer.StickyAssignor"],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_valid_assignment(self, assignment_strategy,
metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Verify assignment strategy correctness: each partition is assigned to
exactly
one consumer instance.
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 6b2394fe659..145f29f51eb 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -171,8 +171,19 @@ class ConnectDistributedTest(Test):
return self._task_has_state(task_id, status, 'RUNNING')
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned',
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_restart_failed_connector(self, exactly_once_source,
connect_protocol, metadata_quorum):
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_restart_failed_connector(self, exactly_once_source,
connect_protocol, metadata_quorum, use_new_coordinator=False):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source
else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -194,8 +205,19 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING
state")
@cluster(num_nodes=5)
- @matrix(connector_type=['source', 'exactly-once source', 'sink'],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=quorum.all_non_upgrade)
- def test_restart_failed_task(self, connector_type, connect_protocol,
metadata_quorum):
+ @matrix(
+ connector_type=['source', 'exactly-once source', 'sink'],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ connector_type=['source', 'exactly-once source', 'sink'],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_restart_failed_task(self, connector_type, connect_protocol,
metadata_quorum, use_new_coordinator=False):
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if connector_type ==
'exactly-once source' else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
@@ -220,8 +242,17 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see task transition to the RUNNING
state")
@cluster(num_nodes=5)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=quorum.all_non_upgrade)
- def test_restart_connector_and_tasks_failed_connector(self,
connect_protocol, metadata_quorum):
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_restart_connector_and_tasks_failed_connector(self,
connect_protocol, metadata_quorum, use_new_coordinator=False):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
@@ -239,8 +270,19 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see connector transition to the RUNNING
state")
@cluster(num_nodes=5)
- @matrix(connector_type=['source', 'sink'], connect_protocol=['sessioned',
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_restart_connector_and_tasks_failed_task(self, connector_type,
connect_protocol, metadata_quorum):
+ @matrix(
+ connector_type=['source', 'sink'],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ connector_type=['source', 'sink'],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_restart_connector_and_tasks_failed_task(self, connector_type,
connect_protocol, metadata_quorum, use_new_coordinator=False):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services()
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
@@ -264,8 +306,19 @@ class ConnectDistributedTest(Test):
err_msg="Failed to see task transition to the RUNNING
state")
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned',
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_pause_and_resume_source(self, exactly_once_source,
connect_protocol, metadata_quorum):
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_pause_and_resume_source(self, exactly_once_source,
connect_protocol, metadata_quorum, use_new_coordinator=False):
"""
Verify that source connectors stop producing records when paused and
begin again after
being resumed.
@@ -306,8 +359,17 @@ class ConnectDistributedTest(Test):
err_msg="Failed to produce messages after resuming source
connector")
@cluster(num_nodes=5)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=quorum.all_non_upgrade)
- def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum):
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_pause_and_resume_sink(self, connect_protocol, metadata_quorum,
use_new_coordinator=False):
"""
Verify that sink connectors stop consuming records when paused and
begin again after
being resumed.
@@ -354,8 +416,19 @@ class ConnectDistributedTest(Test):
err_msg="Failed to consume messages after resuming sink
connector")
@cluster(num_nodes=5)
- @matrix(exactly_once_source=[True, False], connect_protocol=['sessioned',
'compatible', 'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_pause_state_persistent(self, exactly_once_source,
connect_protocol, metadata_quorum):
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_pause_state_persistent(self, exactly_once_source,
connect_protocol, metadata_quorum, use_new_coordinator=False):
"""
Verify that paused state is preserved after a cluster restart.
"""
@@ -560,8 +633,21 @@ class ConnectDistributedTest(Test):
)
@cluster(num_nodes=6)
- @matrix(security_protocol=[SecurityConfig.PLAINTEXT,
SecurityConfig.SASL_SSL], exactly_once_source=[True, False],
connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=quorum.all_non_upgrade)
- def test_file_source_and_sink(self, security_protocol,
exactly_once_source, connect_protocol, metadata_quorum):
+ @matrix(
+ security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL],
+ exactly_once_source=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_file_source_and_sink(self, security_protocol,
exactly_once_source, connect_protocol, metadata_quorum,
use_new_coordinator=False):
"""
Tests that a basic file connector works across clean rolling bounces.
This validates that the connector is
correctly created, tasks instantiated, and as nodes restart the work
is rebalanced across nodes.
@@ -594,8 +680,19 @@ class ConnectDistributedTest(Test):
wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST +
self.SECOND_INPUT_LIST), timeout_sec=150, err_msg="Sink output file never
converged to the same state as the input file")
@cluster(num_nodes=6)
- @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible',
'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_bounce(self, clean, connect_protocol, metadata_quorum):
+ @matrix(
+ clean=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_bounce(self, clean, connect_protocol, metadata_quorum,
use_new_coordinator=False):
"""
Validates that source and sink tasks that run continuously and produce
a predictable sequence of messages
run correctly and deliver messages exactly once when Kafka Connect
workers undergo clean rolling bounces,
@@ -714,8 +811,19 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
@cluster(num_nodes=6)
- @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible',
'eager'], metadata_quorum=quorum.all_non_upgrade)
- def test_exactly_once_source(self, clean, connect_protocol,
metadata_quorum):
+ @matrix(
+ clean=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ clean=[True, False],
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_exactly_once_source(self, clean, connect_protocol,
metadata_quorum, use_new_coordinator=False):
"""
Validates that source tasks run correctly and deliver messages exactly
once
when Kafka Connect workers undergo bounces, both clean and unclean.
@@ -811,8 +919,17 @@ class ConnectDistributedTest(Test):
assert success, "Found validation errors:\n" + "\n ".join(errors)
@cluster(num_nodes=6)
- @matrix(connect_protocol=['sessioned', 'compatible', 'eager'],
metadata_quorum=quorum.all_non_upgrade)
- def test_transformations(self, connect_protocol, metadata_quorum):
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ connect_protocol=['sessioned', 'compatible', 'eager'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_transformations(self, connect_protocol, metadata_quorum,
use_new_coordinator=False):
self.CONNECT_PROTOCOL = connect_protocol
self.setup_services(timestamp_type='CreateTime',
include_filestream_connectors=True)
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py
b/tests/kafkatest/tests/core/consumer_group_command_test.py
index f81eec8de8c..23406770548 100644
--- a/tests/kafkatest/tests/core/consumer_group_command_test.py
+++ b/tests/kafkatest/tests/core/consumer_group_command_test.py
@@ -82,7 +82,7 @@ class ConsumerGroupCommandTest(Test):
if group:
wait_until(lambda:
re.search("topic-consumer-group-command",self.kafka.describe_consumer_group(group=group,
node=kafka_node, command_config=command_config_file)), timeout_sec=10,
- err_msg="Timed out waiting to list expected consumer
groups.")
+ err_msg="Timed out waiting to describe expected
consumer groups.")
else:
wait_until(lambda: "test-consumer-group" in
self.kafka.list_consumer_groups(node=kafka_node,
command_config=command_config_file), timeout_sec=10,
err_msg="Timed out waiting to list expected consumer
groups.")
@@ -90,8 +90,17 @@ class ConsumerGroupCommandTest(Test):
self.consumer.stop()
@cluster(num_nodes=3)
- @matrix(security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=quorum.all_non_upgrade)
- def test_list_consumer_groups(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.zk):
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_list_consumer_groups(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Tests if ConsumerGroupCommand is listing correct consumer groups
:return: None
@@ -99,8 +108,17 @@ class ConsumerGroupCommandTest(Test):
self.setup_and_verify(security_protocol)
@cluster(num_nodes=3)
- @matrix(security_protocol=['PLAINTEXT', 'SSL'],
metadata_quorum=quorum.all_non_upgrade)
- def test_describe_consumer_group(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.zk):
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ security_protocol=['PLAINTEXT', 'SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_describe_consumer_group(self, security_protocol='PLAINTEXT',
metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Tests if ConsumerGroupCommand is describing a consumer group correctly
:return: None
diff --git a/tests/kafkatest/tests/core/fetch_from_follower_test.py
b/tests/kafkatest/tests/core/fetch_from_follower_test.py
index f720de11464..96f3d16cdd1 100644
--- a/tests/kafkatest/tests/core/fetch_from_follower_test.py
+++ b/tests/kafkatest/tests/core/fetch_from_follower_test.py
@@ -70,8 +70,15 @@ class FetchFromFollowerTest(ProduceConsumeValidateTest):
self.kafka.start()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_consumer_preferred_read_replica(self, metadata_quorum=quorum.zk,
use_new_coordinator=False):
"""
This test starts up brokers with "broker.rack" and
"replica.selector.class" configurations set. The replica
selector is set to the rack-aware implementation. One of the brokers
has a different rack than the other two.
diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py
b/tests/kafkatest/tests/core/kraft_upgrade_test.py
index 221e43f6baf..f6a29186956 100644
--- a/tests/kafkatest/tests/core/kraft_upgrade_test.py
+++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ducktape.mark import parametrize
+from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer
@@ -109,22 +109,16 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
assert self.kafka.check_protocol_errors(self)
@cluster(num_nodes=5)
- @parametrize(from_kafka_version=str(LATEST_3_1),
metadata_quorum=combined_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_2),
metadata_quorum=combined_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_3),
metadata_quorum=combined_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_4),
metadata_quorum=combined_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_5),
metadata_quorum=combined_kraft)
- @parametrize(from_kafka_version=str(DEV_BRANCH),
metadata_quorum=combined_kraft)
- def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum):
+ @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
+ use_new_coordinator=[True, False],
+ metadata_quorum=[combined_kraft])
+ def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum,
use_new_coordinator=False):
self.run_upgrade(from_kafka_version)
@cluster(num_nodes=8)
- @parametrize(from_kafka_version=str(LATEST_3_1),
metadata_quorum=isolated_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_2),
metadata_quorum=isolated_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_3),
metadata_quorum=isolated_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_4),
metadata_quorum=isolated_kraft)
- @parametrize(from_kafka_version=str(LATEST_3_5),
metadata_quorum=isolated_kraft)
- @parametrize(from_kafka_version=str(DEV_BRANCH),
metadata_quorum=isolated_kraft)
- def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum):
+ @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
+ use_new_coordinator=[True, False],
+ metadata_quorum=[isolated_kraft])
+ def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum,
use_new_coordinator=False):
self.run_upgrade(from_kafka_version)
diff --git a/tests/kafkatest/tests/core/reassign_partitions_test.py
b/tests/kafkatest/tests/core/reassign_partitions_test.py
index 82b11b15289..0e02c3535e7 100644
--- a/tests/kafkatest/tests/core/reassign_partitions_test.py
+++ b/tests/kafkatest/tests/core/reassign_partitions_test.py
@@ -132,10 +132,19 @@ class ReassignPartitionsTest(ProduceConsumeValidateTest):
time.sleep(6)
@cluster(num_nodes=8)
- @matrix(bounce_brokers=[True, False],
- reassign_from_offset_zero=[True, False],
- metadata_quorum=quorum.all_non_upgrade)
- def test_reassign_partitions(self, bounce_brokers,
reassign_from_offset_zero, metadata_quorum):
+ @matrix(
+ bounce_brokers=[True, False],
+ reassign_from_offset_zero=[True, False],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ bounce_brokers=[True, False],
+ reassign_from_offset_zero=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_reassign_partitions(self, bounce_brokers,
reassign_from_offset_zero, metadata_quorum, use_new_coordinator=False):
"""Reassign partitions tests.
Setup: 1 zk, 4 kafka nodes, 1 topic with partitions=20,
replication-factor=3,
and min.insync.replicas=3
diff --git a/tests/kafkatest/tests/core/replica_scale_test.py
b/tests/kafkatest/tests/core/replica_scale_test.py
index f8d0c8379bd..22555d202d3 100644
--- a/tests/kafkatest/tests/core/replica_scale_test.py
+++ b/tests/kafkatest/tests/core/replica_scale_test.py
@@ -48,8 +48,22 @@ class ReplicaScaleTest(Test):
self.zk.stop()
@cluster(num_nodes=12)
- @matrix(topic_count=[50], partition_count=[34], replication_factor=[3],
metadata_quorum=quorum.all_non_upgrade)
- def test_produce_consume(self, topic_count, partition_count,
replication_factor, metadata_quorum=quorum.zk):
+ @matrix(
+ topic_count=[50],
+ partition_count=[34],
+ replication_factor=[3],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ topic_count=[50],
+ partition_count=[34],
+ replication_factor=[3],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_produce_consume(self, topic_count, partition_count,
replication_factor,
+ metadata_quorum=quorum.zk,
use_new_coordinator=False):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "replicas_produce_consume_%d" % i
@@ -103,8 +117,22 @@ class ReplicaScaleTest(Test):
trogdor.stop()
@cluster(num_nodes=12)
- @matrix(topic_count=[50], partition_count=[34], replication_factor=[3],
metadata_quorum=quorum.all_non_upgrade)
- def test_clean_bounce(self, topic_count, partition_count,
replication_factor, metadata_quorum=quorum.zk):
+ @matrix(
+ topic_count=[50],
+ partition_count=[34],
+ replication_factor=[3],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ topic_count=[50],
+ partition_count=[34],
+ replication_factor=[3],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_clean_bounce(self, topic_count, partition_count,
replication_factor,
+ metadata_quorum=quorum.zk,
use_new_coordinator=False):
topics_create_start_time = time.time()
for i in range(topic_count):
topic = "topic-%04d" % i
diff --git a/tests/kafkatest/tests/core/replication_replica_failure_test.py
b/tests/kafkatest/tests/core/replication_replica_failure_test.py
index 42e1c0922f7..66cd1ecac1d 100644
--- a/tests/kafkatest/tests/core/replication_replica_failure_test.py
+++ b/tests/kafkatest/tests/core/replication_replica_failure_test.py
@@ -37,8 +37,15 @@ class ReplicationReplicaFailureTest(EndToEndTest):
super(ReplicationReplicaFailureTest,
self).__init__(test_context=test_context, topic=None)
@cluster(num_nodes=7)
- @matrix(metadata_quorum=quorum.all_non_upgrade)
- def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_replication_with_replica_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
This test verifies that replication shrinks the ISR when a replica is not fetching anymore.
It also verifies that replication provides simple durability guarantees by checking that data acked by
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index cee3a3740ba..8b7c05de8ba 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -58,9 +58,31 @@ class SecurityTest(EndToEndTest):
return True
@cluster(num_nodes=6)
- @matrix(security_protocol=['PLAINTEXT'], interbroker_security_protocol=['SSL'], metadata_quorum=quorum.all_non_upgrade)
- @matrix(security_protocol=['SSL'], interbroker_security_protocol=['PLAINTEXT'], metadata_quorum=quorum.all_non_upgrade)
- def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk):
+ @matrix(
+ security_protocol=['PLAINTEXT'],
+ interbroker_security_protocol=['SSL'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ security_protocol=['PLAINTEXT'],
+ interbroker_security_protocol=['SSL'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ @matrix(
+ security_protocol=['SSL'],
+ interbroker_security_protocol=['PLAINTEXT'],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ security_protocol=['SSL'],
+ interbroker_security_protocol=['PLAINTEXT'],
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_client_ssl_endpoint_validation_failure(self, security_protocol, interbroker_security_protocol, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Test that invalid hostname in certificate results in connection failures.
When security_protocol=SSL, client SSL handshakes are expected to fail due to hostname verification failure.
@@ -132,8 +154,15 @@ class SecurityTest(EndToEndTest):
self.consumer.start()
@cluster(num_nodes=2)
- @matrix(metadata_quorum=[quorum.zk, quorum.isolated_kraft])
- def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk):
+ @matrix(
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ use_new_coordinator=[True, False]
+ )
+ def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk, use_new_coordinator=False):
"""
Test that invalid hostname in ZooKeeper or KRaft Controller results in broker inability to start.
"""
diff --git a/tests/kafkatest/tests/core/snapshot_test.py b/tests/kafkatest/tests/core/snapshot_test.py
index ff0cff1feae..5368b48e5c1 100644
--- a/tests/kafkatest/tests/core/snapshot_test.py
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -144,8 +144,11 @@ class TestSnapshots(ProduceConsumeValidateTest):
self.validate()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=quorum.all_kraft)
- def test_broker(self, metadata_quorum=quorum.combined_kraft):
+ @matrix(
+ metadata_quorum=quorum.all_kraft,
+ use_new_coordinator=[True, False]
+ )
+ def test_broker(self, metadata_quorum=quorum.combined_kraft, use_new_coordinator=False):
""" Test the ability of a broker to consume metadata snapshots
and to recover the cluster metadata state using them
@@ -204,8 +207,11 @@ class TestSnapshots(ProduceConsumeValidateTest):
self.validate_success(broker_topic)
@cluster(num_nodes=9)
- @matrix(metadata_quorum=quorum.all_kraft)
- def test_controller(self, metadata_quorum=quorum.combined_kraft):
+ @matrix(
+ metadata_quorum=quorum.all_kraft,
+ use_new_coordinator=[True, False]
+ )
+ def test_controller(self, metadata_quorum=quorum.combined_kraft, use_new_coordinator=False):
""" Test the ability of controllers to consume metadata snapshots
and to recover the cluster metadata state using them
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 51ee971ef8f..b9b39f355e4 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -243,11 +243,23 @@ class TransactionsTest(Test):
}
@cluster(num_nodes=9)
- @matrix(failure_mode=["hard_bounce", "clean_bounce"],
- bounce_target=["brokers", "clients"],
- check_order=[True, False],
- use_group_metadata=[True, False])
- def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.all):
+ @matrix(
+ failure_mode=["hard_bounce", "clean_bounce"],
+ bounce_target=["brokers", "clients"],
+ check_order=[True, False],
+ use_group_metadata=[True, False],
+ metadata_quorum=[quorum.zk],
+ use_new_coordinator=[False]
+ )
+ @matrix(
+ failure_mode=["hard_bounce", "clean_bounce"],
+ bounce_target=["brokers", "clients"],
+ check_order=[True, False],
+ use_group_metadata=[True, False],
+ metadata_quorum=quorum.all_kraft,
+ use_new_coordinator=[True, False]
+ )
+ def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False):
security_protocol = 'PLAINTEXT'
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 92e01b87c75..5057c3d2793 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,8 +45,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.zk.start()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_streams_resilient_to_broker_down(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_streams_resilient_to_broker_down(self, metadata_quorum, use_new_coordinator=False):
self.kafka.start()
# Broker should be down over 2x of retries * timeout ms
@@ -82,8 +82,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=7)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_streams_runs_with_broker_down_initially(self, metadata_quorum, use_new_coordinator=False):
self.kafka.start()
node = self.kafka.leader(self.inputTopic)
self.kafka.stop_node(node)
@@ -150,8 +150,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
@@ -229,8 +229,8 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
self.kafka.stop()
@cluster(num_nodes=9)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_streams_should_failover_while_brokers_down(self, metadata_quorum, use_new_coordinator=False):
self.kafka.start()
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 382bbb7aaf3..78df8d3ebc3 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -47,8 +47,8 @@ class StreamsStandbyTask(BaseStreamsTest):
})
@cluster(num_nodes=10)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_standby_tasks_rebalance(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_standby_tasks_rebalance(self, metadata_quorum, use_new_coordinator=False):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" % (
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 3bed575fa2b..2249f6cc345 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -55,8 +55,8 @@ class StreamsStaticMembershipTest(Test):
acks=1)
@cluster(num_nodes=8)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
+ def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False):
if self.zookeeper:
self.zookeeper.start()
self.kafka.start()