This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 099e91f5fc7 KAFKA-19719 --no-initial-controllers should not assume 
kraft.version=… (#20616)
099e91f5fc7 is described below

commit 099e91f5fc7e0a44ffec05d60cba650ceea4109a
Author: Kevin Wu <[email protected]>
AuthorDate: Mon Oct 6 09:42:35 2025 -0500

    KAFKA-19719 --no-initial-controllers should not assume kraft.version=… 
(#20616)
    
    backport KAFKA-19719 to 4.0
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  |  38 ++++--
 .../ReconfigurableQuorumIntegrationTest.java       |  50 ++++---
 .../kafka/server/KRaftClusterTest.scala            |   3 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  14 +-
 docs/ops.html                                      |  62 ++++-----
 .../apache/kafka/metadata/storage/Formatter.java   |  30 +++--
 .../kafka/metadata/storage/FormatterTest.java      | 144 ++++++++++++---------
 .../apache/kafka/server/common/KRaftVersion.java   |   8 +-
 .../kafka/common/test/KafkaClusterTestKit.java     | 108 ++++++++++++----
 .../org/apache/kafka/common/test/TestKitNodes.java |   5 -
 10 files changed, 275 insertions(+), 187 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 40892bca38c..594109e15d2 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -135,21 +135,30 @@ object StorageTool extends Logging {
       featureNamesAndLevels(_).foreachEntry {
         (k, v) => formatter.setFeatureLevel(k, v)
       })
-    Option(namespace.getString("initial_controllers")).
+    val initialControllers = namespace.getString("initial_controllers")
+    val isStandalone = namespace.getBoolean("standalone")
+    val staticVotersEmpty = config.quorumConfig.voters().isEmpty
+    formatter.setHasDynamicQuorum(staticVotersEmpty)
+    if (!staticVotersEmpty && (Option(initialControllers).isDefined || 
isStandalone)) {
+      throw new TerseFailure("You cannot specify " +
+        QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
+        "with --initial-controllers or --standalone. " +
+        "If you want to use dynamic quorum, please remove " +
+        QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
+        QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
+    }
+    Option(initialControllers).
       foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
-    if (namespace.getBoolean("standalone")) {
+    if (isStandalone) {
       formatter.setInitialControllers(createStandaloneDynamicVoters(config))
     }
-    if (namespace.getBoolean("no_initial_controllers")) {
-      formatter.setNoInitialControllersFlag(true)
-    } else {
-      if (config.processRoles.contains(ProcessRole.ControllerRole)) {
-        if (config.quorumConfig.voters().isEmpty && 
formatter.initialVoters().isEmpty) {
+    if (!namespace.getBoolean("no_initial_controllers") &&
+      config.processRoles.contains(ProcessRole.ControllerRole) &&
+      staticVotersEmpty &&
+      formatter.initialVoters().isEmpty) {
           throw new TerseFailure("Because " + 
QuorumConfig.QUORUM_VOTERS_CONFIG +
             " is not set on this controller, you must specify one of the 
following: " +
             "--standalone, --initial-controllers, or 
--no-initial-controllers.");
-        }
-      }
     }
     Option(namespace.getList("add_scram")).
       foreach(scramArgs => 
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -319,18 +328,21 @@ object StorageTool extends Logging {
 
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
     reconfigurableQuorumOptions.addArgument("--standalone", "-s")
-      .help("Used to initialize a controller as a single-node dynamic quorum.")
+      .help("Used to initialize a controller as a single-node dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
-      .help("Used to initialize a server without a dynamic quorum topology.")
+      .help("Used to initialize a server without specifying a dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config should not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
-      .help("Used to initialize a server with a specific dynamic quorum 
topology. The argument " +
+      .help("Used to initialize a server with the specified dynamic quorum. 
The argument " +
         "is a comma-separated list of id@hostname:port:directory. The same 
values must be used to " +
         "format all nodes. For 
example:\[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
-        
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+        
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When 
setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(store())
   }
 
diff --git 
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java 
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index 981217ce287..fe1b6b5da9f 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.TreeMap;
@@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
             new TestKitNodes.Builder().
                 setNumBrokerNodes(1).
                 setNumControllerNodes(1).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
                 build()
-        ).build()) {
+        ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
             try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
 
     @Test
     public void testRemoveController() throws Exception {
-        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder().
-                setNumBrokerNodes(1).
-                setNumControllerNodes(3).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
-                build()
-        ).build()) {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
             cluster.format();
             cluster.startup();
             try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
 
     @Test
     public void testRemoveAndAddSameController() throws Exception {
-        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder().
-                setNumBrokerNodes(1).
-                setNumControllerNodes(4).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
-                build()).build()
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(4).
+            build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setInitialVoterSet(initialVoters).
+            build()
         ) {
             cluster.format();
             cluster.startup();
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 4fe4fb48cd8..2b99be9321f 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1013,8 +1013,7 @@ class KRaftClusterTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setNumControllerNodes(1).
-        setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+        setNumControllerNodes(1).build()).setStandalone(true).build()
     try {
       cluster.format()
       cluster.startup()
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 9fde243ec19..84404507093 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -375,7 +375,10 @@ Found problem:
   def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
+    properties.setProperty("process.roles", "broker")
+    properties.setProperty("node.id", "0")
+    properties.setProperty("controller.listener.names", "CONTROLLER")
+    properties.setProperty("controller.quorum.bootstrap.servers", 
"localhost:9093")
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String]("--release-version", "3.9-IV0", 
"--standalone")
@@ -458,19 +461,14 @@ Found problem:
               Seq("--release-version", "3.9-IV0"))).getMessage)
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = Array(false, true))
-  def 
testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: 
Boolean): Unit = {
+  @Test
+  def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
     properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String]("--release-version", "3.9-IV0", 
"--no-initial-controllers")
-    if (setKraftVersionFeature) {
-      arguments += "--feature"
-      arguments += "kraft.version=1"
-    }
     assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
     assertTrue(stream.toString().
       contains("Formatting metadata directory %s".format(availableDirs.head)),
diff --git a/docs/ops.html b/docs/ops.html
index 5a60a4cde89..3d1c99e8522 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -3869,45 +3869,29 @@ In the replica description 
0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the
   If you are not sure whether you are using static or dynamic quorums, you can 
determine this by
   running something like the following:<p>
 
-<pre><code class="language-bash">
-  $ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
-</code></pre><p>
-
-  If the <code>kraft.version</code> field is level 0 or absent, you are using 
a static quorum. If
-  it is 1 or above, you are using a dynamic quorum. For example, here is an 
example of a static
-  quorum:<p/>
-<pre><code class="language-bash">
-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  
FinalizedVersionLevel: 0 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-</code></pre><p/>
-
-  Here is another example of a static quorum:<p/>
-<pre><code class="language-bash">
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    
SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0  Epoch: 5
-</code></pre><p/>
-
-  Here is an example of a dynamic quorum:<p/>
-<pre><code class="language-bash">
-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  
FinalizedVersionLevel: 1 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-</code></pre><p/>
-
-  The static versus dynamic nature of the quorum is determined at the time of 
formatting.
-  Specifically, the quorum will be formatted as dynamic if 
<code>controller.quorum.voters</code> is
-  <b>not</b> present, and if the software version is Apache Kafka 3.9 or 
newer. If you have
-  followed the instructions earlier in this document, you will get a dynamic 
quorum.<p>
-
-  If you would like the formatting process to fail if a dynamic quorum cannot 
be achieved, format your
-  controllers using the <code>--feature kraft.version=1</code>. (Note that you 
should not supply
-  this flag when formatting brokers -- only when formatting controllers.)<p>
-
-<pre><code class="language-bash">
-  $ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 
-c controller_static.properties
-  Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try 
removing the --feature flag for kraft.version.
-</code></pre><p>
-
-  Note: Currently it is <b>not</b> possible to convert clusters using a static 
controller quorum to
-  use a dynamic controller quorum. This function will be supported in the 
future release.
+  <pre><code class="language-bash">$ bin/kafka-features.sh 
--bootstrap-controller localhost:9093 describe</code></pre>
+  <p>
+    If the <code>kraft.version</code> field is level 0 or absent, you are 
using a static quorum. If
+    it is 1 or above, you are using a dynamic quorum. Here is an 
example of a static
+    quorum:</p>
+  <pre><code class="language-bash">Feature: kraft.version  
SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0 Epoch: 
5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 
5</code></pre>
+  <p>
+    Here is another example of a static quorum:</p>
+  <pre><code class="language-bash">Feature: metadata.version       
SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.8-IV0 
FinalizedVersionLevel: 3.8-IV0  Epoch: 5</code></pre>
+  <p>
+    Here is an example of a dynamic quorum:</p>
+  <pre><code class="language-bash">Feature: kraft.version  
SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1 Epoch: 
5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 
5</code></pre>
+  <p>
+    The static versus dynamic nature of the quorum is determined at the time 
of formatting.
+    Specifically, the quorum will be formatted as dynamic if 
<code>controller.quorum.voters</code> is
+    <b>not</b> present, and one of <code>--standalone</code>, <code>--initial-controllers</code>, or 
<code>--no-initial-controllers</code> is set.
+    If you have followed the instructions earlier in this document, you will 
get a dynamic quorum.
+  <p>
+
+    Note: Currently it is <b>not</b> possible to convert clusters using a 
static controller quorum to
+    use a dynamic controller quorum. This function will be supported in a 
future release.
 
   <h5 class="anchor-heading"><a id="kraft_reconfig_add" 
class="anchor-link"></a><a href="#kraft_reconfig_add">Add New 
Controller</a></h5>
   If a dynamic controller cluster already exists, it can be expanded by first 
provisioning a new controller using the <a 
href="#kraft_storage_observers">kafka-storage.sh tool</a> and starting the 
controller.
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index acba0f7a04b..e79ca41ff80 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -132,7 +132,7 @@ public class Formatter {
      * The initial KIP-853 voters.
      */
     private Optional<DynamicVoters> initialControllers = Optional.empty();
-    private boolean noInitialControllersFlag = false;
+    private boolean hasDynamicQuorum = false;
 
     public Formatter setPrintStream(PrintStream printStream) {
         this.printStream = printStream;
@@ -218,8 +218,8 @@ public class Formatter {
         return this;
     }
 
-    public Formatter setNoInitialControllersFlag(boolean 
noInitialControllersFlag) {
-        this.noInitialControllersFlag = noInitialControllersFlag;
+    public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
+        this.hasDynamicQuorum = hasDynamicQuorum;
         return this;
     }
 
@@ -228,7 +228,7 @@ public class Formatter {
     }
 
     boolean hasDynamicQuorum() {
-        return initialControllers.isPresent() || noInitialControllersFlag;
+        return hasDynamicQuorum;
     }
 
     public BootstrapMetadata bootstrapMetadata() {
@@ -338,8 +338,8 @@ public class Formatter {
     /**
      * Calculate the effective feature level for kraft.version. In order to 
keep existing
      * command-line invocations of StorageTool working, we default this to 0 
if no dynamic
-     * voter quorum arguments were provided. As a convenience, if dynamic 
voter quorum arguments
-     * were passed, we set the latest kraft.version. (Currently there is only 
1 non-zero version).
+     * voter quorum arguments were provided. As a convenience, if the static 
voters config is
+     * empty, we set the latest kraft.version. (Currently there is only 1 
non-zero version).
      *
      * @param configuredKRaftVersionLevel   The configured level for 
kraft.version
      * @return                              The effective feature level.
@@ -348,15 +348,21 @@ public class Formatter {
         if (configuredKRaftVersionLevel.isPresent()) {
             if (configuredKRaftVersionLevel.get() == 0) {
                 if (hasDynamicQuorum()) {
-                    throw new FormatterException("Cannot set kraft.version to 
" +
-                        configuredKRaftVersionLevel.get() + " if KIP-853 
configuration is present. " +
-                            "Try removing the --feature flag for 
kraft.version.");
+                    throw new FormatterException(
+                        "Cannot set kraft.version to 0 if 
controller.quorum.voters is empty and one of the flags " +
+                        "--standalone, --initial-controllers, or 
--no-initial-controllers is used. For dynamic " +
+                        "controllers support, try removing the --feature flag 
for kraft.version."
+                    );
                 }
             } else {
                 if (!hasDynamicQuorum()) {
-                    throw new FormatterException("Cannot set kraft.version to 
" +
-                        configuredKRaftVersionLevel.get() + " unless KIP-853 
configuration is present. " +
-                            "Try removing the --feature flag for 
kraft.version.");
+                    throw new FormatterException(
+                        "Cannot set kraft.version to " + 
configuredKRaftVersionLevel.get() +
+                        " unless controller.quorum.voters is empty and one of 
the flags --standalone, " +
+                        "--initial-controllers, or --no-initial-controllers is 
used. " +
+                        "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, " +
+                        "or --no-initial-controllers and removing 
controller.quorum.voters."
+                    );
                 }
             }
             return configuredKRaftVersionLevel.get();
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 0706e4e738f..83946c426fc 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TestFeatureVersion;
 import org.apache.kafka.server.common.TransactionVersion;
@@ -194,6 +195,40 @@ public class FormatterTest {
         }
     }
 
+    @Test
+    public void testStandaloneWithIgnoreFormatted() throws Exception {
+        try (TestEnv testEnv = new TestEnv(1)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            String originalDirectoryId = Uuid.randomUuid().toString();
+            String newDirectoryId = Uuid.randomUuid().toString();
+            formatter1.formatter
+                .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" 
+ originalDirectoryId))
+                .setHasDynamicQuorum(true)
+                .run();
+            assertEquals("Formatting dynamic metadata voter directory " + 
testEnv.directory(0) +
+                    " with metadata.version " + 
MetadataVersion.latestProduction() + ".",
+                formatter1.output().trim());
+            assertMetadataDirectoryId(testEnv, 
Uuid.fromString(originalDirectoryId));
+
+            FormatterContext formatter2 = testEnv.newFormatter();
+            formatter2.formatter
+                .setIgnoreFormatted(true)
+                .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" 
+ newDirectoryId))
+                .run();
+            assertEquals("All of the log directories are already formatted.",
+                formatter2.output().trim());
+            assertMetadataDirectoryId(testEnv, 
Uuid.fromString(originalDirectoryId));
+        }
+    }
+
+    private void assertMetadataDirectoryId(TestEnv testEnv, Uuid 
expectedDirectoryId) throws Exception {
+        MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
+            addLogDirs(testEnv.directories).
+            load();
+        MetaProperties logDirProps0 = 
ensemble.logDirProps().get(testEnv.directory(0));
+        assertEquals(expectedDirectoryId, logDirProps0.directoryId().get());
+    }
+
     @Test
     public void testOneDirectoryFormattedAndOthersNotFormatted() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
@@ -383,14 +418,15 @@ public class FormatterTest {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
             if (specifyKRaftVersion) {
-                formatter1.formatter.setFeatureLevel("kraft.version", (short) 
1);
+                
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
             }
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             formatter1.formatter.run();
-            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
-            assertEquals(Arrays.asList(
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            assertEquals(List.of(
                 String.format("Formatting data directory %s with %s %s.",
                     testEnv.directory(1),
                     MetadataVersion.FEATURE_NAME,
@@ -416,45 +452,66 @@ public class FormatterTest {
     public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 0);
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             assertTrue(formatter1.formatter.hasDynamicQuorum());
-            assertEquals("Cannot set kraft.version to 0 if KIP-853 
configuration is present. " +
-                "Try removing the --feature flag for kraft.version.",
-                    assertThrows(FormatterException.class,
-                        () -> formatter1.formatter.run()).getMessage());
+            assertEquals(
+                "Cannot set kraft.version to 0 if controller.quorum.voters is 
empty " +
+                "and one of the flags --standalone, --initial-controllers, or 
--no-initial-controllers is used. " +
+                "For dynamic controllers support, try removing the --feature 
flag for kraft.version.",
+                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
+            );
         }
     }
 
     @Test
-    public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() 
throws Exception {
+    public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 1);
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             assertFalse(formatter1.formatter.hasDynamicQuorum());
-            assertEquals("Cannot set kraft.version to 1 unless KIP-853 
configuration is present. " +
-                "Try removing the --feature flag for kraft.version.",
-                    assertThrows(FormatterException.class,
-                        () -> formatter1.formatter.run()).getMessage());
+            assertEquals(
+                "Cannot set kraft.version to 1 unless controller.quorum.voters 
is empty and " +
+                "one of the flags --standalone, --initial-controllers, or 
--no-initial-controllers is used. " +
+                "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, " +
+                "or --no-initial-controllers and removing 
controller.quorum.voters.",
+                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
+            );
         }
     }
 
     @Test
-    public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() 
throws Exception {
+    public void testFormatWithInitialVotersWithOlderMetadataVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
             
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 1);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            assertEquals("kraft.version could not be set to 1 because it 
depends on " +
-                "metadata.version level 21",
-                    assertThrows(IllegalArgumentException.class,
-                        () -> formatter1.formatter.run()).getMessage());
+            formatter1.formatter.setHasDynamicQuorum(true);
+            formatter1.formatter.run();
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean 
hasDynamicQuorum) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+            formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
+            formatter1.formatter.run();
+            if (hasDynamicQuorum) {
+                assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            } else {
+                assertEquals((short) 0, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            }
         }
     }
 
@@ -475,6 +532,7 @@ public class FormatterTest {
             
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
 (short) 1);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
                 assertDoesNotThrow(() -> formatter1.formatter.run());
             } else {
@@ -486,21 +544,15 @@ public class FormatterTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testFormatWithNoInitialControllers(boolean 
specifyKRaftVersion) throws Exception {
+    @Test
+    public void testFormatWithNoInitialControllers() throws Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            if (specifyKRaftVersion) {
-                formatter1.formatter.setFeatureLevel("kraft.version", (short) 
1);
-            }
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(true);
-            assertTrue(formatter1.formatter.hasDynamicQuorum());
-
+            assertFalse(formatter1.formatter.hasDynamicQuorum());
             formatter1.formatter.run();
-            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
-            assertEquals(Arrays.asList(
+            assertEquals((short) 0, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            assertEquals(List.of(
                     String.format("Formatting data directory %s with %s %s.",
                         testEnv.directory(1),
                         MetadataVersion.FEATURE_NAME,
@@ -519,34 +571,4 @@ public class FormatterTest {
             assertNotNull(logDirProps1);
         }
     }
-
-    @Test
-    public void 
testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws 
Exception {
-        try (TestEnv testEnv = new TestEnv(2)) {
-            FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
-            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(false);
-            assertFalse(formatter1.formatter.hasDynamicQuorum());
-            assertEquals("Cannot set kraft.version to 1 unless KIP-853 
configuration is present. " +
-                    "Try removing the --feature flag for kraft.version.",
-                assertThrows(FormatterException.class,
-                    formatter1.formatter::run).getMessage());
-        }
-    }
-
-    @Test
-    public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() 
throws Exception {
-        try (TestEnv testEnv = new TestEnv(2)) {
-            FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
-            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(true);
-            assertTrue(formatter1.formatter.hasDynamicQuorum());
-            assertEquals("Cannot set kraft.version to 0 if KIP-853 
configuration is present. " +
-                    "Try removing the --feature flag for kraft.version.",
-                assertThrows(FormatterException.class,
-                    formatter1.formatter::run).getMessage());
-        }
-    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index 211f6dcac44..988d58c3e0e 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
 import java.util.Map;
 
 public enum KRaftVersion implements FeatureVersion {
@@ -73,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
 
     @Override
     public Map<String, Short> dependencies() {
-        if (this.featureLevel == 0) {
-            return Collections.emptyMap();
-        } else {
-            return Collections.singletonMap(
-                MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_9_IV0.featureLevel());
-        }
+        return Map.of();
     }
 
     public short quorumStateVersion() {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 20963665440..1a16a7a0a34 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,10 +27,12 @@ import kafka.server.SharedServer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -42,7 +44,6 @@ import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.config.KRaftConfigs;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.fault.FaultHandler;
@@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         private final String controllerListenerName;
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
+        private boolean standalone;
+        private Optional<Map<Integer, Uuid>> initialVoterSet = 
Optional.empty();
         private boolean deleteOnClose;
 
         public Builder(TestKitNodes nodes) {
@@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
+        public Builder setStandalone(boolean standalone) {
+            this.standalone = standalone;
+            return this;
+        }
+
+        public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
+            this.initialVoterSet = Optional.of(initialVoterSet);
+            return this;
+        }
+
         private KafkaConfig createNodeConfig(TestKitNode node) throws 
IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = 
nodes.controllerNodes().get(node.id());
@@ -168,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, 
brokerListenerName);
             props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
controllerListenerName);
 
-            StringBuilder quorumVoterStringBuilder = new StringBuilder();
-            String prefix = "";
-            for (int nodeId : nodes.controllerNodes().keySet()) {
-                quorumVoterStringBuilder.append(prefix).
-                    append(nodeId).
-                    append("@").
-                    append("localhost").
-                    append(":").
-                    
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
-                prefix = ",";
+            if (!standalone && initialVoterSet.isEmpty()) {
+                StringBuilder quorumVoterStringBuilder = new StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    quorumVoterStringBuilder.append(prefix).
+                        append(nodeId).
+                        append("@").
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
+            } else {
+                StringBuilder bootstrapServersStringBuilder = new 
StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    bootstrapServersStringBuilder.append(prefix).
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersStringBuilder.toString());
             }
-            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
 
             // reduce log cleaner offset map memory usage
             
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -251,7 +277,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         Time.SYSTEM,
                         new Metrics(),
                         
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
-                        Collections.emptyList(),
+                        
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
                         faultHandlerFactory,
                         
socketFactoryManager.getOrCreateSocketFactory(node.id())
                     );
@@ -279,7 +305,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                             Time.SYSTEM,
                             new Metrics(),
                             
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
-                            Collections.emptyList(),
+                            
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
                             faultHandlerFactory,
                             
socketFactoryManager.getOrCreateSocketFactory(node.id())
                         );
@@ -316,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     faultHandlerFactory,
                     socketFactoryManager,
                     Optional.ofNullable(jaasFile),
+                    standalone,
+                    initialVoterSet,
                     deleteOnClose);
         }
 
@@ -361,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
     private final Optional<File> jaasFile;
+    private final boolean standalone;
+    private final Optional<Map<Integer, Uuid>> initialVoterSet;
     private final boolean deleteOnClose;
 
     private KafkaClusterTestKit(
@@ -371,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         SimpleFaultHandlerFactory faultHandlerFactory,
         PreboundSocketFactoryManager socketFactoryManager,
         Optional<File> jaasFile,
+        boolean standalone,
+        Optional<Map<Integer, Uuid>> initialVoterSet,
         boolean deleteOnClose
     ) {
         /*
@@ -388,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
         this.jaasFile = jaasFile;
+        this.standalone = standalone;
+        this.initialVoterSet = initialVoterSet;
         this.deleteOnClose = deleteOnClose;
     }
 
@@ -422,8 +456,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
         boolean writeMetadataDirectory
     ) {
         try {
+            final var nodeId = ensemble.nodeId().getAsInt();
             Formatter formatter = new Formatter();
-            formatter.setNodeId(ensemble.nodeId().getAsInt());
+            formatter.setNodeId(nodeId);
             formatter.setClusterId(ensemble.clusterId().get());
             if (writeMetadataDirectory) {
                 formatter.setDirectories(ensemble.logDirProps().keySet());
@@ -436,8 +471,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 return;
             }
             
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
-            formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
-                
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
             formatter.setUnstableFeatureVersionsEnabled(true);
             formatter.setIgnoreFormatted(false);
             formatter.setControllerListenerName(controllerListenerName);
@@ -446,18 +479,43 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             } else {
                 formatter.setMetadataLogDirectory(Optional.empty());
             }
-            if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
-                StringBuilder dynamicVotersBuilder = new StringBuilder();
-                String prefix = "";
-                for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
-                    int port = socketFactoryManager.
-                        getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
+            StringBuilder dynamicVotersBuilder = new StringBuilder();
+            String prefix = "";
+            if (standalone) {
+                if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + 
TestKitDefaults.CONTROLLER_ID_OFFSET) {
+                    final var controllerNode = 
nodes.controllerNodes().get(nodeId);
+                    dynamicVotersBuilder.append(
+                        String.format(
+                            "%d@localhost:%d:%s",
+                            controllerNode.id(),
+                            socketFactoryManager.
+                                
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+                            controllerNode.metadataDirectoryId()
+                        )
+                    );
+                    
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                }
+                // when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET, 
the node is formatting with
+                // the --no-initial-controllers flag
+                formatter.setHasDynamicQuorum(true);
+            } else if (initialVoterSet.isPresent()) {
+                for (final var controllerNode : 
initialVoterSet.get().entrySet()) {
+                    final var voterId = controllerNode.getKey();
+                    final var voterDirectoryId = controllerNode.getValue();
                     dynamicVotersBuilder.append(prefix);
                     prefix = ",";
-                    
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
-                        controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
+                    dynamicVotersBuilder.append(
+                        String.format(
+                            "%d@localhost:%d:%s",
+                            voterId,
+                            socketFactoryManager.
+                                getOrCreatePortForListener(voterId, 
controllerListenerName),
+                            voterDirectoryId
+                        )
+                    );
                 }
                 
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                formatter.setHasDynamicQuorum(true);
             }
             formatter.run();
         } catch (Exception e) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 759e86c200b..cd8879a84db 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -94,11 +94,6 @@ public class TestKitNodes {
             return this;
         }
 
-        public Builder setFeature(String featureName, short level) {
-            this.bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
-            return this;
-        }
-
         public Builder setCombined(boolean combined) {
             this.combined = combined;
             return this;


Reply via email to