This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98d47f47ef2 KAFKA-18028 the effective kraft version of --no-initial-controllers should be 1 rather than 0 (#17836)
98d47f47ef2 is described below
commit 98d47f47ef21394779ef2ff2675c6ee54d5a5dc2
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Nov 27 01:45:11 2024 +0800
KAFKA-18028 the effective kraft version of --no-initial-controllers should be 1 rather than 0 (#17836)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 14 ++---
.../scala/unit/kafka/tools/StorageToolTest.scala | 36 ++++++++++--
.../apache/kafka/metadata/storage/Formatter.java | 12 +++-
.../kafka/metadata/storage/FormatterTest.java | 65 ++++++++++++++++++++++
4 files changed, 114 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 29734bd8d8d..b3ff5321625 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -143,14 +143,14 @@ object StorageTool extends Logging {
if (namespace.getBoolean("standalone")) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
- if (!namespace.getBoolean("no_initial_controllers")) {
+ if (namespace.getBoolean("no_initial_controllers")) {
+ formatter.setNoInitialControllersFlag(true)
+ } else {
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
- if (config.quorumConfig.voters().isEmpty) {
- if (formatter.initialVoters().isEmpty()) {
- throw new TerseFailure("Because " +
QuorumConfig.QUORUM_VOTERS_CONFIG +
- " is not set on this controller, you must specify one of the
following: " +
- "--standalone, --initial-controllers, or
--no-initial-controllers.");
- }
+ if (config.quorumConfig.voters().isEmpty &&
formatter.initialVoters().isEmpty) {
+ throw new TerseFailure("Because " +
QuorumConfig.QUORUM_VOTERS_CONFIG +
+ " is not set on this controller, you must specify one of the
following: " +
+ "--standalone, --initial-controllers, or
--no-initial-controllers.");
}
}
}
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index beff77cf523..5a213e6c186 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -483,20 +483,48 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage)
}
- @Test
- def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def
testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature:
Boolean): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
- assertEquals(0, runFormatCommand(stream, properties,
- Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
+ val arguments = ListBuffer[String]("--release-version", "3.9-IV0",
"--no-initial-controllers")
+ if (setKraftVersionFeature) {
+ arguments += "--feature"
+ arguments += "kraft.version=1"
+ }
+ assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
"Failed to find content in output: " + stream.toString())
}
+ @Test
+ def testFormatWithNoInitialControllersFlagAndStandaloneFlagFails(): Unit = {
+ val arguments = ListBuffer[String](
+ "format", "--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ",
+ "--release-version", "3.9-IV0",
+ "--no-initial-controllers", "--standalone")
+ val exception = assertThrows(classOf[ArgumentParserException], () =>
StorageTool.parseArguments(arguments.toArray))
+ assertEquals("argument --standalone/-s: not allowed with argument
--no-initial-controllers/-N", exception.getMessage)
+ }
+
+ @Test
+ def testFormatWithNoInitialControllersFlagAndInitialControllersFlagFails():
Unit = {
+ val arguments = ListBuffer[String](
+ "format", "--cluster-id", "XcZZOzUqS4yHOjhMQB6JLQ",
+ "--release-version", "3.9-IV0",
+ "--no-initial-controllers", "--initial-controllers",
+ "0@localhost:8020:K90IZ-0DRNazJ49kCZ1EMQ," +
+ "1@localhost:8030:aUARLskQTCW4qCZDtS_cwA," +
+ "2@localhost:8040:2ggvsS4kQb-fSJ_-zC_Ang")
+ val exception = assertThrows(classOf[ArgumentParserException], () =>
StorageTool.parseArguments(arguments.toArray))
+ assertEquals("argument --initial-controllers/-I: not allowed with argument
--no-initial-controllers/-N", exception.getMessage)
+ }
+
@Test
def
testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit
= {
val availableDirs = Seq(TestUtils.tempDir())
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 53013307149..d512545384a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -93,8 +93,10 @@ public class Formatter {
/**
* Maps feature names to the level they will start off with.
+ *
+ * Visible for testing.
*/
- private Map<String, Short> featureLevels = new TreeMap<>();
+ protected Map<String, Short> featureLevels = new TreeMap<>();
/**
* The bootstrap metadata used to format the cluster.
@@ -130,6 +132,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
+ private boolean noInitialControllersFlag = false;
public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
@@ -215,12 +218,17 @@ public class Formatter {
return this;
}
+ public Formatter setNoInitialControllersFlag(boolean
noInitialControllersFlag) {
+ this.noInitialControllersFlag = noInitialControllersFlag;
+ return this;
+ }
+
public Optional<DynamicVoters> initialVoters() {
return initialControllers;
}
boolean hasDynamicQuorum() {
- return initialControllers.isPresent();
+ return initialControllers.isPresent() || noInitialControllersFlag;
}
public BootstrapMetadata bootstrapMetadata() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 45a896c47c4..c0d9cd4ee95 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -379,6 +379,7 @@ public class FormatterTest {
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
assertEquals(Arrays.asList(
String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1),
@@ -446,4 +447,68 @@ public class FormatterTest {
() -> formatter1.formatter.run()).getMessage());
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testFormatWithNoInitialControllers(boolean
specifyKRaftVersion) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+ if (specifyKRaftVersion) {
+ formatter1.formatter.setFeatureLevel("kraft.version", (short)
1);
+ }
+ formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+ formatter1.formatter.setNoInitialControllersFlag(true);
+ assertTrue(formatter1.formatter.hasDynamicQuorum());
+
+ formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ assertEquals(Arrays.asList(
+ String.format("Formatting data directory %s with %s %s.",
+ testEnv.directory(1),
+ MetadataVersion.FEATURE_NAME,
+ MetadataVersion.latestTesting()),
+ String.format("Formatting metadata directory %s with %s
%s.",
+ testEnv.directory(0),
+ MetadataVersion.FEATURE_NAME,
+ MetadataVersion.latestTesting())),
+
formatter1.outputLines().stream().sorted().collect(Collectors.toList()));
+ MetaPropertiesEnsemble ensemble = new
MetaPropertiesEnsemble.Loader().
+ addLogDirs(testEnv.directories).
+ load();
+ MetaProperties logDirProps0 =
ensemble.logDirProps().get(testEnv.directory(0));
+ assertNotNull(logDirProps0);
+ MetaProperties logDirProps1 =
ensemble.logDirProps().get(testEnv.directory(1));
+ assertNotNull(logDirProps1);
+ }
+ }
+
+ @Test
+ public void
testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws
Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+ formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+ formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+ formatter1.formatter.setNoInitialControllersFlag(false);
+ assertFalse(formatter1.formatter.hasDynamicQuorum());
+ assertEquals("Cannot set kraft.version to 1 unless KIP-853
configuration is present. " +
+ "Try removing the --feature flag for kraft.version.",
+ assertThrows(FormatterException.class,
+ formatter1.formatter::run).getMessage());
+ }
+ }
+
+ @Test
+ public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion()
throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+ formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+ formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
+ formatter1.formatter.setNoInitialControllersFlag(true);
+ assertTrue(formatter1.formatter.hasDynamicQuorum());
+ assertEquals("Cannot set kraft.version to 0 if KIP-853
configuration is present. " +
+ "Try removing the --feature flag for kraft.version.",
+ assertThrows(FormatterException.class,
+ formatter1.formatter::run).getMessage());
+ }
+ }
}