This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c2c2dd424b21af53110f1a137127dc80481d4281
Author: Alyssa Huang <[email protected]>
AuthorDate: Fri Sep 6 13:44:09 2024 -0700

    KAFKA-16963: Ducktape test for KIP-853 (#17081)
    
    Add a ducktape system test for KIP-853 quorum reconfiguration,
    including adding and removing voters.
    
    Reviewers: Colin P. McCabe <[email protected]>
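    
    In outline, the test drives reconfiguration through the new KafkaService
    helpers added below. A minimal sketch of the flow (identifiers such as
    new_controller_node are placeholders; setup and assertions elided):
    
        kafka.describe_quorum()                                   # kafka-metadata-quorum.sh describe --status
        kafka.controller_quorum.add_broker(new_controller_node)              # joins as an observer
        kafka.controller_quorum.add_controller(new_id, new_controller_node)  # observer -> voter
        kafka.controller_quorum.remove_controller(old_id, old_directory_id)  # demote the old leader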
---
 .../java/org/apache/kafka/raft/LeaderState.java    |   2 +-
 tests/kafkatest/services/kafka/config_property.py  |   4 +
 tests/kafkatest/services/kafka/kafka.py            | 146 +++++++++++++++---
 .../services/kafka/templates/kafka.properties      |   6 +-
 .../tests/core/quorum_reconfiguration_test.py      | 165 +++++++++++++++++++++
 5 files changed, 301 insertions(+), 22 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 803804858ab..ae9be94e13d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -384,7 +384,7 @@ public class LeaderState<T> implements EpochState {
     }
 
     public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long currentTimeMs) {
-        // In summary, let's consider a replica caughed up for add voter, if they
+        // In summary, let's consider a replica caught up for add voter, if they
         // have fetched within the last hour
         long anHourInMs = TimeUnit.HOURS.toMillis(1);
         return Optional.ofNullable(observerStates.get(replicaKey))
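
A schematic restatement of the criterion (not the Java source, shown only to
make the rule concrete): a replica counts as caught up for add-voter purposes
if its last fetch happened within the past hour.

    # schematic only; the real check is LeaderState.isReplicaCaughtUp
    def is_replica_caught_up(last_fetch_timestamp_ms, current_time_ms):
        an_hour_in_ms = 60 * 60 * 1000
        return current_time_ms - last_fetch_timestamp_ms <= an_hour_in_ms
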
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index bc4708d0d32..28582513aba 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -19,12 +19,16 @@ Define Kafka configuration property names here.
 
 BROKER_ID = "broker.id"
 NODE_ID = "node.id"
+PROCESS_ROLES = "process.roles"
 FIRST_BROKER_PORT = 9092
 FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
 FIRST_CONTROLLER_ID = 3001
 CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
 PORT = "port"
 ADVERTISED_HOSTNAME = "advertised.host.name"
+ADVERTISED_LISTENERS = "advertised.listeners"
+LISTENERS = "listeners"
+CONTROLLER_LISTENER_NAMES = "controller.listener.names"
 
 NUM_NETWORK_THREADS = "num.network.threads"
 NUM_IO_THREADS = "num.io.threads"
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index bd1c9b1e332..547762a5343 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -204,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  controller_num_nodes_override=0,
                  allow_zk_with_kraft=False,
                  quorum_info_provider=None,
-                 use_new_coordinator=None
+                 use_new_coordinator=None,
+                 dynamicRaftQuorum=False
                  ):
         """
         :param context: test context
@@ -262,7 +263,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             e.g: {1: [["config1", "true"], ["config2", "1000"]], 2: [["config1", "false"], ["config2", "0"]]}
         :param str extra_kafka_opts: jvm args to add to KAFKA_OPTS variable
         :param KafkaService isolated_kafka: process.roles=controller for this cluster when not None; ignored when using ZooKeeper
-        :param int controller_num_nodes_override: the number of nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
+        :param int controller_num_nodes_override: the number of controller nodes to use in the cluster, instead of 5, 3, or 1 based on num_nodes, if positive, not using ZooKeeper, and isolated_kafka is not None; ignored otherwise
         :param bool allow_zk_with_kraft: if True, then allow a KRaft broker or controller to also use ZooKeeper
         :param quorum_info_provider: A function that takes this KafkaService as an argument and returns a ServiceQuorumInfo. If this is None, then the ServiceQuorumInfo is generated from the test context
         :param use_new_coordinator: When true, use the new implementation of the group coordinator as per KIP-848. If this is None, the default existing group coordinator is used.
@@ -298,6 +299,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.num_nodes_controller_role = 0
 
         if self.quorum_info.using_kraft:
+            self.dynamicRaftQuorum = dynamicRaftQuorum
+            self.first_controller_started = False
             if self.quorum_info.has_brokers:
                 num_nodes_broker_role = num_nodes
                 if self.quorum_info.has_controllers:
@@ -337,7 +340,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                     listener_security_config=listener_security_config,
                     extra_kafka_opts=extra_kafka_opts, tls_version=tls_version,
                     isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
-                    server_prop_overrides=server_prop_overrides
+                    server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum
                 )
                 self.controller_quorum = self.isolated_controller_quorum
 
@@ -434,15 +437,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
             kraft_broker_plus_zk_configs.update(zk_broker_configs)
             kraft_broker_plus_zk_configs.pop(config_property.BROKER_ID)
-            controller_only_configs = {
-                config_property.NODE_ID: self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1,
-            }
-            kraft_controller_plus_zk_configs = controller_only_configs.copy()
-            kraft_controller_plus_zk_configs.update(zk_broker_configs)
-            kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
             if node_quorum_info.service_quorum_info.using_zk:
                 node.config = KafkaConfig(**zk_broker_configs)
             elif not node_quorum_info.has_broker_role: # KRaft controller-only role
+                controller_only_configs = {
+                    config_property.NODE_ID: self.node_id_as_isolated_controller(node),
+                }
+                kraft_controller_plus_zk_configs = controller_only_configs.copy()
+                kraft_controller_plus_zk_configs.update(zk_broker_configs)
+                kraft_controller_plus_zk_configs.pop(config_property.BROKER_ID)
                 if self.zk:
                     node.config = KafkaConfig(**kraft_controller_plus_zk_configs)
                 else:
@@ -455,6 +458,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.combined_nodes_started = 0
         self.nodes_to_start = self.nodes
 
+    # Does not do any validation to check if this node is part of an isolated controller quorum or not
+    def node_id_as_isolated_controller(self, node):
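+        # e.g. with FIRST_CONTROLLER_ID = 3001, the first controller node (idx 1) maps to node id 3001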
+        return self.idx(node) + config_property.FIRST_CONTROLLER_ID - 1
+
     def reconfigure_zk_for_migration(self, kraft_quorum):
         self.configured_for_zk_migration = True
         self.controller_quorum = kraft_quorum
@@ -627,7 +634,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def alive(self, node):
         return len(self.pids(node)) > 0
 
-    def start(self, add_principals="", nodes_to_skip=[], timeout_sec=60, **kwargs):
+    def start(self, add_principals="", nodes_to_skip=[], isolated_controllers_to_skip=[], timeout_sec=60, **kwargs):
         """
         Start the Kafka broker and wait until it registers its ID in ZooKeeper
         Startup will be skipped for any nodes in nodes_to_skip. These nodes can be started later via add_broker
@@ -665,7 +672,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             self._ensure_zk_chroot()
 
         if self.isolated_controller_quorum:
-            self.isolated_controller_quorum.start()
+            self.isolated_controller_quorum.start(nodes_to_skip=isolated_controllers_to_skip)
 
         Service.start(self, **kwargs)
 
@@ -727,7 +734,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.concurrent_start = False
         self.start_node(node)
         self.concurrent_start = orig_concurrent_start
-        wait_until(lambda: self.is_registered(node), 30, 1)
 
     def _ensure_zk_chroot(self):
         self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
@@ -867,15 +873,19 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             self.maybe_setup_broker_scram_credentials(node)
 
         if self.quorum_info.using_kraft:
-            # define controller.quorum.voters text
+            # define controller.quorum.bootstrap.servers or controller.quorum.voters text
             security_protocol_to_use = self.controller_quorum.controller_security_protocol
             first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
-            self.controller_quorum_voters = ','.join(["%s@%s:%s" %
-                                                      (self.controller_quorum.idx(node) + first_node_id - 1,
-                                                       node.account.hostname,
-                                                       config_property.FIRST_CONTROLLER_PORT +
-                                                       KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
-                                                      for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
+            # keep the endpoints as a list so the voters string can pair each entry with its node id
+            controller_quorum_bootstrap_servers = ["%s:%s" % (controller_node.account.hostname,
+                                                              config_property.FIRST_CONTROLLER_PORT +
+                                                              KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+                                                   for controller_node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]
+            if self.dynamicRaftQuorum:
+                self.controller_quorum_bootstrap_servers = ','.join(controller_quorum_bootstrap_servers)
+            else:
+                self.controller_quorum_voters = ','.join(["%s@%s" % (node_id + first_node_id, bootstrap_server)
+                                                          for node_id, bootstrap_server in enumerate(controller_quorum_bootstrap_servers)])
             # define controller.listener.names
             self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
             # define sasl.mechanism.controller.protocol to match the isolated quorum if one exists
@@ -892,8 +902,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             # format log directories if necessary
             kafka_storage_script = self.path.script("kafka-storage.sh", node)
             cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % 
(kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
+            if self.dynamicRaftQuorum:
+                cmd += " --feature kraft.version=1"
+                if not self.first_controller_started and 
self.node_quorum_info.has_controller_role:
+                    cmd += " --standalone"
             self.logger.info("Running log directory format command...\n%s" % 
cmd)
             node.account.ssh(cmd)
+            self.first_controller_started = True
 
         cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService %s on %s with 
command: %s" %\
@@ -1009,9 +1024,26 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         JmxMixin.clean_node(self, node)
         self.security_config.clean_node(node)
         node.account.kill_process(self.java_class_name(),
-                                         clean_shutdown=False, allow_fail=True)
+                                  clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, 
allow_fail=False)
 
+    def kafka_metadata_quorum_cmd(self, node, kafka_security_protocol=None, use_controller_bootstrap=False):
+        if kafka_security_protocol is None:
+            # it wasn't specified, so use the inter-broker/controller security protocol if it is PLAINTEXT,
+            # otherwise use the client security protocol
+            if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                security_protocol_to_use = SecurityConfig.PLAINTEXT
+            else:
+                security_protocol_to_use = self.security_protocol
+        else:
+            security_protocol_to_use = kafka_security_protocol
+        if use_controller_bootstrap:
+            bootstrap = "--bootstrap-controller %s" % (self.bootstrap_controllers("CONTROLLER_%s" % security_protocol_to_use))
+        else:
+            bootstrap = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use))
+        kafka_metadata_script = self.path.script("kafka-metadata-quorum.sh", node)
+        return "%s %s" % (kafka_metadata_script, bootstrap)
+
     def kafka_topics_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol=None, offline_nodes=[]):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-topics against a broker, not a KRaft controller")
@@ -1755,6 +1787,61 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.debug(output)
         return output
 
+    def describe_quorum(self, node=None):
+        """Run the describe quorum command.
+        Specifying node is optional; if not specified, the command is run from self.nodes[0]
+        """
+        if node is None:
+            node = self.nodes[0]
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%(kafka_metadata_quorum_cmd)s describe --status" % {
+            'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node)
+        }
+        self.logger.info("Running describe quorum command...\n%s" % cmd)
+
+        output = ""
+        for line in node.account.ssh_capture(cmd):
+            output += line
+
+        return output
+
+    def add_controller(self, controllerId, controller):
+        """Run the metadata quorum add controller command. This should be run 
on the node that is being added.
+        """
+        command_config_path = os.path.join(KafkaService.PERSISTENT_ROOT, "controller_command_config.properties")
+
+        configs = f"""
+{config_property.NODE_ID}={controllerId}
+{config_property.PROCESS_ROLES}=controller
+{config_property.METADATA_LOG_DIR}={KafkaService.METADATA_LOG_DIR}
+{config_property.ADVERTISED_LISTENERS}={self.advertised_listeners}
+{config_property.LISTENERS}={self.listeners}
+{config_property.CONTROLLER_LISTENER_NAMES}={self.controller_listener_names}"""
+
+        controller.account.create_file(command_config_path, configs)
+        cmd = fix_opts_for_new_jvm(controller)
+        cmd += "%(kafka_metadata_quorum_cmd)s --command-config 
%(command_config)s add-controller" % {
+            'kafka_metadata_quorum_cmd': 
self.kafka_metadata_quorum_cmd(controller, use_controller_bootstrap=True),
+            'command_config': command_config_path
+        }
+        self.logger.info("Running add controller command...\n%s" % cmd)
+        controller.account.ssh(cmd)
+
+    def remove_controller(self, controllerId, directoryId, node=None):
+        """Run the admin tool remove controller command.
+        Specifying node is optional; if not specified, the command is run from self.nodes[0]
+        """
+        if node is None:
+            node = self.nodes[0]
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%(kafka_metadata_quorum_cmd)s remove-controller -i 
%(controller_id)s -d %(directory_id)s" % {
+            'kafka_metadata_quorum_cmd': self.kafka_metadata_quorum_cmd(node, 
use_controller_bootstrap=True),
+            'controller_id': controllerId,
+            'directory_id': directoryId
+        }
+        self.logger.info("Running remove controller command...\n%s" % cmd)
+        node.account.ssh(cmd)
+
     def zk_connect_setting(self):
         if self.quorum_info.using_kraft and not self.zk:
             raise Exception("No zookeeper connect string available with KRaft 
unless ZooKeeper is explicitly enabled")
@@ -1769,6 +1857,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                          for node in self.nodes
                          if node not in offline_nodes])
 
+    def __bootstrap_controllers(self, port, validate=True, offline_nodes=[]):
+        if validate and not port.open:
+            raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " %
+                             str(port.port_number))
+
+        return ','.join([node.account.hostname + ":" + str(port.port_number)
+                         for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]
+                         if node not in offline_nodes])
+
     def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
@@ -1778,6 +1875,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Bootstrap client port is: " + 
str(port_mapping.port_number))
         return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
 
+    def bootstrap_controllers(self, protocol='CONTROLLER_PLAINTEXT', validate=True, offline_nodes=[]):
+        """Return comma-delimited list of controllers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
+
+        This is the format expected by many config files.
+        """
+        port_mapping = self.port_mappings[protocol]
+        self.logger.info("Bootstrap controller port is: " + str(port_mapping.port_number))
+        return self.__bootstrap_controllers(port_mapping, validate, offline_nodes)
+
     def controller(self):
         """ Get the controller node
         """
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 65bf389b029..21b60afeb83 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -23,8 +23,12 @@ process.roles=controller
 {% else %}
 process.roles=broker
 {% endif %}
-# The connect string for the controller quorum
+# The connect string for the controller quorum. Only one of the following should be defined.
+{% if controller_quorum_bootstrap_servers %}
+controller.quorum.bootstrap.servers={{ controller_quorum_bootstrap_servers }}
+{% else %}
 controller.quorum.voters={{ controller_quorum_voters }}
+{% endif %}
 
 controller.listener.names={{ controller_listener_names }}
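
For reference, under dynamicRaftQuorum the rendered file carries a line like
(host and port illustrative):

    controller.quorum.bootstrap.servers=worker1:9592

while the static path keeps the voters form, which prefixes each endpoint
with its node id:

    controller.quorum.voters=3001@worker1:9592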
 
diff --git a/tests/kafkatest/tests/core/quorum_reconfiguration_test.py b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
new file mode 100644
index 00000000000..432b25c1686
--- /dev/null
+++ b/tests/kafkatest/tests/core/quorum_reconfiguration_test.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import re
+
+from functools import partial
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka.quorum import combined_kraft, ServiceQuorumInfo, isolated_kraft
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+from kafkatest.version import DEV_BRANCH
+
+#
+# Test quorum reconfiguration for combined and isolated mode
+#
+class TestQuorumReconfiguration(ProduceConsumeValidateTest):
+    def __init__(self, test_context):
+        super(TestQuorumReconfiguration, self).__init__(test_context=test_context)
+
+    def setUp(self):
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+    def perform_reconfig(self, active_controller_id, inactive_controller_id, inactive_controller, broker_ids):
+        # Check describe quorum output shows the controller (first node) is the leader and the only voter
+        output = self.kafka.describe_quorum()
+        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+        assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id)
+        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+        # Start second controller
+        self.kafka.controller_quorum.add_broker(inactive_controller)
+        output = self.kafka.describe_quorum()
+        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+        assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id)
+        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids + [inactive_controller_id])
+
+        # Add controller to quorum
+        self.kafka.controller_quorum.add_controller(inactive_controller_id, inactive_controller)
+
+        # Check describe quorum output shows both controllers are voters
+        output = self.kafka.describe_quorum()
+        assert re.search(r"LeaderId:\s*" + str(active_controller_id), output)
+        assert_nodes_in_output(r"CurrentVoters:.*", output, active_controller_id, inactive_controller_id)
+        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+        # Remove leader from quorum
+        voters = json_from_line(r"CurrentVoters:.*", output)
+        directory_id = next(voter["directoryId"] for voter in voters if voter["id"] == active_controller_id)
+        self.kafka.controller_quorum.remove_controller(active_controller_id, directory_id)
+
+        # Check describe quorum output to show second_controller is now the leader
+        output = self.kafka.describe_quorum()
+        assert re.search(r"LeaderId:\s*" + str(inactive_controller_id), output)
+        assert_nodes_in_output(r"CurrentVoters:.*", output, inactive_controller_id)
+        assert_nodes_in_output(r"CurrentObservers:.*", output, *broker_ids)
+
+
+    @cluster(num_nodes=6)
+    @matrix(metadata_quorum=[combined_kraft])
+    def test_combined_mode_reconfig(self, metadata_quorum):
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=4,
+                                  zk=None,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 1}}},
+                                  version=DEV_BRANCH,
+                                  controller_num_nodes_override=2,
+                                  dynamicRaftQuorum=True)
+        # Start one out of two controllers (standalone mode)
+        inactive_controller = self.kafka.nodes[1]
+        self.kafka.start(nodes_to_skip=[inactive_controller])
+
+        # Start producer and consumer
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int, compression_types=["none"],
+                                           version=DEV_BRANCH, offline_nodes=[inactive_controller])
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=DEV_BRANCH)
+        # Perform reconfigurations
+        self.run_produce_consume_validate(
+            core_test_action=lambda: self.perform_reconfig(self.kafka.idx(self.kafka.nodes[0]),
+                                                           self.kafka.idx(inactive_controller),
+                                                           inactive_controller,
+                                                           [self.kafka.idx(node) for node in self.kafka.nodes[2:]]))
+
+    @cluster(num_nodes=7)
+    @matrix(metadata_quorum=[isolated_kraft])
+    def test_isolated_mode_reconfig(self, metadata_quorum):
+        # Start up an isolated KRaft controller quorum
+        remote_quorum = partial(ServiceQuorumInfo, isolated_kraft)
+        self.kafka = KafkaService(self.test_context,
+                                  num_nodes=3,
+                                  zk=None,
+                                  topics={self.topic: {"partitions": self.partitions,
+                                                       "replication-factor": self.replication_factor,
+                                                       'configs': {"min.insync.replicas": 1}}},
+                                  version=DEV_BRANCH,
+                                  controller_num_nodes_override=2,
+                                  quorum_info_provider=remote_quorum,
+                                  dynamicRaftQuorum=True)
+        # Start one out of two controllers (standalone mode)
+        controller_quorum = self.kafka.controller_quorum
+        inactive_controller = controller_quorum.nodes[1]
+        self.kafka.start(isolated_controllers_to_skip=[inactive_controller])
+
+        # Start producer and consumer
+        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
+                                           self.topic, throughput=self.producer_throughput,
+                                           message_validator=is_int, compression_types=["none"],
+                                           version=DEV_BRANCH)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
+                                        self.topic, new_consumer=True, consumer_timeout_ms=30000,
+                                        message_validator=is_int, version=DEV_BRANCH)
+        # Perform reconfigurations
+        self.run_produce_consume_validate(
+            core_test_action=lambda: self.perform_reconfig(controller_quorum.node_id_as_isolated_controller(self.kafka.controller_quorum.nodes[0]),
+                                                           controller_quorum.node_id_as_isolated_controller(inactive_controller),
+                                                           inactive_controller,
+                                                           [self.kafka.idx(node) for node in self.kafka.nodes]))
+
+def assert_nodes_in_output(pattern, output, *node_ids):
+    nodes = json_from_line(pattern, output)
+    assert len(nodes) == len(node_ids)
+
+    for node in nodes:
+        assert node["id"] in node_ids
+
+def json_from_line(pattern, output):
+    match = re.search(pattern, output)
+    if not match:
+        raise Exception("Expected match for pattern %s in describe quorum 
output" % pattern)
+    line = match.group(0)
+    start_index = line.find('[')
+    end_index = line.rfind(']') + 1
+
+    return json.loads(line[start_index:end_index])
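
For reference, these helpers expect describe --status output lines of roughly
this shape (values illustrative; only the JSON array between the brackets is
parsed, and only the "id" and "directoryId" fields are read):

    LeaderId:               3001
    CurrentVoters:          [{"id": 3001, "directoryId": "b4pvgESfTOuW6BhKxBFvhw"}]
    CurrentObservers:       [{"id": 1, "directoryId": "Mo0CAtsy5BCsIqzLMC4dRg"}]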
