This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 68b97705062 KAFKA-17608, KAFKA-17604, KAFKA-16963; KRaft controller crashes when active controller is removed (#17146)
68b97705062 is described below
commit 68b97705062826f0cef935f4769b1af17f460919
Author: Alyssa Huang <[email protected]>
AuthorDate: Thu Sep 26 10:56:19 2024 -0700
KAFKA-17608, KAFKA-17604, KAFKA-16963; KRaft controller crashes when active controller is removed (#17146)
This change fixes a few issues.
KAFKA-17608; KRaft controller crashes when active controller is removed
When a control batch is committed, the quorum controller currently
increases the last stable offset but does not create a snapshot at that
offset. This causes an issue if the quorum controller renounces and needs to
revert to that offset, for which no snapshot is present. Since control
batches are no-ops for the quorum controller, it does not need to update its
offsets for control records; the fix is to skip the handle-commit logic for
control batches.
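As a minimal sketch of that skip (simplified stand-in types, not the actual
QuorumController code):

    import java.util.List;

    // Sketch: control batches carry no data records, so the controller only
    // updates metrics for them instead of advancing committed/stable offsets
    // (which would otherwise point at an offset with no snapshot to revert to).
    final class CommitHandlingSketch {
        record Batch(long lastOffset, int epoch, List<String> records) { }

        interface OffsetControl {
            void handleCommitBatch(Batch batch);        // advances committed/stable offsets
            void handleCommitBatchMetrics(Batch batch); // updates metrics only
        }

        private final OffsetControl offsetControl;
        private final boolean isActive;

        CommitHandlingSketch(OffsetControl offsetControl, boolean isActive) {
            this.offsetControl = offsetControl;
            this.isActive = isActive;
        }

        void handleCommit(Batch batch) {
            if (batch.records().isEmpty()) {
                // Control batch: a no-op for the quorum controller, so skip
                // the offset bookkeeping and only record metrics.
                offsetControl.handleCommitBatchMetrics(batch);
            } else if (isActive) {
                // Active controller already replayed these records.
                offsetControl.handleCommitBatch(batch);
            } else {
                // Standby: replay records from the active controller first
                // (replay elided in this sketch), then advance offsets.
                offsetControl.handleCommitBatch(batch);
            }
        }
    }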
KAFKA-17604; Describe quorum output missing added voters endpoints
Describe quorum output omits the endpoints of voters that were added via
AddRaftVoter. This is due to a bug in LeaderState's
updateVoterAndObserverStates, which pulls replica state from the observer
states map (which does not include endpoints). The fix is to populate
endpoints from the lastVoterSet passed into the method.
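A hypothetical, simplified sketch of that fix (VoterNode, Endpoints, and
ReplicaState here are stand-ins for the real raft types):

    import java.util.HashMap;
    import java.util.Map;

    final class VoterEndpointsSketch {
        record Endpoints(Map<String, String> byListenerName) {
            static Endpoints empty() { return new Endpoints(Map.of()); }
        }
        record VoterNode(int id, Endpoints listeners) { }

        static final class ReplicaState {
            final int id;
            Endpoints listeners = Endpoints.empty(); // observer entries carry no endpoints

            ReplicaState(int id) { this.id = id; }
            void updateListeners(Endpoints listeners) { this.listeners = listeners; }
        }

        private Map<Integer, ReplicaState> voterStates = new HashMap<>();
        private final Map<Integer, ReplicaState> observerStates = new HashMap<>();

        void updateVoterStates(Iterable<VoterNode> lastVoterSet) {
            Map<Integer, ReplicaState> newVoterStates = new HashMap<>();
            for (VoterNode voterNode : lastVoterSet) {
                // A replica promoted via AddRaftVoter is pulled from the
                // observer map, whose entries never held endpoints -- the bug.
                ReplicaState state = voterStates.containsKey(voterNode.id())
                    ? voterStates.get(voterNode.id())
                    : observerStates.getOrDefault(voterNode.id(), new ReplicaState(voterNode.id()));
                // The fix: always take listeners from the lastVoterSet so
                // DescribeQuorum reports endpoints for newly added voters.
                state.updateListeners(voterNode.listeners());
                newVoterStates.put(state.id, state);
            }
            voterStates = newVoterStates;
        }
    }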
Reviewers: José Armando García Sancio <[email protected]>, Colin P. McCabe <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/controller/OffsetControlManager.java | 4 +
.../apache/kafka/controller/QuorumController.java | 8 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 2 +-
.../java/org/apache/kafka/raft/LeaderState.java | 9 +-
tests/kafkatest/services/kafka/kafka.py | 63 ++++++------
.../tests/core/quorum_reconfiguration_test.py | 110 ++++++++++++++-------
6 files changed, 123 insertions(+), 73 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index eeffe0a1c43..c52f059d7ac 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -280,6 +280,10 @@ class OffsetControlManager {
this.lastCommittedOffset = batch.lastOffset();
this.lastCommittedEpoch = batch.epoch();
maybeAdvanceLastStableOffset();
+ handleCommitBatchMetrics(batch);
+ }
+
+ void handleCommitBatchMetrics(Batch<ApiMessageAndVersion> batch) {
metrics.setLastCommittedRecordOffset(batch.lastOffset());
if (!active()) {
// On standby controllers, the last applied record offset is equals to the last
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 74451716277..1fe995dddd6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1065,7 +1065,10 @@ public final class QuorumController implements Controller {
int epoch = batch.epoch();
List<ApiMessageAndVersion> messages = batch.records();
- if (isActive) {
+ if (messages.isEmpty()) {
+ log.debug("Skipping handling commit for batch with no data records with offset {} and epoch {}.", offset, epoch);
+ offsetControl.handleCommitBatchMetrics(batch);
+ } else if (isActive) {
// If the controller is active, the records were already replayed,
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
@@ -1075,9 +1078,6 @@ public final class QuorumController implements Controller {
offsetControl.handleCommitBatch(batch);
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
-
- // The active controller can delete up to the current committed offset.
- snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());
} else {
// If the controller is a standby, replay the records that were
// created by the active controller.
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index d75417a77a1..4a708cb8716 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2911,7 +2911,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
if (quorum.isVoter()) {
transitionToCandidate(currentTimeMs);
} else {
- // It is posible that the old leader is not a voter in the new voter set.
+ // It is possible that the old leader is not a voter in the new voter set.
// In that case increase the epoch and transition to unattached. The epoch needs
// to be increased to avoid FETCH responses with the leader being this replica.
transitionToUnattached(quorum.epoch() + 1);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index ae9be94e13d..c09282c87c9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -678,6 +678,9 @@ public class LeaderState<T> implements EpochState {
// Make sure that the replica key in the replica state matches the voter's
state.setReplicaKey(voterNode.voterKey());
+
+ // Make sure that the listeners are updated
+ state.updateListeners(voterNode.listeners());
newVoterStates.put(state.replicaKey.id(), state);
}
voterStates = newVoterStates;
@@ -752,8 +755,12 @@ public class LeaderState<T> implements EpochState {
this.replicaKey = replicaKey;
}
+ void updateListeners(Endpoints listeners) {
+ this.listeners = listeners;
+ }
+
void clearListeners() {
- this.listeners = Endpoints.empty();
+ updateListeners(Endpoints.empty());
}
boolean matchesKey(ReplicaKey replicaKey) {
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index b891da42344..c672e7a2820 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -267,6 +267,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
:param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
:param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
:param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
+ :param dynamicRaftQuorum: When true, the quorum uses kraft.version=1, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
"""
self.zk = zk
@@ -299,7 +300,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if self.quorum_info.using_kraft:
self.dynamicRaftQuorum = dynamicRaftQuorum
- self.first_controller_started = False
+ # Used to ensure not more than one controller bootstraps with the standalone flag
+ self.standalone_controller_bootstrapped = False
if self.quorum_info.has_brokers:
num_nodes_broker_role = num_nodes
if self.quorum_info.has_controllers:
@@ -449,8 +451,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.combined_nodes_started = 0
self.nodes_to_start = self.nodes
- # Does not do any validation to check if this node is part of an isolated controller quorum or not
def node_id_as_isolated_controller(self, node):
+ """
+ Generates the node id for a controller-only node, starting from
config_property.FIRST_CONTROLLER_ID so as not
+ to overlap with broker id numbering.
+ This method does not do any validation to check this node is actually
part of an isolated controller quorum.
+ """
return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1
def reconfigure_zk_for_migration(self, kraft_quorum):
@@ -746,8 +752,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
- if not port.name in controller_listener_names:
- advertised_listeners.append(port.advertised_listener(node))
+ advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
    else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
@@ -863,12 +868,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.maybe_setup_broker_scram_credentials(node)
if self.quorum_info.using_kraft:
- # define controller.quorum.bootstrap.servrers or controller.quorum.voters text
+ # define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
- controller_quorum_bootstrap_servers = ','.join(["%s:%s" % (node.account.hostname,
- config_property.FIRST_CONTROLLER_PORT +
- KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+ controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
+ config_property.FIRST_CONTROLLER_PORT +
+ KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers
@@ -894,11 +899,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" %
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.dynamicRaftQuorum:
cmd += " --feature kraft.version=1"
- if not self.first_controller_started and self.node_quorum_info.has_controller_role:
+ if not self.standalone_controller_bootstrapped and self.node_quorum_info.has_controller_role:
cmd += " --standalone"
+ self.standalone_controller_bootstrapped = True
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)
- self.first_controller_started = True
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService %s on %s with command: %s" %\
@@ -1028,11 +1033,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
else:
security_protocol_to_use = kafka_security_protocol
if use_controller_bootstrap:
- bootstrap = "--bootstrap-controller %s" %
(self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use))
+ bootstrap = "--bootstrap-controller {}".format(
+
self.bootstrap_controllers("CONTROLLER_{}".format(security_protocol_to_use)))
else:
- bootstrap = "--bootstrap-server %s" %
(self.bootstrap_servers(security_protocol_to_use))
+ bootstrap = "--bootstrap-server
{}".format(self.bootstrap_servers(security_protocol_to_use))
kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node)
- return "%s %s" % (kafka_metadata_script, bootstrap)
+ return "{} {}".format(kafka_metadata_script, bootstrap)
def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
@@ -1788,10 +1794,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if node is None:
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
- cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
- 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
- }
- self.logger.info("Running describe quorum command...\n%s" % cmd)
+ cmd += f"{self.kafka_metadata_quorum_cmd(node)} describe --status"
+ self.logger.info(f"Running describe quorum command...\n{cmd}")
node.account.ssh(cmd)
output = ""
@@ -1815,11 +1819,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
controller.account.create_file(command_config_path, configs)
cmd = fix_opts_for_new_jvm(controller)
- cmd += "%(kafka_metadata_quorum_cmd)s --command-config %(command_config)s add-controller" % {
- 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
- 'command_config': command_config_path
- }
- self.logger.info("Running add controller command...\n%s" % cmd)
+ kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True)
+ cmd += f"{kafka_metadata_quorum_cmd} --command-config {command_config_path} add-controller"
+ self.logger.info(f"Running add controller command...\n{cmd}")
controller.account.ssh(cmd)
def remove_controller(self, controllerId, directoryId, node=None):
@@ -1829,12 +1831,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
if node is None:
node = self.nodes[0]
cmd = fix_opts_for_new_jvm(node)
- cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i %(controller_id)s -d %(directory_id)s" % {
- 'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node, use_controller_bootstrap=True),
- 'controller_id': controllerId,
- 'directory_id': directoryId
- }
- self.logger.info("Running remove controller command...\n%s" % cmd)
+ kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(node, use_controller_bootstrap=True)
+ cmd += f"{kafka_metadata_quorum_cmd} remove-controller -i {controllerId} -d {directoryId}"
+ self.logger.info(f"Running remove controller command...\n{cmd}")
node.account.ssh(cmd)
def zk_connect_setting(self):
@@ -1844,8 +1843,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:
- raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
- str(port.port_number))
+ raise ValueError(f"We are retrieving bootstrap servers for the port: {str(port.port_number)} "
+ f"which is not currently open.")
return ','.join([node.account.hostname + ":" + str(port.port_number)
for node in self.nodes
@@ -1853,8 +1852,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
if validate and not port.open:
- raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " %
- str(port.port_number))
+ raise ValueError(f"We are retrieving bootstrap controllers for the port: {str(port.port_number)} "
+ f"which is not currently open.")
return ','.join([node.account.hostname + ":" + str(port.port_number)
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
index 432b25c1686..80cf535021c 100644
--- a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -17,9 +17,13 @@ import json
import re
from functools import partial
+from typing import List
+from ducktape.cluster.cluster import ClusterNode
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
+from ducktape.tests.test import TestContext
+from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService
@@ -33,7 +37,7 @@ from kafkatest.version import DEV_BRANCH
# Test quorum reconfiguration for combined and isolated mode
#
class TestQuorumReconfiguration(ProduceConsumeValidateTest):
- def __init__(self, test_context):
+ def __init__(self, test_context: TestContext):
super(TestQuorumReconfiguration, self).__init__(test_context=test_context)
def setUp(self):
@@ -46,46 +50,58 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
self.num_producers = 1
self.num_consumers = 1
- def perform_reconfig(self, active_controller_id, inactive_controller_id, inactive_controller, broker_ids):
+ def perform_reconfig(self,
+ active_controller_id: int,
+ inactive_controller_id: int,
+ inactive_controller: ClusterNode,
+ broker_only_ids: List[int]):
+ """
+ Tests quorum reconfiguration by adding a second controller and then
removing the active controller.
+
+ :param active_controller_id: id of the active controller
+ :param inactive_controller_id: id of the inactive controller
+ :param inactive_controller: node object of the inactive controller
+ :param broker_only_ids: broker ids of nodes which have no controller
process
+ """
# Check describe quorum output shows the controller (first node) is the leader and the only voter
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda: check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids), timeout_sec=5)
# Start second controller
self.kafka.controller_quorum.add_broker(inactive_controller)
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids + [inactive_controller_id])
-
+ wait_until(lambda: check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id],
+ broker_only_ids + [inactive_controller_id]), timeout_sec=5)
# Add controller to quorum
self.kafka.controller_quorum.add_controller(inactive_controller_id, inactive_controller)
# Check describe quorum output shows both controllers are voters
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id, inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ wait_until(lambda: check_describe_quorum_output(self.kafka.describe_quorum(),
+ active_controller_id,
+ [active_controller_id, inactive_controller_id],
+ broker_only_ids), timeout_sec=5)
# Remove leader from quorum
- voters = json_from_line(r"CurrentVoters:.*", output)
+ voters = json_from_line(r"CurrentVoters:.*", self.kafka.describe_quorum())
directory_id = next(voter["directoryId"] for voter in voters if voter["id"] == active_controller_id)
self.kafka.controller_quorum.remove_controller(active_controller_id, directory_id)
-
- # Check describe quorum output to show second_controller is now the leader
- output = self.kafka.describe_quorum()
- assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
- assert_nodes_in_output(r"CurrentVoters:.*", output, inactive_controller_id)
- assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+ # Describe quorum output shows the second controller is now leader, old controller is an observer
+ wait_until(lambda: check_describe_quorum_output(self.kafka.describe_quorum(),
+ inactive_controller_id,
+ [inactive_controller_id],
+ broker_only_ids + [active_controller_id]), timeout_sec=5)
@cluster(num_nodes=6)
@matrix(metadata_quorum=[combined_kraft])
def test_combined_mode_reconfig(self, metadata_quorum):
+ """
+ Tests quorum reconfiguration in combined mode with produce & consume
validation.
+ Starts a controller in standalone mode with two other broker nodes,
then calls perform_reconfig to add
+ a second controller and then remove the first controller.
+ """
self.kafka = KafkaService(self.test_context,
- num_nodes=4,
+ num_nodes=4, # 2 combined, 2 broker-only nodes
zk=None,
topics={self.topic: {"partitions": self.partitions,
"replication-factor": self.replication_factor,
@@ -93,7 +109,8 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
version=DEV_BRANCH,
controller_num_nodes_override=2,
dynamicRaftQuorum=True)
- # Start one out of two controllers (standalone mode)
+ # Start a controller and the broker-only nodes
+ # We leave starting the second controller for later in perform_reconfig
inactive_controller = self.kafka.nodes[1]
self.kafka.start(nodes_to_skip=[inactive_controller])
@@ -115,10 +132,15 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
@cluster(num_nodes=7)
@matrix(metadata_quorum=[isolated_kraft])
def test_isolated_mode_reconfig(self, metadata_quorum):
+ """
+ Tests quorum reconfiguration in isolated mode with produce & consume
validation.
+ Starts a controller in standalone mode with three other broker nodes,
then calls perform_reconfig to add
+ a second controller and then remove the first controller.
+ """
# Start up KRaft controller in migration mode
remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
self.kafka = KafkaService(self.test_context,
- num_nodes=3,
+ num_nodes=3, # 3 broker-only nodes
zk=None,
topics={self.topic: {"partitions": self.partitions,
"replication-factor": self.replication_factor,
@@ -127,7 +149,8 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
controller_num_nodes_override=2,
quorum_info_provider=remote_quorum,
dynamicRaftQuorum=True)
- # Start one out of two controllers (standalone mode)
+ # Start a controller and the broker-only nodes
+ # We leave starting the second controller for later in perform_reconfig
controller_quorum = self.kafka.controller_quorum
inactive_controller = controller_quorum.nodes[1]
self.kafka.start(isolated_controllers_to_skip=[inactive_controller])
@@ -147,17 +170,34 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
inactive_controller,
[self.kafka.idx(node) for node in self.kafka.nodes]))
-def assert_nodes_in_output(pattern, output, *node_ids):
+def check_nodes_in_output(pattern: str, output: str, *node_ids: int):
nodes = json_from_line(pattern, output)
- assert len(nodes) == len(node_ids)
+ if len(nodes) != len(node_ids):
+ return False
for node in nodes:
- assert node["id"] in node_ids
-
-def json_from_line(pattern, output):
+ if not node["id"] in node_ids:
+ return False
+ return True
+
+def check_describe_quorum_output(output: str, leader_id: int, voter_ids: List[int], observer_ids: List[int]):
+ """
+ Check that the describe quorum output contains the expected leader, voters, and observers
+ :param output: Describe quorum output
+ :param leader_id: Expected leader id
+ :param voter_ids: Expected voter ids
+ :param observer_ids: Expected observer ids
+ :return:
+ """
+ if not re.search(r"LeaderId:\s*" + str(leader_id), output):
+ return False
+ return (check_nodes_in_output(r"CurrentVoters:.*", output, *voter_ids) and
+ check_nodes_in_output(r"CurrentObservers:.*", output, *observer_ids))
+
+def json_from_line(pattern: str, output: str):
match = re.search(pattern, output)
if not match:
- raise Exception("Expected match for pattern %s in describe quorum output" % pattern)
+ raise Exception(f"Expected match for pattern {pattern} in describe quorum output")
line = match.group(0)
start_index = line.find('[')
end_index = line.rfind(']') + 1