This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 98d47f47ef2 KAFKA-18028 the effective kraft version of --no-initial-controllers should be 1 rather than 0 (#17836)
98d47f47ef2 is described below

commit 98d47f47ef21394779ef2ff2675c6ee54d5a5dc2
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Nov 27 01:45:11 2024 +0800

    KAFKA-18028 the effective kraft version of --no-initial-controllers should be 1 rather than 0 (#17836)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/tools/StorageTool.scala  | 14 ++---
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 36 ++++++++++--
 .../apache/kafka/metadata/storage/Formatter.java   | 12 +++-
 .../kafka/metadata/storage/FormatterTest.java      | 65 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 29734bd8d8d..b3ff5321625 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -143,14 +143,14 @@ object StorageTool extends Logging {
     if (namespace.getBoolean("standalone")) {
       formatter.setInitialControllers(createStandaloneDynamicVoters(config))
     }
-    if (!namespace.getBoolean("no_initial_controllers")) {
+    if (namespace.getBoolean("no_initial_controllers")) {
+      formatter.setNoInitialControllersFlag(true)
+    } else {
       if (config.processRoles.contains(ProcessRole.ControllerRole)) {
-        if (config.quorumConfig.voters().isEmpty) {
-          if (formatter.initialVoters().isEmpty()) {
-            throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
-              " is not set on this controller, you must specify one of the following: " +
-              "--standalone, --initial-controllers, or --no-initial-controllers.");
-          }
+        if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) {
+          throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
+            " is not set on this controller, you must specify one of the following: " +
+            "--standalone, --initial-controllers, or --no-initial-controllers.");
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index beff77cf523..5a213e6c186 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -483,20 +483,48 @@ Found problem:
               Seq("--release-version", "3.9-IV0"))).getMessage)
   }
 
-  @Test
-  def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
     properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
-    assertEquals(0, runFormatCommand(stream, properties,
-      Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
+    val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
+    if (setKraftVersionFeature) {
+      arguments += "--feature"
+      arguments += "kraft.version=1"
+    }
+    assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
     assertTrue(stream.toString().
       contains("Formatting metadata directory %s".format(availableDirs.head)),
       "Failed to find content in output: " + stream.toString())
   }
 
+  @Test
+  def testFormatWithNoInitialControllersFlagAndStandaloneFlagFails(): Unit = {
+    val arguments = ListBuffer[String](
+      "format", "--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", "3.9-IV0",
+      "--no-initial-controllers", "--standalone")
+    val exception = assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray))
+    assertEquals("argument --standalone/-s: not allowed with argument --no-initial-controllers/-N", exception.getMessage)
+  }
+
+  @Test
+  def testFormatWithNoInitialControllersFlagAndInitialControllersFlagFails(): Unit = {
+    val arguments = ListBuffer[String](
+      "format", "--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", "3.9-IV0",
+      "--no-initial-controllers", "--initial-controllers",
+      "0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," +
+      "1@localhost:8030:aUARLskQTCW4qCZDtS_cwA," +
+      "2@localhost:8040:2ggvsS4kQb-fSJ_-zC_Ang")
+    val exception = assertThrows(classOf[ArgumentParserException], () => StorageTool.parseArguments(arguments.toArray))
+    assertEquals("argument --initial-controllers/-I: not allowed with argument --no-initial-controllers/-N", exception.getMessage)
+  }
+
   @Test
   def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 53013307149..d512545384a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -93,8 +93,10 @@ public class Formatter {
 
     /**
      * Maps feature names to the level they will start off with.
+     *
+     * Visible for testing.
      */
-    private Map<String, Short> featureLevels = new TreeMap<>();
+    protected Map<String, Short> featureLevels = new TreeMap<>();
 
     /**
      * The bootstrap metadata used to format the cluster.
@@ -130,6 +132,7 @@ public class Formatter {
      * The initial KIP-853 voters.
      */
     private Optional<DynamicVoters> initialControllers = Optional.empty();
+    private boolean noInitialControllersFlag = false;
 
     public Formatter setPrintStream(PrintStream printStream) {
         this.printStream = printStream;
@@ -215,12 +218,17 @@ public class Formatter {
         return this;
     }
 
+    public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
+        this.noInitialControllersFlag = noInitialControllersFlag;
+        return this;
+    }
+
     public Optional<DynamicVoters> initialVoters() {
         return initialControllers;
     }
 
     boolean hasDynamicQuorum() {
-        return initialControllers.isPresent();
+        return initialControllers.isPresent() || noInitialControllersFlag;
     }
 
     public BootstrapMetadata bootstrapMetadata() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 45a896c47c4..c0d9cd4ee95 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -379,6 +379,7 @@ public class FormatterTest {
             formatter1.formatter.setInitialControllers(DynamicVoters.
                 parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
             formatter1.formatter.run();
+            assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
             assertEquals(Arrays.asList(
                 String.format("Formatting data directory %s with %s %s.",
                     testEnv.directory(1),
@@ -446,4 +447,68 @@ public class FormatterTest {
                         () -> formatter1.formatter.run()).getMessage());
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            if (specifyKRaftVersion) {
+                formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            }
+            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+            formatter1.formatter.setNoInitialControllersFlag(true);
+            assertTrue(formatter1.formatter.hasDynamicQuorum());
+
+            formatter1.formatter.run();
+            assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+            assertEquals(Arrays.asList(
+                    String.format("Formatting data directory %s with %s %s.",
+                        testEnv.directory(1),
+                        MetadataVersion.FEATURE_NAME,
+                        MetadataVersion.latestTesting()),
+                    String.format("Formatting metadata directory %s with %s %s.",
+                        testEnv.directory(0),
+                        MetadataVersion.FEATURE_NAME,
+                        MetadataVersion.latestTesting())),
+                formatter1.outputLines().stream().sorted().collect(Collectors.toList()));
+            MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
+                addLogDirs(testEnv.directories).
+                load();
+            MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0));
+            assertNotNull(logDirProps0);
+            MetaProperties logDirProps1 = ensemble.logDirProps().get(testEnv.directory(1));
+            assertNotNull(logDirProps1);
+        }
+    }
+
+    @Test
+    public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+            formatter1.formatter.setNoInitialControllersFlag(false);
+            assertFalse(formatter1.formatter.hasDynamicQuorum());
+            assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " +
+                    "Try removing the --feature flag for kraft.version.",
+                assertThrows(FormatterException.class,
+                    formatter1.formatter::run).getMessage());
+        }
+    }
+
+    @Test
+    public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception {
+        try (TestEnv testEnv = new TestEnv(2)) {
+            FormatterContext formatter1 = testEnv.newFormatter();
+            formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+            formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+            formatter1.formatter.setNoInitialControllersFlag(true);
+            assertTrue(formatter1.formatter.hasDynamicQuorum());
+            assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
+                    "Try removing the --feature flag for kraft.version.",
+                assertThrows(FormatterException.class,
+                    formatter1.formatter::run).getMessage());
+        }
+    }
 }

Reply via email to