This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a9a4a52c9d8 KAFKA-16963: Ducktape test for KIP-853 (#17081)
a9a4a52c9d8 is described below
commit a9a4a52c9d832e616b3e7608a38dde2ff4f43a9a
Author: Alyssa Huang <[email protected]>
AuthorDate: Fri Sep 6 13:44:09 2024 -0700
KAFKA-16963: Ducktape test for KIP-853 (#17081)
Add a ducktape system test for KIP-853 quorum reconfiguration, including
adding and removing voters.
Reviewers: Colin P. McCabe <[email protected]>
---
.../java/org/apache/kafka/raft/LeaderState.java | 2 +-
tests/kafkatest/services/kafka/config_property.py | 4 +
tests/kafkatest/services/kafka/kafka.py | 146 +++++++++++++++---
.../services/kafka/templates/kafka.properties | 6 +-
.../tests/core/quorum_reconfiguration_test.py | 165 +++++++++++++++++++++
5 files changed, 301 insertions(+), 22 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 803804858ab..ae9be94e13d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -384,7 +384,7 @@ public class LeaderState<T> implements EpochState {
}
public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long
currentTimeMs) {
- // In summary, let's consider a replica caughed up for add voter, if
they
+ // In summary, let's consider a replica caught up for add voter, if
they
// have fetched within the last hour
long anHourInMs = TimeUnit.HOURS.toMillis(1);
return Optional.ofNullable(observerStates.get(replicaKey))
diff --git a/tests/kafkatest/services/kafka/config_property.py
b/tests/kafkatest/services/kafka/config_property.py
index c8ca2229d98..6e0ee249099 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -19,12 +19,16 @@ Define Kafka configuration property names here.
BROKER_ID = "broker.id"
NODE_ID = "node.id"
+PROCESS_ROLES = "process.roles"
FIRST_BROKER_PORT = 9092
FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
FIRST_CONTROLLER_ID = 3001
CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"
+# Listener property names; KafkaService.add_controller writes these (and PROCESS_ROLES) into the
+# command-config file passed to kafka-metadata-quorum.sh add-controller
+ADVERTISED_LISTENERS = "advertised.listeners"
+LISTENERS = "listeners"
+CONTROLLER_LISTENER_NAMES = "controller.listener.names"
NUM_NETWORK_THREADS = "num.network.threads"
NUM_IO_THREADS = "num.io.threads"
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index b79513d27ba..1a980e48b32 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -204,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
controller_num_nodes_override=0,
allow_zk_with_kraft=False,
quorum_info_provider=None,
- use_new_coordinator=None
+ use_new_coordinator=None,
+ dynamicRaftQuorum=False
):
"""
:param context: test context
@@ -262,7 +263,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
e.g: {1: [["config1", "true"], ["config2", "1000"]], 2:
[["config1", "false"], ["config2", "0"]]}
:param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
:param KafkaService isolated_kafka: process.roles=controller for this
cluster when not None; ignored when using ZooKeeper
- :param int controller_num_nodes_override: the number of nodes to use
in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not
using ZooKeeper, and isolated_kafka is not None; ignored otherwise
+ :param int controller_num_nodes_override: the number of controller
nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if
positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or
controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService
as an argument and returns a ServiceQuorumInfo. If this is None, then the
ServiceQuorumInfo is generated from the test context
:param use_new_coordinator: When true, use the new implementation of
the group coordinator as per KIP-848. If this is None, the default existing
group coordinator is used.
@@ -299,6 +300,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.num_nodes_controller_role = 0
if self.quorum_info.using_kraft:
+ self.dynamicRaftQuorum = dynamicRaftQuorum
+ self.first_controller_started = False
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
@@ -338,7 +341,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
listener_security_config=listener_security_config,
extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
isolated_kafka=self,
allow_zk_with_kraft=self.allow_zk_with_kraft,
- server_prop_overrides=server_prop_overrides
+ server_prop_overrides=server_prop_overrides,
dynamicRaftQuorum=self.dynamicRaftQuorum
)
self.controller_quorum = self.isolated_controller_quorum
@@ -435,15 +438,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
kraft_broker_plus_zk_configs.pop(config_property.BROKER_ID)
- controller_only_configs = {
- config_property.NODE_ID: self.idx(node) +
config_property.FIRST_CONTROLLER_ID - 1,
- }
- kraft_controller_plus_zk_configs = controller_only_configs.copy()
- kraft_controller_plus_zk_configs.update(zk_broker_configs)
- kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
if node_quorum_info.service_quorum_info.using_zk:
node.config = KafkaConfig(**zk_broker_configs)
elif not node_quorum_info.has_broker_role: # KRaft controller-only
role
+ controller_only_configs = {
+ config_property.NODE_ID:
self.node_id_as_isolated_controller(node),
+ }
+ kraft_controller_plus_zk_configs =
controller_only_configs.copy()
+ kraft_controller_plus_zk_configs.update(zk_broker_configs)
+ kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
if self.zk:
node.config =
KafkaConfig(**kraft_controller_plus_zk_configs)
else:
@@ -456,6 +459,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.combined_nodes_started = 0
self.nodes_to_start = self.nodes
+ def node_id_as_isolated_controller(self, node):
+ """Return the node.id used for node when it acts as an isolated KRaft controller,
+ i.e. self.idx(node) + FIRST_CONTROLLER_ID - 1.
+ Does not do any validation to check if this node is part of an isolated controller quorum or not.
+ """
+ return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1
+
def reconfigure_zk_for_migration(self, kraft_quorum):
self.configured_for_zk_migration = True
self.controller_quorum = kraft_quorum
@@ -628,7 +635,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
def alive(self, node):
return len(self.pids(node)) > 0
- def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60,
**kwargs):
+ def start(self, add_principals="", nodes_to_skip=[],
isolated_controllers_to_skip=[], timeout_sec=60, **kwargs):
"""
Start the Kafka broker and wait until it registers its ID in ZooKeeper
Startup will be skipped for any nodes in nodes_to_skip. These nodes
can be started later via add_broker
@@ -666,7 +673,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self._ensure_zk_chroot()
if self.isolated_controller_quorum:
- self.isolated_controller_quorum.start()
+
self.isolated_controller_quorum.start(nodes_to_skip=isolated_controllers_to_skip)
Service.start(self, **kwargs)
@@ -728,7 +735,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.concurrent_start = False
self.start_node(node)
self.concurrent_start = orig_concurrent_start
- wait_until(lambda: self.is_registered(node), 30, 1)
def _ensure_zk_chroot(self):
self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
@@ -868,15 +874,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_kraft:
- # define controller.quorum.voters text
+ # define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use =
self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers
else config_property.FIRST_CONTROLLER_ID
- self.controller_quorum_voters = ','.join(["%s@%s:%s" %
-
(self.controller_quorum.idx(node) + first_node_id - 1,
- node.account.hostname,
-
config_property.FIRST_CONTROLLER_PORT +
-
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
- for node in
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
+ controller_port = config_property.FIRST_CONTROLLER_PORT + KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)
+ controller_nodes = self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]
+ if self.dynamicRaftQuorum:
+ # dynamic quorum: controller.quorum.bootstrap.servers is a list of host:port entries
+ self.controller_quorum_bootstrap_servers = ','.join(["%s:%s" % (controller_node.account.hostname, controller_port)
+ for controller_node in controller_nodes])
+ else:
+ # static quorum: controller.quorum.voters is a list of id@host:port entries, one per controller node.
+ # Each id must come from that controller's own index (not from the node currently being started),
+ # and we must iterate over controller nodes rather than over the characters of a joined string.
+ self.controller_quorum_voters = ','.join(["%s@%s:%s" % (self.controller_quorum.idx(controller_node) + first_node_id - 1,
+ controller_node.account.hostname,
+ controller_port)
+ for controller_node in controller_nodes])
# define controller.listener.names
self.controller_listener_names =
','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match the isolated
quorum if one exists
@@ -893,8 +903,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" %
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
+ if self.dynamicRaftQuorum:
+ cmd += " --feature kraft.version=1"
+ if not self.first_controller_started and
self.node_quorum_info.has_controller_role:
+ cmd += " --standalone"
self.logger.info("Running log directory format command...\n%s" %
cmd)
node.account.ssh(cmd)
+ self.first_controller_started = True
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService %s on %s with
command: %s" %\
@@ -1010,9 +1025,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
JmxMixin.clean_node(self, node)
self.security_config.clean_node(node)
node.account.kill_process(self.java_class_name(),
- clean_shutdown=False, allow_fail=True)
+ clean_shutdown=False, allow_fail=True)
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT,
allow_fail=False)
+ def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None,
use_controller_bootstrap=False):
+ """Build the kafka-metadata-quorum.sh command prefix (script path plus bootstrap option) for node.
+ :param node: node whose installation is used to locate kafka-metadata-quorum.sh
+ :param kafka_security_protocol: protocol to connect with; when None it is chosen below
+ :param use_controller_bootstrap: if True, use --bootstrap-controller with the CONTROLLER_<protocol>
+ port from bootstrap_controllers(); otherwise use --bootstrap-server with bootstrap_servers()
+ :return: "<script> --bootstrap-controller <hosts>" or "<script> --bootstrap-server <hosts>"
+ NOTE(review): with use_controller_bootstrap=True the CONTROLLER_<protocol> key is derived from the
+ inter-broker/client protocol, not from the controller security protocol; confirm these always agree.
+ """
+ if kafka_security_protocol is None:
+ # it wasn't specified, so use the inter-broker/controller security protocol if it is PLAINTEXT,
+ # otherwise use the client security protocol
+ if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+ security_protocol_to_use = SecurityConfig.PLAINTEXT
+ else:
+ security_protocol_to_use = self.security_protocol
+ else:
+ security_protocol_to_use = kafka_security_protocol
+ if use_controller_bootstrap:
+ bootstrap = "--bootstrap-controller %s" %
(self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use))
+ else:
+ bootstrap = "--bootstrap-server %s" %
(self.bootstrap_servers(security_protocol_to_use))
+ kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh",
node)
+ return "%s %s" % (kafka_metadata_script, bootstrap)
+
def kafka_topics_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-topics against a broker, not a
KRaft controller")
@@ -1760,6 +1792,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.logger.debug(output)
return output
+ def describe_quorum(self, node=None):
+ """Run `kafka-metadata-quorum.sh describe --status` and return its captured output.
+ Specifying node is optional; if not specified the command will be run from self.nodes[0].
+ :param node: the node to run the command from
+ :return: the captured command output as a single string
+ """
+ if node is None:
+ node = self.nodes[0]
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
+ 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
+ }
+ self.logger.info("Running describe quorum command...\n%s" % cmd)
+ # Execute the command exactly once and capture its output; a preceding ssh() call
+ # would only run the same command a second time.
+ return "".join(node.account.ssh_capture(cmd))
+
+ def add_controller(self, controllerId, controller):
+ """Run the metadata quorum add-controller command. This should be run on the node that is being added.
+ Writes a command-config properties file under PERSISTENT_ROOT on `controller` (node.id,
+ process.roles=controller, metadata log dir, listeners, controller listener names), then runs
+ kafka-metadata-quorum.sh add-controller on `controller` using --bootstrap-controller.
+ :param controllerId: node.id of the controller being added to the quorum
+ :param controller: the node being added; the command is executed on this node
+ NOTE(review): the listener values come from self.advertised_listeners / self.listeners /
+ self.controller_listener_names, which presumably reflect the most recently started node;
+ confirm that is `controller` (the test starts it right before calling this).
+ """
+ command_config_path = os.path.join(KafkaService.PERSISTENT_ROOT,
"controller_command_config.properties")
+
+ # Property lines start at column 0 so the generated properties file has no leading whitespace.
+ configs = f"""
+{config_property.NODE_ID}={controllerId}
+{config_property.PROCESS_ROLES}=controller
+{config_property.METADATA_LOG_DIR}={KafkaService.METADATA_LOG_DIR}
+{config_property.ADVERTISED_LISTENERS}={self.advertised_listeners}
+{config_property.LISTENERS}={self.listeners}
+{config_property.CONTROLLER_LISTENER_NAMES}={self.controller_listener_names}"""
+
+ controller.account.create_file(command_config_path, configs)
+ cmd = fix_opts_for_new_jvm(controller)
+ cmd += "%(kafka_metadata_quorum_cmd)s --command-config
%(command_config)s add-controller" % {
+ 'kafka_metadata_quorum_cmd':
self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
+ 'command_config': command_config_path
+ }
+ self.logger.info("Running add controller command...\n%s" % cmd)
+ controller.account.ssh(cmd)
+
+ def remove_controller(self, controllerId, directoryId, node=None):
+ """Run kafka-metadata-quorum.sh remove-controller against the controllers (--bootstrap-controller).
+ Specifying node is optional; if not specified the command will be run from self.nodes[0].
+ :param controllerId: node.id of the voter to remove (passed as -i)
+ :param directoryId: directory id of the voter to remove (passed as -d)
+ :param node: node to run the command from
+ """
+ if node is None:
+ node = self.nodes[0]
+ cmd = fix_opts_for_new_jvm(node)
+ cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i
%(controller_id)s -d %(directory_id)s" % {
+ 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node,
use_controller_bootstrap=True),
+ 'controller_id': controllerId,
+ 'directory_id': directoryId
+ }
+ self.logger.info("Running remove controller command...\n%s" % cmd)
+ node.account.ssh(cmd)
+
def zk_connect_setting(self):
if self.quorum_info.using_kraft and not self.zk:
raise Exception("No zookeeper connect string available with KRaft
unless ZooKeeper is explicitly enabled")
@@ -1774,6 +1862,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
for node in self.nodes
if node not in offline_nodes])
+ def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
+ """Return "host:port,..." for the first num_nodes_controller_role nodes of self.controller_quorum.
+ :param port: port mapping whose port_number is appended to each hostname
+ :param validate: if True, raise ValueError when the port is not open
+ :param offline_nodes: nodes to leave out of the list (not modified)
+ """
+ if validate and not port.open:
+ raise ValueError("We are retrieving bootstrap servers for the
port: %s which is not currently open. - " %
+ str(port.port_number))
+
+ return ','.join([node.account.hostname + ":" + str(port.port_number)
+ for node in
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]
+ if node not in offline_nodes])
+
def bootstrap_servers(self, protocol='PLAINTEXT', validate=True,
offline_nodes=[]):
"""Return comma-delimited list of brokers in this cluster formatted as
HOSTNAME1:PORT1,HOSTNAME:PORT2,...
@@ -1783,6 +1880,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.logger.info("Bootstrap client port is: " +
str(port_mapping.port_number))
return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
+ def bootstrap_controllers(self, protocol='CONTROLLER_PLAINTEXT', validate=True, offline_nodes=[]):
+ """Return comma-delimited list of controllers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
+
+ This is the format passed to kafka-metadata-quorum.sh --bootstrap-controller.
+ :param protocol: key into self.port_mappings, e.g. CONTROLLER_PLAINTEXT
+ :param validate: if True, raise ValueError when the controller port is not open
+ :param offline_nodes: controller nodes to leave out of the list
+ """
+ port_mapping = self.port_mappings[protocol]
+ self.logger.info("Bootstrap controller port is: " + str(port_mapping.port_number))
+ return self.__bootstrap_controllers(port_mapping, validate, offline_nodes)
+
def controller(self):
""" Get the controller node
"""
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 65bf389b029..21b60afeb83 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -23,8 +23,12 @@ process.roles=controller
{% else %}
process.roles=broker
{% endif %}
-# The connect string for the controller quorum
+# The connect string for the controller quorum. Only one should be defined
+{% if controller_quorum_bootstrap_servers %}
+controller.quorum.bootstrap.servers={{ controller_quorum_bootstrap_servers }}
+{% else %}
controller.quorum.voters={{ controller_quorum_voters }}
+{% endif %}
controller.listener.names={{ controller_listener_names }}
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
new file mode 100644
index 00000000000..432b25c1686
--- /dev/null
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import re
+
+from functools import partial
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.quorum import combined_kraft, ServiceQuorumInfo,
isolated_kraft
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import DEV_BRANCH
+
+#
+# Test KIP-853 quorum reconfiguration (add-controller / remove-controller) for combined and isolated mode
+#
+class TestQuorumReconfiguration(ProduceConsumeValidateTest):
+ """Start a dynamic (kraft.version=1) quorum with a single standalone voter, add a second controller,
+ then remove the original leader, validating produce/consume throughout."""
+ def __init__(self, test_context):
+ super(TestQuorumReconfiguration,
self).__init__(test_context=test_context)
+
+ def setUp(self):
+ self.topic = "test_topic"
+ self.partitions = 3
+ self.replication_factor = 3
+
+ # Producer and consumer
+ self.producer_throughput = 1000
+ self.num_producers = 1
+ self.num_consumers = 1
+
+ def perform_reconfig(self, active_controller_id, inactive_controller_id,
inactive_controller, broker_ids):
+ """Add inactive_controller as a voter, then remove the active controller, asserting the
+ describe-quorum output (leader, voters, observers) after each step.
+ :param active_controller_id: node.id of the initial (standalone) voter and leader
+ :param inactive_controller_id: node.id of the controller that is started and added
+ :param inactive_controller: the node to start and add as a voter
+ :param broker_ids: node ids expected to be listed as observers
+ """
+ # Check describe quorum output shows the controller (first node) is the leader and the only voter
+ output = self.kafka.describe_quorum()
+ assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+ assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
+ assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+ # Start second controller; until it is added it should show up as an observer
+ self.kafka.controller_quorum.add_broker(inactive_controller)
+ output = self.kafka.describe_quorum()
+ assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+ assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id)
+ assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids +
[inactive_controller_id])
+
+ # Add controller to quorum
+ self.kafka.controller_quorum.add_controller(inactive_controller_id,
inactive_controller)
+
+ # Check describe quorum output shows both controllers are voters
+ output = self.kafka.describe_quorum()
+ assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+ assert_nodes_in_output(r"CurrentVoters:.*", output,
active_controller_id, inactive_controller_id)
+ assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+ # Remove leader from quorum; remove-controller needs the voter's directory id from describe output
+ voters = json_from_line(r"CurrentVoters:.*", output)
+ directory_id = next(voter["directoryId"] for voter in voters if
voter["id"] == active_controller_id)
+ self.kafka.controller_quorum.remove_controller(active_controller_id,
directory_id)
+
+ # Check describe quorum output shows the second controller is now the leader and the only voter
+ # NOTE(review): this is checked immediately after remove-controller; if leadership takes time to
+ # move, consider polling here instead of a single describe call.
+ output = self.kafka.describe_quorum()
+ assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
+ assert_nodes_in_output(r"CurrentVoters:.*", output,
inactive_controller_id)
+ assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+
+ @cluster(num_nodes=6)
+ @matrix(metadata_quorum=[combined_kraft])
+ def test_combined_mode_reconfig(self, metadata_quorum):
+ """Combined mode: nodes[0] and nodes[1] have the controller role; nodes[1] starts later and is added."""
+ self.kafka = KafkaService(self.test_context,
+ num_nodes=4,
+ zk=None,
+ topics={self.topic: {"partitions":
self.partitions,
+ "replication-factor":
self.replication_factor,
+ 'configs':
{"min.insync.replicas": 1}}},
+ version=DEV_BRANCH,
+ controller_num_nodes_override=2,
+ dynamicRaftQuorum=True)
+ # Start one out of two controllers (standalone mode)
+ inactive_controller = self.kafka.nodes[1]
+ self.kafka.start(nodes_to_skip=[inactive_controller])
+
+ # Start producer and consumer
+ self.producer = VerifiableProducer(self.test_context,
self.num_producers, self.kafka,
+ self.topic,
throughput=self.producer_throughput,
+ message_validator=is_int,
compression_types=["none"],
+ version=DEV_BRANCH,
offline_nodes=[inactive_controller])
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
+ self.topic, new_consumer=True,
consumer_timeout_ms=30000,
+ message_validator=is_int,
version=DEV_BRANCH)
+ # Perform reconfigurations
+ self.run_produce_consume_validate(
+ core_test_action=lambda:
self.perform_reconfig(self.kafka.idx(self.kafka.nodes[0]),
+
self.kafka.idx(inactive_controller),
+ inactive_controller,
+
[self.kafka.idx(node) for node in self.kafka.nodes[2:]]))
+
+ @cluster(num_nodes=7)
+ @matrix(metadata_quorum=[isolated_kraft])
+ def test_isolated_mode_reconfig(self, metadata_quorum):
+ """Isolated mode: two controller-only nodes in a separate quorum service; the second starts later and is added."""
+ # Build a KafkaService whose controllers run in a separate (isolated) KRaft quorum
+ remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
+ self.kafka = KafkaService(self.test_context,
+ num_nodes=3,
+ zk=None,
+ topics={self.topic: {"partitions":
self.partitions,
+ "replication-factor":
self.replication_factor,
+ 'configs':
{"min.insync.replicas": 1}}},
+ version=DEV_BRANCH,
+ controller_num_nodes_override=2,
+ quorum_info_provider=remote_quorum,
+ dynamicRaftQuorum=True)
+ # Start one out of two controllers (standalone mode)
+ controller_quorum = self.kafka.controller_quorum
+ inactive_controller = controller_quorum.nodes[1]
+ self.kafka.start(isolated_controllers_to_skip=[inactive_controller])
+
+ # Start producer and consumer
+ self.producer = VerifiableProducer(self.test_context,
self.num_producers, self.kafka,
+ self.topic,
throughput=self.producer_throughput,
+ message_validator=is_int,
compression_types=["none"],
+ version=DEV_BRANCH)
+ self.consumer = ConsoleConsumer(self.test_context, self.num_consumers,
self.kafka,
+ self.topic, new_consumer=True,
consumer_timeout_ms=30000,
+ message_validator=is_int,
version=DEV_BRANCH)
+ # Perform reconfigurations
+ self.run_produce_consume_validate(
+ core_test_action=lambda:
self.perform_reconfig(controller_quorum.node_id_as_isolated_controller(self.kafka.controller_quorum.nodes[0]),
+
controller_quorum.node_id_as_isolated_controller(inactive_controller),
+ inactive_controller,
+
[self.kafka.idx(node) for node in self.kafka.nodes]))
+
+def assert_nodes_in_output(pattern, output, *node_ids):
+ """Assert that the JSON node list following pattern in output contains exactly node_ids.
+ Order does not matter, but each id must appear exactly as many times as it is given, so a
+ duplicated id cannot hide a missing one.
+ """
+ nodes = json_from_line(pattern, output)
+ actual_ids = sorted(node["id"] for node in nodes)
+ expected_ids = sorted(node_ids)
+ assert actual_ids == expected_ids, \
+ "Expected node ids %s for pattern %s but found %s" % (expected_ids, pattern, actual_ids)
+
+def json_from_line(pattern, output):
+ """Return the JSON array contained in the first match of pattern in output.
+ The matched text is sliced from its first '[' to its last ']' and parsed with json.loads.
+ Raises Exception if pattern does not match or the matched text has no [...] array.
+ """
+ match = re.search(pattern, output)
+ if not match:
+ raise Exception("Expected match for pattern %s in describe quorum output" % pattern)
+ line = match.group(0)
+ start_index = line.find('[')
+ end_index = line.rfind(']') + 1
+ # Without this check a missing bracket yields line[-1:0] == "" and an opaque JSON decode error
+ if start_index == -1 or end_index <= start_index:
+ raise Exception("Expected a JSON array in describe quorum output line: %s" % line)
+ return json.loads(line[start_index:end_index])