This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 89cb632acd3d895bf057640eb05319ae1c3f4a4d
Author: Alyssa Huang <[email protected]>
AuthorDate: Thu Sep 26 10:56:19 2024 -0700

    KAFKA-17608, KAFKA-17604, KAFKA-16963; KRaft controller crashes when active 
controller is removed (#17146)
    
    This change fixes a few issues.
    
    KAFKA-17608; KRaft controller crashes when active controller is removed
    When a control batch is committed, the quorum controller currently
    increases the last stable offset but fails to create a snapshot for that
    offset. This causes an issue if the quorum controller renounces and needs to
    revert to that offset (which has no snapshot present). Since control batches
    are no-ops for the quorum controller, it does not need to update its offsets
    for control records. We therefore skip the commit-handling logic for batches
    that contain no data records and only update the commit metrics for them.
    
    KAFKA-17604; Describe quorum output missing added voters endpoints
    The describe quorum output is missing the endpoints of voters that were
    added via AddRaftVoter. This is due to a bug in LeaderState's
    updateVoterAndObserverStates, which pulls the replica state from the
    observer states map (which does not include endpoints). The fix is to
    populate the endpoints from the lastVoterSet passed into the method.
    
    Reviewers: José Armando García Sancio <[email protected]>, Colin P. McCabe 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/controller/OffsetControlManager.java     |   4 +
 .../apache/kafka/controller/QuorumController.java  |   8 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |   2 +-
 .../java/org/apache/kafka/raft/LeaderState.java    |   9 +-
 tests/kafkatest/services/kafka/kafka.py            |  63 ++++++------
 .../tests/core/quorum_reconfiguration_test.py      | 110 ++++++++++++++-------
 6 files changed, 123 insertions(+), 73 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 9a04301456c..e422ad602c3 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -280,6 +280,10 @@ class OffsetControlManager {
         this.lastCommittedOffset = batch.lastOffset();
         this.lastCommittedEpoch = batch.epoch();
         maybeAdvanceLastStableOffset();
+        handleCommitBatchMetrics(batch);
+    }
+
+    void handleCommitBatchMetrics(Batch<ApiMessageAndVersion> batch) {
         metrics.setLastCommittedRecordOffset(batch.lastOffset());
         if (!active()) {
             // On standby controllers, the last applied record offset is 
equals to the last
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 74451716277..1fe995dddd6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1065,7 +1065,10 @@ public final class QuorumController implements 
Controller {
                         int epoch = batch.epoch();
                         List<ApiMessageAndVersion> messages = batch.records();
 
-                        if (isActive) {
+                        if (messages.isEmpty()) {
+                            log.debug("Skipping handling commit for batch with 
no data records with offset {} and epoch {}.", offset, epoch);
+                            offsetControl.handleCommitBatchMetrics(batch);
+                        } else if (isActive) {
                             // If the controller is active, the records were 
already replayed,
                             // so we don't need to do it here.
                             log.debug("Completing purgatory items up to offset 
{} and epoch {}.", offset, epoch);
@@ -1075,9 +1078,6 @@ public final class QuorumController implements Controller 
{
                             offsetControl.handleCommitBatch(batch);
                             
deferredEventQueue.completeUpTo(offsetControl.lastStableOffset());
                             
deferredUnstableEventQueue.completeUpTo(offsetControl.lastCommittedOffset());
-
-                            // The active controller can delete up to the 
current committed offset.
-                            
snapshotRegistry.deleteSnapshotsUpTo(offsetControl.lastStableOffset());
                         } else {
                             // If the controller is a standby, replay the 
records that were
                             // created by the active controller.
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index d75417a77a1..4a708cb8716 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2911,7 +2911,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             if (quorum.isVoter()) {
                 transitionToCandidate(currentTimeMs);
             } else {
-                // It is posible that the old leader is not a voter in the new 
voter set.
+                // It is possible that the old leader is not a voter in the 
new voter set.
                 // In that case increase the epoch and transition to 
unattached. The epoch needs
                 // to be increased to avoid FETCH responses with the leader 
being this replica.
                 transitionToUnattached(quorum.epoch() + 1);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index ae9be94e13d..c09282c87c9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -678,6 +678,9 @@ public class LeaderState<T> implements EpochState {
 
             // Make sure that the replica key in the replica state matches the 
voter's
             state.setReplicaKey(voterNode.voterKey());
+
+            // Make sure that the listeners are updated
+            state.updateListeners(voterNode.listeners());
             newVoterStates.put(state.replicaKey.id(), state);
         }
         voterStates = newVoterStates;
@@ -752,8 +755,12 @@ public class LeaderState<T> implements EpochState {
             this.replicaKey = replicaKey;
         }
 
+        void updateListeners(Endpoints listeners) {
+            this.listeners = listeners;
+        }
+
         void clearListeners() {
-            this.listeners = Endpoints.empty();
+            updateListeners(Endpoints.empty());
         }
 
         boolean matchesKey(ReplicaKey replicaKey) {
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 547762a5343..f154e2049c0 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -267,6 +267,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or 
controller to also use ZooKeeper
         :param quorum_info_provider: A function that takes this KafkaService 
as an argument and returns a ServiceQuorumInfo. If this is None, then the 
ServiceQuorumInfo is generated from the test context
         :param use_new_coordinator: When true, use the new implementation of 
the group coordinator as per KIP-848. If this is None, the default existing 
group coordinator is used.
+        :param dynamicRaftQuorum: When true, the quorum uses kraft.version=1, 
controller_quorum_bootstrap_servers, and bootstraps the first controller using 
the standalone flag
         """
 
         self.zk = zk
@@ -300,7 +301,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
         if self.quorum_info.using_kraft:
             self.dynamicRaftQuorum = dynamicRaftQuorum
-            self.first_controller_started = False
+            # Used to ensure not more than one controller bootstraps with the 
standalone flag
+            self.standalone_controller_bootstrapped = False
             if self.quorum_info.has_brokers:
                 num_nodes_broker_role = num_nodes
                 if self.quorum_info.has_controllers:
@@ -458,8 +460,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.combined_nodes_started = 0
         self.nodes_to_start = self.nodes
 
-    # Does not do any validation to check if this node is part of an isolated 
controller quorum or not
     def node_id_as_isolated_controller(self, node):
+        """
+        Generates the node id for a controller-only node, starting from 
config_property.FIRST_CONTROLLER_ID so as not  
+        to overlap with broker id numbering.
+        This method does not do any validation to check this node is actually 
part of an isolated controller quorum.
+        """
         return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1
 
     def reconfigure_zk_for_migration(self, kraft_quorum):
@@ -755,8 +761,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         for port in self.port_mappings.values():
             if port.open:
                 listeners.append(port.listener())
-                if not port.name in controller_listener_names:
-                    advertised_listeners.append(port.advertised_listener(node))
+                advertised_listeners.append(port.advertised_listener(node))
                 protocol_map.append(port.listener_security_protocol())
         controller_sec_protocol = 
self.isolated_controller_quorum.controller_security_protocol if 
self.isolated_controller_quorum \
             else self.controller_security_protocol if 
self.quorum_info.has_brokers_and_controllers and not 
quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
@@ -873,12 +878,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             self.maybe_setup_broker_scram_credentials(node)
 
         if self.quorum_info.using_kraft:
-            # define controller.quorum.bootstrap.servrers or 
controller.quorum.voters text
+            # define controller.quorum.bootstrap.servers or 
controller.quorum.voters text
             security_protocol_to_use = 
self.controller_quorum.controller_security_protocol
             first_node_id = 1 if self.quorum_info.has_brokers_and_controllers 
else config_property.FIRST_CONTROLLER_ID
-            controller_quorum_bootstrap_servers = ','.join(["%s:%s" % 
(node.account.hostname,
-                                                                       
config_property.FIRST_CONTROLLER_PORT +
-                                                                       
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+            controller_quorum_bootstrap_servers = 
','.join(["{}:{}".format(node.account.hostname,
+                                                                           
config_property.FIRST_CONTROLLER_PORT +
+                                                                           
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
                                                             for node in 
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             if self.dynamicRaftQuorum:
                 self.controller_quorum_bootstrap_servers = 
controller_quorum_bootstrap_servers
@@ -904,11 +909,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % 
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
             if self.dynamicRaftQuorum:
                 cmd += " --feature kraft.version=1"
-                if not self.first_controller_started and 
self.node_quorum_info.has_controller_role:
+                if not self.standalone_controller_bootstrapped and 
self.node_quorum_info.has_controller_role:
                     cmd += " --standalone"
+                    self.standalone_controller_bootstrapped = True
             self.logger.info("Running log directory format command...\n%s" % 
cmd)
             node.account.ssh(cmd)
-            self.first_controller_started = True
 
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService %s on %s with 
command: %s" %\
@@ -1038,11 +1043,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         else:
             security_protocol_to_use = kafka_security_protocol
         if use_controller_bootstrap:
-            bootstrap = "--bootstrap-controller %s" % 
(self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use))
+            bootstrap = "--bootstrap-controller {}".format(
+                
self.bootstrap_controllers("CONTROLLER_{}".format(security_protocol_to_use)))
         else:
-            bootstrap = "--bootstrap-server %s" % 
(self.bootstrap_servers(security_protocol_to_use))
+            bootstrap = "--bootstrap-server 
{}".format(self.bootstrap_servers(security_protocol_to_use))
         kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", 
node)
-        return "%s %s" % (kafka_metadata_script, bootstrap)
+        return "{} {}".format(kafka_metadata_script, bootstrap)
 
     def kafka_topics_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
@@ -1794,10 +1800,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         if node is None:
             node = self.nodes[0]
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
-            'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
-        }
-        self.logger.info("Running describe quorum command...\n%s" % cmd)
+        cmd += f"{self.kafka_metadata_quorum_cmd(node)} describe --status"
+        self.logger.info(f"Running describe quorum command...\n{cmd}")
         node.account.ssh(cmd)
 
         output = ""
@@ -1821,11 +1825,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
         controller.account.create_file(command_config_path, configs)
         cmd = fix_opts_for_new_jvm(controller)
-        cmd += "%(kafka_metadata_quorum_cmd)s --command-config 
%(command_config)s add-controller" % {
-            'kafka_metadata_quorum_cmd': 
self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
-            'command_config': command_config_path
-        }
-        self.logger.info("Running add controller command...\n%s" % cmd)
+        kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(controller, 
use_controller_bootstrap=True)
+        cmd += f"{kafka_metadata_quorum_cmd} --command-config 
{command_config_path} add-controller"
+        self.logger.info(f"Running add controller command...\n{cmd}")
         controller.account.ssh(cmd)
 
     def remove_controller(self, controllerId, directoryId, node=None):
@@ -1835,12 +1837,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         if node is None:
             node = self.nodes[0]
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i 
%(controller_id)s -d %(directory_id)s" % {
-            'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node, 
use_controller_bootstrap=True),
-            'controller_id': controllerId,
-            'directory_id': directoryId
-        }
-        self.logger.info("Running remove controller command...\n%s" % cmd)
+        kafka_metadata_quorum_cmd = self.kafka_metadata_quorum_cmd(node, 
use_controller_bootstrap=True)
+        cmd += f"{kafka_metadata_quorum_cmd} remove-controller -i 
{controllerId} -d {directoryId}"
+        self.logger.info(f"Running remove controller command...\n{cmd}")
         node.account.ssh(cmd)
 
     def zk_connect_setting(self):
@@ -1850,8 +1849,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
     def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
         if validate and not port.open:
-            raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " %
-                             str(port.port_number))
+            raise ValueError(f"We are retrieving bootstrap servers for the 
port: {str(port.port_number)} "
+                             f"which is not currently open.")
 
         return ','.join([node.account.hostname + ":" + str(port.port_number)
                          for node in self.nodes
@@ -1859,8 +1858,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
     def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
         if validate and not port.open:
-            raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " %
-                             str(port.port_number))
+            raise ValueError(f"We are retrieving bootstrap controllers for the 
port: {str(port.port_number)} "
+                             f"which is not currently open.")
 
         return ','.join([node.account.hostname + ":" + str(port.port_number)
                          for node in 
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py 
b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
index 432b25c1686..80cf535021c 100644
--- a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -17,9 +17,13 @@ import json
 import re
 
 from functools import partial
+from typing import List
 
+from ducktape.cluster.cluster import ClusterNode
 from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
+from ducktape.tests.test import TestContext
+from ducktape.utils.util import wait_until
 
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.kafka import KafkaService
@@ -33,7 +37,7 @@ from kafkatest.version import DEV_BRANCH
 # Test quorum reconfiguration for combined and isolated mode
 #
 class TestQuorumReconfiguration(ProduceConsumeValidateTest):
-    def __init__(self, test_context):
+    def __init__(self, test_context: TestContext):
         super(TestQuorumReconfiguration, 
self).__init__(test_context=test_context)
 
     def setUp(self):
@@ -46,46 +50,58 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
         self.num_producers = 1
         self.num_consumers = 1
 
-    def perform_reconfig(self, active_controller_id, inactive_controller_id, 
inactive_controller, broker_ids):
+    def perform_reconfig(self,
+                         active_controller_id: int,
+                         inactive_controller_id: int,
+                         inactive_controller: ClusterNode,
+                         broker_only_ids: List[int]):
+        """
+        Tests quorum reconfiguration by adding a second controller and then 
removing the active controller.
+
+        :param active_controller_id: id of the active controller
+        :param inactive_controller_id: id of the inactive controller
+        :param inactive_controller: node object of the inactive controller
+        :param broker_only_ids: broker ids of nodes which have no controller 
process
+        """
         # Check describe quorum output shows the controller (first node) is 
the leader and the only voter
-        output = self.kafka.describe_quorum()
-        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
-        assert_nodes_in_output(r"CurrentVoters:.*", output, 
active_controller_id)
-        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+        wait_until(lambda: 
check_describe_quorum_output(self.kafka.describe_quorum(),
+                                                        active_controller_id,
+                                                        [active_controller_id],
+                                                        broker_only_ids), 
timeout_sec=5)
         # Start second controller
         self.kafka.controller_quorum.add_broker(inactive_controller)
-        output = self.kafka.describe_quorum()
-        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
-        assert_nodes_in_output(r"CurrentVoters:.*", output, 
active_controller_id)
-        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids + 
[inactive_controller_id])
-
+        wait_until(lambda: 
check_describe_quorum_output(self.kafka.describe_quorum(),
+                                                        active_controller_id,
+                                                        [active_controller_id],
+                                                        broker_only_ids + 
[inactive_controller_id]), timeout_sec=5)
         # Add controller to quorum
         self.kafka.controller_quorum.add_controller(inactive_controller_id, 
inactive_controller)
 
         # Check describe quorum output shows both controllers are voters
-        output = self.kafka.describe_quorum()
-        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
-        assert_nodes_in_output(r"CurrentVoters:.*", output, 
active_controller_id, inactive_controller_id)
-        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+        wait_until(lambda: 
check_describe_quorum_output(self.kafka.describe_quorum(),
+                                                        active_controller_id,
+                                                        [active_controller_id, 
inactive_controller_id],
+                                                        broker_only_ids), 
timeout_sec=5)
         # Remove leader from quorum
-        voters = json_from_line(r"CurrentVoters:.*", output)
+        voters = json_from_line(r"CurrentVoters:.*", 
self.kafka.describe_quorum())
         directory_id = next(voter["directoryId"] for voter in voters if 
voter["id"] == active_controller_id)
         self.kafka.controller_quorum.remove_controller(active_controller_id, 
directory_id)
-
-        # Check describe quorum output to show second_controller is now the 
leader
-        output = self.kafka.describe_quorum()
-        assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
-        assert_nodes_in_output(r"CurrentVoters:.*", output, 
inactive_controller_id)
-        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
-
+        # Describe quorum output shows the second controller is now leader, 
old controller is an observer
+        wait_until(lambda: 
check_describe_quorum_output(self.kafka.describe_quorum(),
+                                                        inactive_controller_id,
+                                                        
[inactive_controller_id],
+                                                        broker_only_ids + 
[active_controller_id]), timeout_sec=5)
 
     @cluster(num_nodes=6)
     @matrix(metadata_quorum=[combined_kraft])
     def test_combined_mode_reconfig(self, metadata_quorum):
+        """
+        Tests quorum reconfiguration in combined mode with produce & consume 
validation.
+        Starts a controller in standalone mode with two other broker nodes, 
then calls perform_reconfig to add
+        a second controller and then remove the first controller.
+        """
         self.kafka = KafkaService(self.test_context,
-                                  num_nodes=4,
+                                  num_nodes=4,  # 2 combined, 2 broker-only 
nodes
                                   zk=None,
                                   topics={self.topic: {"partitions": 
self.partitions,
                                                        "replication-factor": 
self.replication_factor,
@@ -93,7 +109,8 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
                                   version=DEV_BRANCH,
                                   controller_num_nodes_override=2,
                                   dynamicRaftQuorum=True)
-        # Start one out of two controllers (standalone mode)
+        # Start a controller and the broker-only nodes
+        # We leave starting the second controller for later in perform_reconfig
         inactive_controller = self.kafka.nodes[1]
         self.kafka.start(nodes_to_skip=[inactive_controller])
 
@@ -115,10 +132,15 @@ class 
TestQuorumReconfiguration(ProduceConsumeValidateTest):
     @cluster(num_nodes=7)
     @matrix(metadata_quorum=[isolated_kraft])
     def test_isolated_mode_reconfig(self, metadata_quorum):
+        """
+        Tests quorum reconfiguration in isolated mode with produce & consume 
validation.
+        Starts a controller in standalone mode with three other broker nodes, 
then calls perform_reconfig to add
+        a second controller and then remove the first controller.
+        """
         # Start up KRaft controller in migration mode
         remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
         self.kafka = KafkaService(self.test_context,
-                                  num_nodes=3,
+                                  num_nodes=3,  # 3 broker-only nodes
                                   zk=None,
                                   topics={self.topic: {"partitions": 
self.partitions,
                                                        "replication-factor": 
self.replication_factor,
@@ -127,7 +149,8 @@ class TestQuorumReconfiguration(ProduceConsumeValidateTest):
                                   controller_num_nodes_override=2,
                                   quorum_info_provider=remote_quorum,
                                   dynamicRaftQuorum=True)
-        # Start one out of two controllers (standalone mode)
+        # Start a controller and the broker-only nodes
+        # We leave starting the second controller for later in perform_reconfig
         controller_quorum = self.kafka.controller_quorum
         inactive_controller = controller_quorum.nodes[1]
         self.kafka.start(isolated_controllers_to_skip=[inactive_controller])
@@ -147,17 +170,34 @@ class 
TestQuorumReconfiguration(ProduceConsumeValidateTest):
                                                            inactive_controller,
                                                            
[self.kafka.idx(node) for node in self.kafka.nodes]))
 
-def assert_nodes_in_output(pattern, output, *node_ids):
+def check_nodes_in_output(pattern: str, output: str, *node_ids: int):
     nodes = json_from_line(pattern, output)
-    assert len(nodes) == len(node_ids)
+    if len(nodes) != len(node_ids):
+        return False
 
     for node in nodes:
-        assert node["id"] in node_ids
-
-def json_from_line(pattern, output):
+        if not node["id"] in node_ids:
+            return False
+    return True
+
+def check_describe_quorum_output(output: str, leader_id: int, voter_ids: 
List[int], observer_ids: List[int]):
+    """
+    Check that the describe quorum output contains the expected leader, 
voters, and observers
+    :param output: Describe quorum output
+    :param leader_id: Expected leader id
+    :param voter_ids: Expected voter ids
+    :param observer_ids: Expected observer ids
+    :return:
+    """
+    if not re.search(r"LeaderId:\s*" + str(leader_id), output):
+        return False
+    return (check_nodes_in_output(r"CurrentVoters:.*", output, *voter_ids) and
+            check_nodes_in_output(r"CurrentObservers:.*", output, 
*observer_ids))
+
+def json_from_line(pattern: str, output: str):
     match = re.search(pattern, output)
     if not match:
-        raise Exception("Expected match for pattern %s in describe quorum 
output" % pattern)
+        raise Exception(f"Expected match for pattern {pattern} in describe 
quorum output")
     line = match.group(0)
     start_index = line.find('[')
     end_index = line.rfind(']') + 1

Reply via email to