This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c2c2dd424b21af53110f1a137127dc80481d4281 Author: Alyssa Huang <[email protected]> AuthorDate: Fri Sep 6 13:44:09 2024 -0700 KAFKA-16963: Ducktape test for KIP-853 (#17081) Add a ducktape system test for KIP-853 quorum reconfiguration, including adding and removing voters. Reviewers: Colin P. McCabe <[email protected]> --- .../java/org/apache/kafka/raft/LeaderState.java | 2 +- tests/kafkatest/services/kafka/config_property.py | 4 + tests/kafkatest/services/kafka/kafka.py | 146 +++++++++++++++--- .../services/kafka/templates/kafka.properties | 6 +- .../tests/core/quorum_reconfiguration_test.py | 165 +++++++++++++++++++++ 5 files changed, 301 insertions(+), 22 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 803804858ab..ae9be94e13d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -384,7 +384,7 @@ public class LeaderState<T> implements EpochState { } public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long currentTimeMs) { - // In summary, let's consider a replica caughed up for add voter, if they + // In summary, let's consider a replica caught up for add voter, if they // have fetched within the last hour long anHourInMs = TimeUnit.HOURS.toMillis(1); return Optional.ofNullable(observerStates.get(replicaKey)) diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py index bc4708d0d32..28582513aba 100644 --- a/tests/kafkatest/services/kafka/config_property.py +++ b/tests/kafkatest/services/kafka/config_property.py @@ -19,12 +19,16 @@ Define Kafka configuration property names here. 
BROKER_ID = "broker.id" NODE_ID = "node.id" +PROCESS_ROLES = "process.roles" FIRST_BROKER_PORT = 9092 FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500 FIRST_CONTROLLER_ID = 3001 CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w" PORT = "port" ADVERTISED_HOSTNAME = "advertised.host.name" +ADVERTISED_LISTENERS = "advertised.listeners" +LISTENERS = "listeners" +CONTROLLER_LISTENER_NAMES = "controller.listener.names" NUM_NETWORK_THREADS = "num.network.threads" NUM_IO_THREADS = "num.io.threads" diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index bd1c9b1e332..547762a5343 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -204,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): controller_num_nodes_override=0, allow_zk_with_kraft=False, quorum_info_provider=None, - use_new_coordinator=None + use_new_coordinator=None, + dynamicRaftQuorum=False ): """ :param context: test context @@ -262,7 +263,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]} :param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable :param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper - :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise + :param int controller_num_nodes_override: the number of controller nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper :param quorum_info_provider: A function that takes this KafkaService as an argument and 
returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used. @@ -298,6 +299,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.num_nodes_controller_role = 0 if self.quorum_info.using_kraft: + self.dynamicRaftQuorum = dynamicRaftQuorum + self.first_controller_started = False if self.quorum_info.has_brokers: num_nodes_broker_role = num_nodes if self.quorum_info.has_controllers: @@ -337,7 +340,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): listener_security_config=listener_security_config, extra_kafka_opts=extra_kafka_opts, tls_version=tls_version, isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft, - server_prop_overrides=server_prop_overrides + server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum ) self.controller_quorum = self.isolated_controller_quorum @@ -434,15 +437,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): kraft_broker_plus_zk_configs = kraft_broker_configs.copy() kraft_broker_plus_zk_configs.update(zk_broker_configs) kraft_broker_plus_zk_configs.pop(config_property.BROKER_ID) - controller_only_configs = { - config_property.NODE_ID: self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1, - } - kraft_controller_plus_zk_configs = controller_only_configs.copy() - kraft_controller_plus_zk_configs.update(zk_broker_configs) - kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID) if node_quorum_info.service_quorum_info.using_zk: node.config = KafkaConfig(**zk_broker_configs) elif not node_quorum_info.has_broker_role: # KRaft controller-only role + controller_only_configs = { + config_property.NODE_ID: self.node_id_as_isolated_controller(node), + } + kraft_controller_plus_zk_configs = controller_only_configs.copy() + 
kraft_controller_plus_zk_configs.update(zk_broker_configs) + kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID) if self.zk: node.config = KafkaConfig(**kraft_controller_plus_zk_configs) else: @@ -455,6 +458,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.combined_nodes_started = 0 self.nodes_to_start = self.nodes + # Does not do any validation to check if this node is part of an isolated controller quorum or not + def node_id_as_isolated_controller(self, node): + return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1 + def reconfigure_zk_for_migration(self, kraft_quorum): self.configured_for_zk_migration = True self.controller_quorum = kraft_quorum @@ -627,7 +634,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def alive(self, node): return len(self.pids(node)) > 0 - def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60, **kwargs): + def start(self, add_principals="", nodes_to_skip=[], isolated_controllers_to_skip=[], timeout_sec=60, **kwargs): """ Start the Kafka broker and wait until it registers its ID in ZooKeeper Startup will be skipped for any nodes in nodes_to_skip. 
These nodes can be started later via add_broker @@ -665,7 +672,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self._ensure_zk_chroot() if self.isolated_controller_quorum: - self.isolated_controller_quorum.start() + self.isolated_controller_quorum.start(nodes_to_skip=isolated_controllers_to_skip) Service.start(self, **kwargs) @@ -727,7 +734,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.concurrent_start = False self.start_node(node) self.concurrent_start = orig_concurrent_start - wait_until(lambda: self.is_registered(node), 30, 1) def _ensure_zk_chroot(self): self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot) @@ -867,15 +873,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.maybe_setup_broker_scram_credentials(node) if self.quorum_info.using_kraft: - # define controller.quorum.voters text + # define controller.quorum.bootstrap.servers or controller.quorum.voters text security_protocol_to_use = self.controller_quorum.controller_security_protocol first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID - self.controller_quorum_voters = ','.join(["%s@%s:%s" % - (self.controller_quorum.idx(node) + first_node_id - 1, - node.account.hostname, - config_property.FIRST_CONTROLLER_PORT + - KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)) - for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) + controller_quorum_bootstrap_servers = ["%s:%s" % (node.account.hostname, + config_property.FIRST_CONTROLLER_PORT + + KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)) + for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]] + if self.dynamicRaftQuorum: + self.controller_quorum_bootstrap_servers = ','.join(controller_quorum_bootstrap_servers) + else: + self.controller_quorum_voters = ','.join(["%s@%s" % (self.controller_quorum.idx(node) + 
first_node_id - 1, + bootstrap_server) + for node, bootstrap_server in + zip(self.controller_quorum.nodes, controller_quorum_bootstrap_servers)]) # define controller.listener.names self.controller_listener_names = ','.join(self.controller_listener_name_list(node)) # define sasl.mechanism.controller.protocol to match the isolated quorum if one exists @@ -892,8 +902,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # format log directories if necessary kafka_storage_script = self.path.script("kafka-storage.sh", node) cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID) + if self.dynamicRaftQuorum: + cmd += " --feature kraft.version=1" + if not self.first_controller_started and self.node_quorum_info.has_controller_role: + cmd += " --standalone" self.logger.info("Running log directory format command...\n%s" % cmd) node.account.ssh(cmd) + self.first_controller_started = True cmd = self.start_cmd(node) self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\ @@ -1009,9 +1024,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): JmxMixin.clean_node(self, node) self.security_config.clean_node(node) node.account.kill_process(self.java_class_name(), - clean_shutdown=False, allow_fail=True) + clean_shutdown=False, allow_fail=True) node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False) + def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_controller_bootstrap=False): + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker/controller security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT + else: + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + if use_controller_bootstrap:
+ bootstrap = "--bootstrap-controller %s" % (self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use)) + else: + bootstrap = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node) + return "%s %s" % (kafka_metadata_script, bootstrap) + def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller") @@ -1755,6 +1787,62 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.debug(output) return output + def describe_quorum(self, node=None): + """Run the describe quorum command. + Specifying node is optional, if not specified the command will be run from self.nodes[0] + """ + if node is None: + node = self.nodes[0] + cmd = fix_opts_for_new_jvm(node) + cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % { + 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node) + } + self.logger.info("Running describe quorum command...\n%s" % cmd) + node.account.ssh(cmd) + + output = "" + for line in node.account.ssh_capture(cmd): + output += line + + return output + + def add_controller(self, controllerId, controller): + """Run the metadata quorum add controller command. This should be run on the node that is being added. 
+ """ + command_config_path = os.path.join(KafkaService.PERSISTENT_ROOT, "controller_command_config.properties") + + configs = f""" +{config_property.NODE_ID}={controllerId} +{config_property.PROCESS_ROLES}=controller +{config_property.METADATA_LOG_DIR}={KafkaService.METADATA_LOG_DIR} +{config_property.ADVERTISED_LISTENERS}={self.advertised_listeners} +{config_property.LISTENERS}={self.listeners} +{config_property.CONTROLLER_LISTENER_NAMES}={self.controller_listener_names}""" + + controller.account.create_file(command_config_path, configs) + cmd = fix_opts_for_new_jvm(controller) + cmd += "%(kafka_metadata_quorum_cmd)s --command-config %(command_config)s add-controller" % { + 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True), + 'command_config': command_config_path + } + self.logger.info("Running add controller command...\n%s" % cmd) + controller.account.ssh(cmd) + + def remove_controller(self, controllerId, directoryId, node=None): + """Run the admin tool remove controller command. 
+ Specifying node is optional, if not specified the command will be run from self.nodes[0] + """ + if node is None: + node = self.nodes[0] + cmd = fix_opts_for_new_jvm(node) + cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i %(controller_id)s -d %(directory_id)s" % { + 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node, use_controller_bootstrap=True), + 'controller_id': controllerId, + 'directory_id': directoryId + } + self.logger.info("Running remove controller command...\n%s" % cmd) + node.account.ssh(cmd) + def zk_connect_setting(self): if self.quorum_info.using_kraft and not self.zk: raise Exception("No zookeeper connect string available with KRaft unless ZooKeeper is explicitly enabled") @@ -1769,6 +1857,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): for node in self.nodes if node not in offline_nodes]) + def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]): + if validate and not port.open: + raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % + str(port.port_number)) + + return ','.join([node.account.hostname + ":" + str(port.port_number) + for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role] + if node not in offline_nodes]) + def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]): """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... @@ -1778,6 +1875,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number)) return self.__bootstrap_servers(port_mapping, validate, offline_nodes) + def bootstrap_controllers(self, protocol='CONTROLLER_PLAINTEXT', validate=True, offline_nodes=[]): + """Return comma-delimited list of controllers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... 
+ + This is the format expected by many config files. + """ + port_mapping = self.port_mappings[protocol] + self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number)) + return self.__bootstrap_controllers(port_mapping, validate, offline_nodes) + def controller(self): """ Get the controller node """ diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 65bf389b029..21b60afeb83 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -23,8 +23,12 @@ process.roles=controller {% else %} process.roles=broker {% endif %} -# The connect string for the controller quorum +# The connect string for the controller quorum. Only one should be defined +{% if controller_quorum_bootstrap_servers %} +controller.quorum.bootstrap.servers={{ controller_quorum_bootstrap_servers }} +{% else %} controller.quorum.voters={{ controller_quorum_voters }} +{% endif %} controller.listener.names={{ controller_listener_names }} diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py new file mode 100644 index 00000000000..432b25c1686 --- /dev/null +++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re + +from functools import partial + +from ducktape.mark import matrix +from ducktape.mark.resource import cluster + +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.quorum import combined_kraft, ServiceQuorumInfo, isolated_kraft +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest +from kafkatest.utils import is_int +from kafkatest.version import DEV_BRANCH + +# +# Test quorum reconfiguration for combined and isolated mode +# +class TestQuorumReconfiguration(ProduceConsumeValidateTest): + def __init__(self, test_context): + super(TestQuorumReconfiguration, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.partitions = 3 + self.replication_factor = 3 + + # Producer and consumer + self.producer_throughput = 1000 + self.num_producers = 1 + self.num_consumers = 1 + + def perform_reconfig(self, active_controller_id, inactive_controller_id, inactive_controller, broker_ids): + # Check describe quorum output shows the controller (first node) is the leader and the only voter + output = self.kafka.describe_quorum() + assert re.search(r"LeaderId:\s*" + str(active_controller_id), output) + assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id) + assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids) + + # Start second controller + 
self.kafka.controller_quorum.add_broker(inactive_controller) + output = self.kafka.describe_quorum() + assert re.search(r"LeaderId:\s*" + str(active_controller_id), output) + assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id) + assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids + [inactive_controller_id]) + + # Add controller to quorum + self.kafka.controller_quorum.add_controller(inactive_controller_id, inactive_controller) + + # Check describe quorum output shows both controllers are voters + output = self.kafka.describe_quorum() + assert re.search(r"LeaderId:\s*" + str(active_controller_id), output) + assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id, inactive_controller_id) + assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids) + + # Remove leader from quorum + voters = json_from_line(r"CurrentVoters:.*", output) + directory_id = next(voter["directoryId"] for voter in voters if voter["id"] == active_controller_id) + self.kafka.controller_quorum.remove_controller(active_controller_id, directory_id) + + # Check describe quorum output to show second_controller is now the leader + output = self.kafka.describe_quorum() + assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output) + assert_nodes_in_output(r"CurrentVoters:.*", output, inactive_controller_id) + assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids) + + + @cluster(num_nodes=6) + @matrix(metadata_quorum=[combined_kraft]) + def test_combined_mode_reconfig(self, metadata_quorum): + self.kafka = KafkaService(self.test_context, + num_nodes=4, + zk=None, + topics={self.topic: {"partitions": self.partitions, + "replication-factor": self.replication_factor, + 'configs': {"min.insync.replicas": 1}}}, + version=DEV_BRANCH, + controller_num_nodes_override=2, + dynamicRaftQuorum=True) + # Start one out of two controllers (standalone mode) + inactive_controller = self.kafka.nodes[1] + 
self.kafka.start(nodes_to_skip=[inactive_controller]) + + # Start producer and consumer + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, compression_types=["none"], + version=DEV_BRANCH, offline_nodes=[inactive_controller]) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, new_consumer=True, consumer_timeout_ms=30000, + message_validator=is_int, version=DEV_BRANCH) + # Perform reconfigurations + self.run_produce_consume_validate( + core_test_action=lambda: self.perform_reconfig(self.kafka.idx(self.kafka.nodes[0]), + self.kafka.idx(inactive_controller), + inactive_controller, + [self.kafka.idx(node) for node in self.kafka.nodes[2:]])) + + @cluster(num_nodes=7) + @matrix(metadata_quorum=[isolated_kraft]) + def test_isolated_mode_reconfig(self, metadata_quorum): + # Start up KRaft controller in migration mode + remote_quorum = partial(ServiceQuorumInfo, isolated_kraft) + self.kafka = KafkaService(self.test_context, + num_nodes=3, + zk=None, + topics={self.topic: {"partitions": self.partitions, + "replication-factor": self.replication_factor, + 'configs': {"min.insync.replicas": 1}}}, + version=DEV_BRANCH, + controller_num_nodes_override=2, + quorum_info_provider=remote_quorum, + dynamicRaftQuorum=True) + # Start one out of two controllers (standalone mode) + controller_quorum = self.kafka.controller_quorum + inactive_controller = controller_quorum.nodes[1] + self.kafka.start(isolated_controllers_to_skip=[inactive_controller]) + + # Start producer and consumer + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, + self.topic, throughput=self.producer_throughput, + message_validator=is_int, compression_types=["none"], + version=DEV_BRANCH) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, + self.topic, new_consumer=True, 
consumer_timeout_ms=30000, + message_validator=is_int, version=DEV_BRANCH) + # Perform reconfigurations + self.run_produce_consume_validate( + core_test_action=lambda: self.perform_reconfig(controller_quorum.node_id_as_isolated_controller(self.kafka.controller_quorum.nodes[0]), + controller_quorum.node_id_as_isolated_controller(inactive_controller), + inactive_controller, + [self.kafka.idx(node) for node in self.kafka.nodes])) + +def assert_nodes_in_output(pattern, output, *node_ids): + nodes = json_from_line(pattern, output) + assert len(nodes) == len(node_ids) + + for node in nodes: + assert node["id"] in node_ids + +def json_from_line(pattern, output): + match = re.search(pattern, output) + if not match: + raise Exception("Expected match for pattern %s in describe quorum output" % pattern) + line = match.group(0) + start_index = line.find('[') + end_index = line.rfind(']') + 1 + + return json.loads(line[start_index:end_index])
