This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 012e4ca6d8f KAFKA-19719 --no-initial-controllers should not assume 
kraft.version=1 (#20604)
012e4ca6d8f is described below

commit 012e4ca6d8fcd9a76dcb60480d9ba9cb7827816e
Author: Kevin Wu <[email protected]>
AuthorDate: Tue Sep 30 09:30:23 2025 -0500

    KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 
(#20604)
    
    ```
    commit ec37eb538b7d7e113b80e09276606395b007127e (HEAD ->
    KAFKA-19719-cherry-pick-41, origin/KAFKA-19719-cherry-pick-41)
    Author: Kevin Wu <[email protected]>
    Date:   Thu Sep 25 11:56:16 2025 -0500
    
        KAFKA-19719: --no-initial-controllers should not assume
    kraft.version=1 (#20551)
    
        Just because a controller node sets the --no-initial-controllers
    flag does not mean it is necessarily running kraft.version=1. The more
    precise meaning is that the controller node being formatted does not
    know what kraft version the cluster should be in, and therefore it
    is only safe to assume kraft.version=0. Only by setting one of
    --standalone, --initial-controllers, or --no-initial-controllers AND
    not specifying the controller.quorum.voters static config is it
    known that kraft.version > 0.
    
        For example, it is a valid configuration (although confusing) to run
    a static quorum defined by controller.quorum.voters but have all
    the controllers format with --no-initial-controllers. In this
    case, specifying --no-initial-controllers alongside a metadata
    version that does not support kraft.version=1 causes formatting to
    fail, which is a regression.
    
        Additionally, the formatter should not check the kraft.version
    against the release version, since kraft.version does not actually
    depend on any release version. It should only check the
    kraft.version against the static voters config/format arguments.
    
        This PR also cleans up the integration test framework to match the
    semantics of formatting an actual cluster.
    
        Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng
    <[email protected]>, Chia-Ping Tsai <[email protected]>, José Armando
    García Sancio <[email protected]>      Conflicts:
    core/src/main/scala/kafka/tools/StorageTool.scala Minor conflicts. Keep
    changes from cherry-pick.
    core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
    Remove auto-join tests, since 4.1 does not support it. docs/ops.html
    Keep docs section from cherry-pick.
    metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
    Minor conflicts. Keep cherry-picked changes.
    
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
    Conflicts due to integration test framework changes. Keep new changes.
    
    commit 02d58b176c32917962ab25b6d685059179d06f26 (upstream/4.1)
    ```
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  |  38 +++++---
 .../ReconfigurableQuorumIntegrationTest.java       |  50 +++++++---
 .../kafka/server/KRaftClusterTest.scala            |   3 +-
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  14 ++-
 docs/ops.html                                      |  10 +-
 .../apache/kafka/metadata/storage/Formatter.java   |  29 +++---
 .../kafka/metadata/storage/FormatterTest.java      | 107 +++++++++-----------
 .../apache/kafka/server/common/KRaftVersion.java   |   7 +-
 .../kafka/common/test/KafkaClusterTestKit.java     | 108 ++++++++++++++++-----
 .../org/apache/kafka/common/test/TestKitNodes.java |   5 -
 10 files changed, 211 insertions(+), 160 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index c7b4f28e336..5b60b6f389a 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -135,21 +135,30 @@ object StorageTool extends Logging {
       featureNamesAndLevels(_).foreachEntry {
         (k, v) => formatter.setFeatureLevel(k, v)
       })
-    Option(namespace.getString("initial_controllers")).
+    val initialControllers = namespace.getString("initial_controllers")
+    val isStandalone = namespace.getBoolean("standalone")
+    val staticVotersEmpty = config.quorumConfig.voters().isEmpty
+    formatter.setHasDynamicQuorum(staticVotersEmpty)
+    if (!staticVotersEmpty && (Option(initialControllers).isDefined || 
isStandalone)) {
+      throw new TerseFailure("You cannot specify " +
+        QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
+        "with --initial-controllers or --standalone. " +
+        "If you want to use dynamic quorum, please remove " +
+        QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
+        QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
+    }
+    Option(initialControllers).
       foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
-    if (namespace.getBoolean("standalone")) {
+    if (isStandalone) {
       formatter.setInitialControllers(createStandaloneDynamicVoters(config))
     }
-    if (namespace.getBoolean("no_initial_controllers")) {
-      formatter.setNoInitialControllersFlag(true)
-    } else {
-      if (config.processRoles.contains(ProcessRole.ControllerRole)) {
-        if (config.quorumConfig.voters().isEmpty && 
formatter.initialVoters().isEmpty) {
+    if (!namespace.getBoolean("no_initial_controllers") &&
+      config.processRoles.contains(ProcessRole.ControllerRole) &&
+      staticVotersEmpty &&
+      formatter.initialVoters().isEmpty) {
           throw new TerseFailure("Because " + 
QuorumConfig.QUORUM_VOTERS_CONFIG +
             " is not set on this controller, you must specify one of the 
following: " +
             "--standalone, --initial-controllers, or 
--no-initial-controllers.");
-        }
-      }
     }
     Option(namespace.getList("add_scram")).
       foreach(scramArgs => 
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -319,18 +328,21 @@ object StorageTool extends Logging {
 
     val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
     reconfigurableQuorumOptions.addArgument("--standalone", "-s")
-      .help("Used to initialize a controller as a single-node dynamic quorum.")
+      .help("Used to initialize a controller as a single-node dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
-      .help("Used to initialize a server without a dynamic quorum topology.")
+      .help("Used to initialize a server without specifying a dynamic quorum. 
When setting this flag, " +
+        "the controller.quorum.voters config should not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(storeTrue())
 
     reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
-      .help("Used to initialize a server with a specific dynamic quorum 
topology. The argument " +
+      .help("Used to initialize a server with the specified dynamic quorum. 
The argument " +
         "is a comma-separated list of id@hostname:port:directory. The same 
values must be used to " +
         "format all nodes. For 
example:\[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
-        
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+        
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When 
setting this flag, " +
+        "the controller.quorum.voters config must not be set, and 
controller.quorum.bootstrap.servers is set instead.")
       .action(store())
   }
 
diff --git 
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java 
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index fa906a8ffb4..76e7cbd9261 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
             new TestKitNodes.Builder().
                 setNumBrokerNodes(1).
                 setNumControllerNodes(1).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
                 build()
-        ).build()) {
+        ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
             try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
 
     @Test
     public void testRemoveController() throws Exception {
-        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder().
-                setNumBrokerNodes(1).
-                setNumControllerNodes(3).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
-                build()
-        ).build()) {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
             cluster.format();
             cluster.startup();
             try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
 
     @Test
     public void testRemoveAndAddSameController() throws Exception {
-        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder().
-                setNumBrokerNodes(1).
-                setNumControllerNodes(4).
-                setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
-                build()).build()
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(4).
+            build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setInitialVoterSet(initialVoters).
+            build()
         ) {
             cluster.format();
             cluster.startup();
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 70e950d89dd..6bdefd471e8 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1011,8 +1011,7 @@ class KRaftClusterTest {
     val cluster = new KafkaClusterTestKit.Builder(
       new TestKitNodes.Builder().
         setNumBrokerNodes(1).
-        setNumControllerNodes(1).
-        setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+        setNumControllerNodes(1).build()).setStandalone(true).build()
     try {
       cluster.format()
       cluster.startup()
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 28b132243e7..9e0a602beef 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -375,7 +375,10 @@ Found problem:
   def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
+    properties.setProperty("process.roles", "broker")
+    properties.setProperty("node.id", "0")
+    properties.setProperty("controller.listener.names", "CONTROLLER")
+    properties.setProperty("controller.quorum.bootstrap.servers", 
"localhost:9093")
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String]("--release-version", "3.9-IV0", 
"--standalone")
@@ -458,19 +461,14 @@ Found problem:
               Seq("--release-version", "3.9-IV0"))).getMessage)
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = Array(false, true))
-  def 
testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: 
Boolean): Unit = {
+  @Test
+  def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
     properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String]("--release-version", "3.9-IV0", 
"--no-initial-controllers")
-    if (setKraftVersionFeature) {
-      arguments += "--feature"
-      arguments += "kraft.version=1"
-    }
     assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
     assertTrue(stream.toString().
       contains("Formatting metadata directory %s".format(availableDirs.head)),
diff --git a/docs/ops.html b/docs/ops.html
index cbfcdd36e7a..2b87e07f553 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4089,14 +4089,8 @@ Feature: metadata.version       SupportedMinVersion: 
3.3-IV3    SupportedMaxVers
   <p>
     The static versus dynamic nature of the quorum is determined at the time 
of formatting.
     Specifically, the quorum will be formatted as dynamic if 
<code>controller.quorum.voters</code> is
-    <b>not</b> present, and if the software version is Apache Kafka 3.9 or 
newer. If you have
-    followed the instructions earlier in this document, you will get a dynamic 
quorum.<p>
-
-    If you would like the formatting process to fail if a dynamic quorum 
cannot be achieved, format your
-    controllers using the <code>--feature kraft.version=1</code>. (Note that 
you should not supply
-    this flag when formatting brokers -- only when formatting controllers.)<p>
-
-  <pre><code class="language-bash">$ bin/kafka-storage.sh format -t 
KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties</code></pre>
+    <b>not</b> present, and one of --standalone, --initial-controllers, or 
--no-initial-controllers is set.
+    If you have followed the instructions earlier in this document, you will 
get a dynamic quorum.
   <p>
   Note: To migrate from static voter set to dynamic voter set, please refer to 
the <a href="#kraft_upgrade">Upgrade</a> section.
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 04b52c9e665..97f53fc902a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -131,7 +131,7 @@ public class Formatter {
      * The initial KIP-853 voters.
      */
     private Optional<DynamicVoters> initialControllers = Optional.empty();
-    private boolean noInitialControllersFlag = false;
+    private boolean hasDynamicQuorum = false;
 
     public Formatter setPrintStream(PrintStream printStream) {
         this.printStream = printStream;
@@ -217,8 +217,8 @@ public class Formatter {
         return this;
     }
 
-    public Formatter setNoInitialControllersFlag(boolean 
noInitialControllersFlag) {
-        this.noInitialControllersFlag = noInitialControllersFlag;
+    public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
+        this.hasDynamicQuorum = hasDynamicQuorum;
         return this;
     }
 
@@ -227,7 +227,7 @@ public class Formatter {
     }
 
     boolean hasDynamicQuorum() {
-        return initialControllers.isPresent() || noInitialControllersFlag;
+        return hasDynamicQuorum;
     }
 
     public BootstrapMetadata bootstrapMetadata() {
@@ -337,8 +337,8 @@ public class Formatter {
     /**
      * Calculate the effective feature level for kraft.version. In order to 
keep existing
      * command-line invocations of StorageTool working, we default this to 0 
if no dynamic
-     * voter quorum arguments were provided. As a convenience, if dynamic 
voter quorum arguments
-     * were passed, we set the latest kraft.version. (Currently there is only 
1 non-zero version).
+     * voter quorum arguments were provided. As a convenience, if the static 
voters config is
+     * empty, we set the latest kraft.version. (Currently there is only 1 
non-zero version).
      *
      * @param configuredKRaftVersionLevel   The configured level for 
kraft.version
      * @return                              The effective feature level.
@@ -348,20 +348,19 @@ public class Formatter {
             if (configuredKRaftVersionLevel.get() == 0) {
                 if (hasDynamicQuorum()) {
                     throw new FormatterException(
-                        "Cannot set kraft.version to " +
-                        configuredKRaftVersionLevel.get() +
-                        " if one of the flags --standalone, 
--initial-controllers, or --no-initial-controllers is used. " +
-                        "For dynamic controllers support, try removing the 
--feature flag for kraft.version."
+                        "Cannot set kraft.version to 0 if 
controller.quorum.voters is empty and one of the flags " +
+                        "--standalone, --initial-controllers, or 
--no-initial-controllers is used. For dynamic " +
+                        "controllers support, try removing the --feature flag 
for kraft.version."
                     );
                 }
             } else {
                 if (!hasDynamicQuorum()) {
                     throw new FormatterException(
-                        "Cannot set kraft.version to " +
-                        configuredKRaftVersionLevel.get() +
-                        " unless one of the flags --standalone, 
--initial-controllers, or --no-initial-controllers is used. " +
-                        "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, or " +
-                        "--no-initial-controllers."
+                        "Cannot set kraft.version to " + 
configuredKRaftVersionLevel.get() +
+                        " unless controller.quorum.voters is empty and one of 
the flags --standalone, " +
+                        "--initial-controllers, or --no-initial-controllers is 
used. " +
+                        "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, " +
+                        "or --no-initial-controllers and removing 
controller.quorum.voters."
                     );
                 }
             }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 5ddcd2d8889..8a41fdd6aa9 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.TestFeatureVersion;
 import org.apache.kafka.server.common.TransactionVersion;
@@ -200,6 +201,7 @@ public class FormatterTest {
             String newDirectoryId = Uuid.randomUuid().toString();
             formatter1.formatter
                 .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" 
+ originalDirectoryId))
+                .setHasDynamicQuorum(true)
                 .run();
             assertEquals("Formatting dynamic metadata voter directory " + 
testEnv.directory(0) +
                     " with metadata.version " + 
MetadataVersion.latestProduction() + ".",
@@ -417,13 +419,14 @@ public class FormatterTest {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
             if (specifyKRaftVersion) {
-                formatter1.formatter.setFeatureLevel("kraft.version", (short) 
1);
+                
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
             }
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             formatter1.formatter.run();
-            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
             assertEquals(List.of(
                 String.format("Formatting data directory %s with %s %s.",
                     testEnv.directory(1),
@@ -450,49 +453,66 @@ public class FormatterTest {
     public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 0);
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             assertTrue(formatter1.formatter.hasDynamicQuorum());
             assertEquals(
-                "Cannot set kraft.version to 0 if one of the flags 
--standalone, --initial-controllers, or " +
-                "--no-initial-controllers is used. For dynamic controllers 
support, try removing the " +
-                "--feature flag for kraft.version.",
-                assertThrows(FormatterException.class, () -> 
formatter1.formatter.run()).getMessage()
+                "Cannot set kraft.version to 0 if controller.quorum.voters is 
empty " +
+                "and one of the flags --standalone, --initial-controllers, or 
--no-initial-controllers is used. " +
+                "For dynamic controllers support, try removing the --feature 
flag for kraft.version.",
+                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
             );
         }
     }
 
     @Test
-    public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() 
throws Exception {
+    public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 1);
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
             assertFalse(formatter1.formatter.hasDynamicQuorum());
             assertEquals(
-                "Cannot set kraft.version to 1 unless one of the flags 
--standalone, --initial-controllers, or " +
-                "--no-initial-controllers is used. For dynamic controllers 
support, try using one of " +
-                "--standalone, --initial-controllers, or 
--no-initial-controllers.",
-                assertThrows(FormatterException.class, () -> 
formatter1.formatter.run()).getMessage()
+                "Cannot set kraft.version to 1 unless controller.quorum.voters 
is empty and " +
+                "one of the flags --standalone, --initial-controllers, or 
--no-initial-controllers is used. " +
+                "For dynamic controllers support, try using one of 
--standalone, --initial-controllers, " +
+                "or --no-initial-controllers and removing 
controller.quorum.voters.",
+                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
             );
         }
     }
 
     @Test
-    public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() 
throws Exception {
+    public void testFormatWithInitialVotersWithOlderMetadataVersion() throws 
Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
             
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, 
(short) 1);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                     parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            assertEquals("kraft.version could not be set to 1 because it 
depends on " +
-                "metadata.version level 21",
-                    assertThrows(IllegalArgumentException.class,
-                        () -> formatter1.formatter.run()).getMessage());
+            formatter1.formatter.setHasDynamicQuorum(true);
+            formatter1.formatter.run();
+            assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void 
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean 
hasDynamicQuorum) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+            formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
+            formatter1.formatter.run();
+            if (hasDynamicQuorum) {
+                assertEquals((short) 1, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            } else {
+                assertEquals((short) 0, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+            }
         }
     }
 
@@ -513,6 +533,7 @@ public class FormatterTest {
             
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
 (short) 1);
             formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+            formatter1.formatter.setHasDynamicQuorum(true);
             if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
                 assertDoesNotThrow(() -> formatter1.formatter.run());
             } else {
@@ -524,20 +545,14 @@ public class FormatterTest {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testFormatWithNoInitialControllers(boolean 
specifyKRaftVersion) throws Exception {
+    @Test
+    public void testFormatWithNoInitialControllers() throws Exception {
         try (TestEnv testEnv = new TestEnv(2)) {
             FormatterContext formatter1 = testEnv.newFormatter();
-            if (specifyKRaftVersion) {
-                formatter1.formatter.setFeatureLevel("kraft.version", (short) 
1);
-            }
             formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(true);
-            assertTrue(formatter1.formatter.hasDynamicQuorum());
-
+            assertFalse(formatter1.formatter.hasDynamicQuorum());
             formatter1.formatter.run();
-            assertEquals((short) 1, 
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+            assertEquals((short) 0, 
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
             assertEquals(List.of(
                     String.format("Formatting data directory %s with %s %s.",
                         testEnv.directory(1),
@@ -557,38 +572,4 @@ public class FormatterTest {
             assertNotNull(logDirProps1);
         }
     }
-
-    @Test
-    public void 
testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws 
Exception {
-        try (TestEnv testEnv = new TestEnv(2)) {
-            FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
-            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(false);
-            assertFalse(formatter1.formatter.hasDynamicQuorum());
-            assertEquals(
-                "Cannot set kraft.version to 1 unless one of the flags 
--standalone, --initial-controllers, or " +
-                "--no-initial-controllers is used. For dynamic controllers 
support, try using one of " +
-                "--standalone, --initial-controllers, or 
--no-initial-controllers.",
-                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
-            );
-        }
-    }
-
-    @Test
-    public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() 
throws Exception {
-        try (TestEnv testEnv = new TestEnv(2)) {
-            FormatterContext formatter1 = testEnv.newFormatter();
-            formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
-            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
-            formatter1.formatter.setNoInitialControllersFlag(true);
-            assertTrue(formatter1.formatter.hasDynamicQuorum());
-            assertEquals(
-                "Cannot set kraft.version to 0 if one of the flags 
--standalone, --initial-controllers, or " +
-                "--no-initial-controllers is used. For dynamic controllers 
support, try removing the " +
-                "--feature flag for kraft.version.",
-                assertThrows(FormatterException.class, 
formatter1.formatter::run).getMessage()
-            );
-        }
-    }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index 463cc2a015c..d797880c776 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -72,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
 
     @Override
     public Map<String, Short> dependencies() {
-        if (this.featureLevel == 0) {
-            return Map.of();
-        } else {
-            return Map.of(
-                MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_9_IV0.featureLevel());
-        }
+        return Map.of();
     }
 
     public boolean isAtLeast(KRaftVersion otherVersion) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 3ff43a3a2ce..7fe40876c51 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,10 +27,12 @@ import kafka.server.SharedServer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.common.utils.ThreadUtils;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -43,7 +45,6 @@ import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.MetadataLogConfig;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.config.KRaftConfigs;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.fault.FaultHandler;
@@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         private final String controllerListenerName;
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
+        private boolean standalone;
+        private Optional<Map<Integer, Uuid>> initialVoterSet = 
Optional.empty();
         private boolean deleteOnClose;
 
         public Builder(TestKitNodes nodes) {
@@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
+        public Builder setStandalone(boolean standalone) {
+            this.standalone = standalone;
+            return this;
+        }
+
+        public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
+            this.initialVoterSet = Optional.of(initialVoterSet);
+            return this;
+        }
+
         private KafkaConfig createNodeConfig(TestKitNode node) throws 
IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = 
nodes.controllerNodes().get(node.id());
@@ -168,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, 
brokerListenerName);
             props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
controllerListenerName);
 
-            StringBuilder quorumVoterStringBuilder = new StringBuilder();
-            String prefix = "";
-            for (int nodeId : nodes.controllerNodes().keySet()) {
-                quorumVoterStringBuilder.append(prefix).
-                    append(nodeId).
-                    append("@").
-                    append("localhost").
-                    append(":").
-                    
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
-                prefix = ",";
+            if (!standalone && initialVoterSet.isEmpty()) {
+                StringBuilder quorumVoterStringBuilder = new StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    quorumVoterStringBuilder.append(prefix).
+                        append(nodeId).
+                        append("@").
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
+            } else {
+                StringBuilder bootstrapServersStringBuilder = new 
StringBuilder();
+                String prefix = "";
+                for (int nodeId : nodes.controllerNodes().keySet()) {
+                    bootstrapServersStringBuilder.append(prefix).
+                        append("localhost").
+                        append(":").
+                        
append(socketFactoryManager.getOrCreatePortForListener(nodeId, 
controllerListenerName));
+                    prefix = ",";
+                }
+                props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersStringBuilder.toString());
             }
-            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
quorumVoterStringBuilder.toString());
 
             // reduce log cleaner offset map memory usage
             
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -258,7 +284,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         Time.SYSTEM,
                         new Metrics(),
                         
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
-                        List.of(),
+                        
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
                         faultHandlerFactory,
                         
socketFactoryManager.getOrCreateSocketFactory(node.id())
                     );
@@ -286,7 +312,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                             Time.SYSTEM,
                             new Metrics(),
                             
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
-                            List.of(),
+                            
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
                             faultHandlerFactory,
                             
socketFactoryManager.getOrCreateSocketFactory(node.id())
                         );
@@ -323,6 +349,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     faultHandlerFactory,
                     socketFactoryManager,
                     jaasFile,
+                    standalone,
+                    initialVoterSet,
                     deleteOnClose);
         }
 
@@ -368,6 +396,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
     private final Optional<File> jaasFile;
+    private final boolean standalone;
+    private final Optional<Map<Integer, Uuid>> initialVoterSet;
     private final boolean deleteOnClose;
 
     private KafkaClusterTestKit(
@@ -378,6 +408,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         SimpleFaultHandlerFactory faultHandlerFactory,
         PreboundSocketFactoryManager socketFactoryManager,
         Optional<File> jaasFile,
+        boolean standalone,
+        Optional<Map<Integer, Uuid>> initialVoterSet,
         boolean deleteOnClose
     ) {
         /*
@@ -395,6 +427,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
         this.jaasFile = jaasFile;
+        this.standalone = standalone;
+        this.initialVoterSet = initialVoterSet;
         this.deleteOnClose = deleteOnClose;
     }
 
@@ -425,8 +459,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
         boolean writeMetadataDirectory
     ) {
         try {
+            final var nodeId = ensemble.nodeId().getAsInt();
             Formatter formatter = new Formatter();
-            formatter.setNodeId(ensemble.nodeId().getAsInt());
+            formatter.setNodeId(nodeId);
             formatter.setClusterId(ensemble.clusterId().get());
             if (writeMetadataDirectory) {
                 formatter.setDirectories(ensemble.logDirProps().keySet());
@@ -439,8 +474,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 return;
             }
             
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
-            formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
-                
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
             formatter.setUnstableFeatureVersionsEnabled(true);
             formatter.setIgnoreFormatted(false);
             formatter.setControllerListenerName(controllerListenerName);
@@ -449,18 +482,43 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             } else {
                 formatter.setMetadataLogDirectory(Optional.empty());
             }
-            if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
-                StringBuilder dynamicVotersBuilder = new StringBuilder();
-                String prefix = "";
-                for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
-                    int port = socketFactoryManager.
-                        getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
+            StringBuilder dynamicVotersBuilder = new StringBuilder();
+            String prefix = "";
+            if (standalone) {
+                if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + 
TestKitDefaults.CONTROLLER_ID_OFFSET) {
+                    final var controllerNode = 
nodes.controllerNodes().get(nodeId);
+                    dynamicVotersBuilder.append(
+                        String.format(
+                            "%d@localhost:%d:%s",
+                            controllerNode.id(),
+                            socketFactoryManager.
+                                
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+                            controllerNode.metadataDirectoryId()
+                        )
+                    );
+                    
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                }
+                // when the nodeId != TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET,
+                // the node is formatting with the --no-initial-controllers flag
+                formatter.setHasDynamicQuorum(true);
+            } else if (initialVoterSet.isPresent()) {
+                for (final var controllerNode : 
initialVoterSet.get().entrySet()) {
+                    final var voterId = controllerNode.getKey();
+                    final var voterDirectoryId = controllerNode.getValue();
                     dynamicVotersBuilder.append(prefix);
                     prefix = ",";
-                    
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
-                        controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
+                    dynamicVotersBuilder.append(
+                        String.format(
+                            "%d@localhost:%d:%s",
+                            voterId,
+                            socketFactoryManager.
+                                getOrCreatePortForListener(voterId, 
controllerListenerName),
+                            voterDirectoryId
+                        )
+                    );
                 }
                 
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                formatter.setHasDynamicQuorum(true);
             }
             formatter.run();
         } catch (Exception e) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index a9667dbd631..3622430f487 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -93,11 +93,6 @@ public class TestKitNodes {
             return this;
         }
 
-        public Builder setFeature(String featureName, short level) {
-            this.bootstrapMetadata = 
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
-            return this;
-        }
-
         public Builder setCombined(boolean combined) {
             this.combined = combined;
             return this;


Reply via email to