This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 099e91f5fc7 KAFKA-19719 --no-initial-controllers should not assume
kraft.version=… (#20616)
099e91f5fc7 is described below
commit 099e91f5fc7e0a44ffec05d60cba650ceea4109a
Author: Kevin Wu <[email protected]>
AuthorDate: Mon Oct 6 09:42:35 2025 -0500
KAFKA-19719 --no-initial-controllers should not assume kraft.version=…
(#20616)
backport KAFKA-19719 to 4.0
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/main/scala/kafka/tools/StorageTool.scala | 38 ++++--
.../ReconfigurableQuorumIntegrationTest.java | 50 ++++---
.../kafka/server/KRaftClusterTest.scala | 3 +-
.../scala/unit/kafka/tools/StorageToolTest.scala | 14 +-
docs/ops.html | 62 ++++-----
.../apache/kafka/metadata/storage/Formatter.java | 30 +++--
.../kafka/metadata/storage/FormatterTest.java | 144 ++++++++++++---------
.../apache/kafka/server/common/KRaftVersion.java | 8 +-
.../kafka/common/test/KafkaClusterTestKit.java | 108 ++++++++++++----
.../org/apache/kafka/common/test/TestKitNodes.java | 5 -
10 files changed, 275 insertions(+), 187 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 40892bca38c..594109e15d2 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -135,21 +135,30 @@ object StorageTool extends Logging {
featureNamesAndLevels(_).foreachEntry {
(k, v) => formatter.setFeatureLevel(k, v)
})
- Option(namespace.getString("initial_controllers")).
+ val initialControllers = namespace.getString("initial_controllers")
+ val isStandalone = namespace.getBoolean("standalone")
+ val staticVotersEmpty = config.quorumConfig.voters().isEmpty
+ formatter.setHasDynamicQuorum(staticVotersEmpty)
+ if (!staticVotersEmpty && (Option(initialControllers).isDefined ||
isStandalone)) {
+ throw new TerseFailure("You cannot specify " +
+ QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
+ "with --initial-controllers or --standalone. " +
+ "If you want to use dynamic quorum, please remove " +
+ QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
+ QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
+ }
+ Option(initialControllers).
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
- if (namespace.getBoolean("standalone")) {
+ if (isStandalone) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
- if (namespace.getBoolean("no_initial_controllers")) {
- formatter.setNoInitialControllersFlag(true)
- } else {
- if (config.processRoles.contains(ProcessRole.ControllerRole)) {
- if (config.quorumConfig.voters().isEmpty &&
formatter.initialVoters().isEmpty) {
+ if (!namespace.getBoolean("no_initial_controllers") &&
+ config.processRoles.contains(ProcessRole.ControllerRole) &&
+ staticVotersEmpty &&
+ formatter.initialVoters().isEmpty) {
throw new TerseFailure("Because " +
QuorumConfig.QUORUM_VOTERS_CONFIG +
" is not set on this controller, you must specify one of the
following: " +
"--standalone, --initial-controllers, or
--no-initial-controllers.");
- }
- }
}
Option(namespace.getList("add_scram")).
foreach(scramArgs =>
formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -319,18 +328,21 @@ object StorageTool extends Logging {
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
- .help("Used to initialize a controller as a single-node dynamic quorum.")
+ .help("Used to initialize a controller as a single-node dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
- .help("Used to initialize a server without a dynamic quorum topology.")
+ .help("Used to initialize a server without specifying a dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config should not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
- .help("Used to initialize a server with a specific dynamic quorum
topology. The argument " +
+ .help("Used to initialize a server with the specified dynamic quorum.
The argument " +
"is a comma-separated list of id@hostname:port:directory. The same
values must be used to " +
"format all nodes. For
example:\[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
-
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
+
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When
setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(store())
}
diff --git
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
index 981217ce287..fe1b6b5da9f 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
@@ -83,9 +84,8 @@ public class ReconfigurableQuorumIntegrationTest {
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build()
- ).build()) {
+ ).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -107,13 +107,23 @@ public class ReconfigurableQuorumIntegrationTest {
@Test
public void testRemoveController() throws Exception {
- try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(3).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
- build()
- ).build()) {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -132,12 +142,22 @@ public class ReconfigurableQuorumIntegrationTest {
@Test
public void testRemoveAndAddSameController() throws Exception {
- try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
- new TestKitNodes.Builder().
- setNumBrokerNodes(1).
- setNumControllerNodes(4).
- setFeature(KRaftVersion.FEATURE_NAME,
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
- build()).build()
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(4).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setInitialVoterSet(initialVoters).
+ build()
) {
cluster.format();
cluster.startup();
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 4fe4fb48cd8..2b99be9321f 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -1013,8 +1013,7 @@ class KRaftClusterTest {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
- setNumControllerNodes(1).
- setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
+ setNumControllerNodes(1).build()).setStandalone(true).build()
try {
cluster.format()
cluster.startup()
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 9fde243ec19..84404507093 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -375,7 +375,10 @@ Found problem:
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
- properties.putAll(defaultStaticQuorumProperties)
+ properties.setProperty("process.roles", "broker")
+ properties.setProperty("node.id", "0")
+ properties.setProperty("controller.listener.names", "CONTROLLER")
+ properties.setProperty("controller.quorum.bootstrap.servers",
"localhost:9093")
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0",
"--standalone")
@@ -458,19 +461,14 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage)
}
- @ParameterizedTest
- @ValueSource(booleans = Array(false, true))
- def
testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature:
Boolean): Unit = {
+ @Test
+ def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0",
"--no-initial-controllers")
- if (setKraftVersionFeature) {
- arguments += "--feature"
- arguments += "kraft.version=1"
- }
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
diff --git a/docs/ops.html b/docs/ops.html
index 5a60a4cde89..3d1c99e8522 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -3869,45 +3869,29 @@ In the replica description
0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the
If you are not sure whether you are using static or dynamic quorums, you can
determine this by
running something like the following:<p>
-<pre><code class="language-bash">
- $ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
-</code></pre><p>
-
- If the <code>kraft.version</code> field is level 0 or absent, you are using
a static quorum. If
- it is 1 or above, you are using a dynamic quorum. For example, here is an
example of a static
- quorum:<p/>
-<pre><code class="language-bash">
-Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1
FinalizedVersionLevel: 0 Epoch: 5
-Feature: metadata.version SupportedMinVersion: 3.3-IV3
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
-</code></pre><p/>
-
- Here is another example of a static quorum:<p/>
-<pre><code class="language-bash">
-Feature: metadata.version SupportedMinVersion: 3.3-IV3
SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
-</code></pre><p/>
-
- Here is an example of a dynamic quorum:<p/>
-<pre><code class="language-bash">
-Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1
FinalizedVersionLevel: 1 Epoch: 5
-Feature: metadata.version SupportedMinVersion: 3.3-IV3
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
-</code></pre><p/>
-
- The static versus dynamic nature of the quorum is determined at the time of
formatting.
- Specifically, the quorum will be formatted as dynamic if
<code>controller.quorum.voters</code> is
- <b>not</b> present, and if the software version is Apache Kafka 3.9 or
newer. If you have
- followed the instructions earlier in this document, you will get a dynamic
quorum.<p>
-
- If you would like the formatting process to fail if a dynamic quorum cannot
be achieved, format your
- controllers using the <code>--feature kraft.version=1</code>. (Note that you
should not supply
- this flag when formatting brokers -- only when formatting controllers.)<p>
-
-<pre><code class="language-bash">
- $ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1
-c controller_static.properties
- Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try
removing the --feature flag for kraft.version.
-</code></pre><p>
-
- Note: Currently it is <b>not</b> possible to convert clusters using a static
controller quorum to
- use a dynamic controller quorum. This function will be supported in the
future release.
+ <pre><code class="language-bash">$ bin/kafka-features.sh
--bootstrap-controller localhost:9093 describe</code></pre>
+ <p>
+ If the <code>kraft.version</code> field is level 0 or absent, you are
using a static quorum. If
+ it is 1 or above, you are using a dynamic quorum. For example, here is an
example of a static
+ quorum:<p>
+ <pre><code class="language-bash">Feature: kraft.version
SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch:
5
+Feature: metadata.version SupportedMinVersion: 3.3-IV3
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch:
5</code></pre>
+ <p>
+ Here is another example of a static quorum:<p>
+ <pre><code class="language-bash">Feature: metadata.version
SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0
FinalizedVersionLevel: 3.8-IV0 Epoch: 5</code></pre>
+ <p>
+ Here is an example of a dynamic quorum:<p>
+ <pre><code class="language-bash">Feature: kraft.version
SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch:
5
+Feature: metadata.version SupportedMinVersion: 3.3-IV3
SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch:
5</code></pre>
+ <p>
+ The static versus dynamic nature of the quorum is determined at the time
of formatting.
+ Specifically, the quorum will be formatted as dynamic if
<code>controller.quorum.voters</code> is
+ <b>not</b> present, and one of --standalone, --initial-controllers, or
--no-initial-controllers is set.
+ If you have followed the instructions earlier in this document, you will
get a dynamic quorum.
+ <p>
+
+ Note: Currently it is <b>not</b> possible to convert clusters using a
static controller quorum to
+ use a dynamic controller quorum. This function will be supported in the
future release.
<h5 class="anchor-heading"><a id="kraft_reconfig_add"
class="anchor-link"></a><a href="#kraft_reconfig_add">Add New
Controller</a></h5>
If a dynamic controller cluster already exists, it can be expanded by first
provisioning a new controller using the <a
href="#kraft_storage_observers">kafka-storage.sh tool</a> and starting the
controller.
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index acba0f7a04b..e79ca41ff80 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -132,7 +132,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
- private boolean noInitialControllersFlag = false;
+ private boolean hasDynamicQuorum = false;
public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
@@ -218,8 +218,8 @@ public class Formatter {
return this;
}
- public Formatter setNoInitialControllersFlag(boolean
noInitialControllersFlag) {
- this.noInitialControllersFlag = noInitialControllersFlag;
+ public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
+ this.hasDynamicQuorum = hasDynamicQuorum;
return this;
}
@@ -228,7 +228,7 @@ public class Formatter {
}
boolean hasDynamicQuorum() {
- return initialControllers.isPresent() || noInitialControllersFlag;
+ return hasDynamicQuorum;
}
public BootstrapMetadata bootstrapMetadata() {
@@ -338,8 +338,8 @@ public class Formatter {
/**
* Calculate the effective feature level for kraft.version. In order to
keep existing
* command-line invocations of StorageTool working, we default this to 0
if no dynamic
- * voter quorum arguments were provided. As a convenience, if dynamic
voter quorum arguments
- * were passed, we set the latest kraft.version. (Currently there is only
1 non-zero version).
+ * voter quorum arguments were provided. As a convenience, if the static
voters config is
+ * empty, we set the latest kraft.version. (Currently there is only 1
non-zero version).
*
* @param configuredKRaftVersionLevel The configured level for
kraft.version
* @return The effective feature level.
@@ -348,15 +348,21 @@ public class Formatter {
if (configuredKRaftVersionLevel.isPresent()) {
if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) {
- throw new FormatterException("Cannot set kraft.version to
" +
- configuredKRaftVersionLevel.get() + " if KIP-853
configuration is present. " +
- "Try removing the --feature flag for
kraft.version.");
+ throw new FormatterException(
+ "Cannot set kraft.version to 0 if
controller.quorum.voters is empty and one of the flags " +
+ "--standalone, --initial-controllers, or
--no-initial-controllers is used. For dynamic " +
+ "controllers support, try removing the --feature flag
for kraft.version."
+ );
}
} else {
if (!hasDynamicQuorum()) {
- throw new FormatterException("Cannot set kraft.version to
" +
- configuredKRaftVersionLevel.get() + " unless KIP-853
configuration is present. " +
- "Try removing the --feature flag for
kraft.version.");
+ throw new FormatterException(
+ "Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
+ " unless controller.quorum.voters is empty and one of
the flags --standalone, " +
+ "--initial-controllers, or --no-initial-controllers is
used. " +
+ "For dynamic controllers support, try using one of
--standalone, --initial-controllers, " +
+ "or --no-initial-controllers and removing
controller.quorum.voters."
+ );
}
}
return configuredKRaftVersionLevel.get();
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index 0706e4e738f..83946c426fc 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.GroupVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TestFeatureVersion;
import org.apache.kafka.server.common.TransactionVersion;
@@ -194,6 +195,40 @@ public class FormatterTest {
}
}
+ @Test
+ public void testStandaloneWithIgnoreFormatted() throws Exception {
+ try (TestEnv testEnv = new TestEnv(1)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+ String originalDirectoryId = Uuid.randomUuid().toString();
+ String newDirectoryId = Uuid.randomUuid().toString();
+ formatter1.formatter
+ .setInitialControllers(DynamicVoters.parse("1@localhost:8020:"
+ originalDirectoryId))
+ .setHasDynamicQuorum(true)
+ .run();
+ assertEquals("Formatting dynamic metadata voter directory " +
testEnv.directory(0) +
+ " with metadata.version " +
MetadataVersion.latestProduction() + ".",
+ formatter1.output().trim());
+ assertMetadataDirectoryId(testEnv,
Uuid.fromString(originalDirectoryId));
+
+ FormatterContext formatter2 = testEnv.newFormatter();
+ formatter2.formatter
+ .setIgnoreFormatted(true)
+ .setInitialControllers(DynamicVoters.parse("1@localhost:8020:"
+ newDirectoryId))
+ .run();
+ assertEquals("All of the log directories are already formatted.",
+ formatter2.output().trim());
+ assertMetadataDirectoryId(testEnv,
Uuid.fromString(originalDirectoryId));
+ }
+ }
+
+ private void assertMetadataDirectoryId(TestEnv testEnv, Uuid
expectedDirectoryId) throws Exception {
+ MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
+ addLogDirs(testEnv.directories).
+ load();
+ MetaProperties logDirProps0 =
ensemble.logDirProps().get(testEnv.directory(0));
+ assertEquals(expectedDirectoryId, logDirProps0.directoryId().get());
+ }
+
@Test
public void testOneDirectoryFormattedAndOthersNotFormatted() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
@@ -383,14 +418,15 @@ public class FormatterTest {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
if (specifyKRaftVersion) {
- formatter1.formatter.setFeatureLevel("kraft.version", (short)
1);
+
formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1);
}
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
formatter1.formatter.run();
- assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
- assertEquals(Arrays.asList(
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ assertEquals(List.of(
String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1),
MetadataVersion.FEATURE_NAME,
@@ -416,45 +452,66 @@ public class FormatterTest {
public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 0);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
assertTrue(formatter1.formatter.hasDynamicQuorum());
- assertEquals("Cannot set kraft.version to 0 if KIP-853
configuration is present. " +
- "Try removing the --feature flag for kraft.version.",
- assertThrows(FormatterException.class,
- () -> formatter1.formatter.run()).getMessage());
+ assertEquals(
+ "Cannot set kraft.version to 0 if controller.quorum.voters is
empty " +
+ "and one of the flags --standalone, --initial-controllers, or
--no-initial-controllers is used. " +
+ "For dynamic controllers support, try removing the --feature
flag for kraft.version.",
+ assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
+ );
}
}
@Test
- public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion()
throws Exception {
+ public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
assertFalse(formatter1.formatter.hasDynamicQuorum());
- assertEquals("Cannot set kraft.version to 1 unless KIP-853
configuration is present. " +
- "Try removing the --feature flag for kraft.version.",
- assertThrows(FormatterException.class,
- () -> formatter1.formatter.run()).getMessage());
+ assertEquals(
+ "Cannot set kraft.version to 1 unless controller.quorum.voters
is empty and " +
+ "one of the flags --standalone, --initial-controllers, or
--no-initial-controllers is used. " +
+ "For dynamic controllers support, try using one of
--standalone, --initial-controllers, " +
+ "or --no-initial-controllers and removing
controller.quorum.voters.",
+ assertThrows(FormatterException.class,
formatter1.formatter::run).getMessage()
+ );
}
}
@Test
- public void testFormatWithInitialVotersFailsWithOlderMetadataVersion()
throws Exception {
+ public void testFormatWithInitialVotersWithOlderMetadataVersion() throws
Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
+ formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- assertEquals("kraft.version could not be set to 1 because it
depends on " +
- "metadata.version level 21",
- assertThrows(IllegalArgumentException.class,
- () -> formatter1.formatter.run()).getMessage());
+ formatter1.formatter.setHasDynamicQuorum(true);
+ formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean
hasDynamicQuorum) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+ formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum);
+ formatter1.formatter.run();
+ if (hasDynamicQuorum) {
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ } else {
+ assertEquals((short) 0,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ }
}
}
@@ -475,6 +532,7 @@ public class FormatterTest {
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
+ formatter1.formatter.setHasDynamicQuorum(true);
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
assertDoesNotThrow(() -> formatter1.formatter.run());
} else {
@@ -486,21 +544,15 @@ public class FormatterTest {
}
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testFormatWithNoInitialControllers(boolean
specifyKRaftVersion) throws Exception {
+ @Test
+ public void testFormatWithNoInitialControllers() throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
- if (specifyKRaftVersion) {
- formatter1.formatter.setFeatureLevel("kraft.version", (short)
1);
- }
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(true);
- assertTrue(formatter1.formatter.hasDynamicQuorum());
-
+ assertFalse(formatter1.formatter.hasDynamicQuorum());
formatter1.formatter.run();
- assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
- assertEquals(Arrays.asList(
+ assertEquals((short) 0,
formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME));
+ assertEquals(List.of(
String.format("Formatting data directory %s with %s %s.",
testEnv.directory(1),
MetadataVersion.FEATURE_NAME,
@@ -519,34 +571,4 @@ public class FormatterTest {
assertNotNull(logDirProps1);
}
}
-
- @Test
- public void
testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws
Exception {
- try (TestEnv testEnv = new TestEnv(2)) {
- FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
- formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(false);
- assertFalse(formatter1.formatter.hasDynamicQuorum());
- assertEquals("Cannot set kraft.version to 1 unless KIP-853
configuration is present. " +
- "Try removing the --feature flag for kraft.version.",
- assertThrows(FormatterException.class,
- formatter1.formatter::run).getMessage());
- }
- }
-
- @Test
- public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion()
throws Exception {
- try (TestEnv testEnv = new TestEnv(2)) {
- FormatterContext formatter1 = testEnv.newFormatter();
- formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
- formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- formatter1.formatter.setNoInitialControllersFlag(true);
- assertTrue(formatter1.formatter.hasDynamicQuorum());
- assertEquals("Cannot set kraft.version to 0 if KIP-853
configuration is present. " +
- "Try removing the --feature flag for kraft.version.",
- assertThrows(FormatterException.class,
- formatter1.formatter::run).getMessage());
- }
- }
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index 211f6dcac44..988d58c3e0e 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.server.common;
-import java.util.Collections;
import java.util.Map;
public enum KRaftVersion implements FeatureVersion {
@@ -73,12 +72,7 @@ public enum KRaftVersion implements FeatureVersion {
@Override
public Map<String, Short> dependencies() {
- if (this.featureLevel == 0) {
- return Collections.emptyMap();
- } else {
- return Collections.singletonMap(
- MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_9_IV0.featureLevel());
- }
+ return Map.of();
}
public short quorumStateVersion() {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 20963665440..1a16a7a0a34 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -27,10 +27,12 @@ import kafka.server.SharedServer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -42,7 +44,6 @@ import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.config.KRaftConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.fault.FaultHandler;
@@ -114,6 +115,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String controllerListenerName;
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
+ private boolean standalone;
+ private Optional<Map<Integer, Uuid>> initialVoterSet =
Optional.empty();
private boolean deleteOnClose;
public Builder(TestKitNodes nodes) {
@@ -130,6 +133,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
return this;
}
+ public Builder setStandalone(boolean standalone) {
+ this.standalone = standalone;
+ return this;
+ }
+
+ public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
+ this.initialVoterSet = Optional.of(initialVoterSet);
+ return this;
+ }
+
private KafkaConfig createNodeConfig(TestKitNode node) throws
IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode =
nodes.controllerNodes().get(node.id());
@@ -168,18 +181,31 @@ public class KafkaClusterTestKit implements AutoCloseable
{
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
brokerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
controllerListenerName);
- StringBuilder quorumVoterStringBuilder = new StringBuilder();
- String prefix = "";
- for (int nodeId : nodes.controllerNodes().keySet()) {
- quorumVoterStringBuilder.append(prefix).
- append(nodeId).
- append("@").
- append("localhost").
- append(":").
-
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
- prefix = ",";
+ if (!standalone && initialVoterSet.isEmpty()) {
+ StringBuilder quorumVoterStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ quorumVoterStringBuilder.append(prefix).
+ append(nodeId).
+ append("@").
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
+ } else {
+ StringBuilder bootstrapServersStringBuilder = new
StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ bootstrapServersStringBuilder.append(prefix).
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
bootstrapServersStringBuilder.toString());
}
- props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
// reduce log cleaner offset map memory usage
props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
@@ -251,7 +277,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- Collections.emptyList(),
+
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
@@ -279,7 +305,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
Time.SYSTEM,
new Metrics(),
CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())),
- Collections.emptyList(),
+
QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()),
faultHandlerFactory,
socketFactoryManager.getOrCreateSocketFactory(node.id())
);
@@ -316,6 +342,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
faultHandlerFactory,
socketFactoryManager,
Optional.ofNullable(jaasFile),
+ standalone,
+ initialVoterSet,
deleteOnClose);
}
@@ -361,6 +389,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
+ private final boolean standalone;
+ private final Optional<Map<Integer, Uuid>> initialVoterSet;
private final boolean deleteOnClose;
private KafkaClusterTestKit(
@@ -371,6 +401,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile,
+ boolean standalone,
+ Optional<Map<Integer, Uuid>> initialVoterSet,
boolean deleteOnClose
) {
/*
@@ -388,6 +420,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
+ this.standalone = standalone;
+ this.initialVoterSet = initialVoterSet;
this.deleteOnClose = deleteOnClose;
}
@@ -422,8 +456,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
boolean writeMetadataDirectory
) {
try {
+ final var nodeId = ensemble.nodeId().getAsInt();
Formatter formatter = new Formatter();
- formatter.setNodeId(ensemble.nodeId().getAsInt());
+ formatter.setNodeId(nodeId);
formatter.setClusterId(ensemble.clusterId().get());
if (writeMetadataDirectory) {
formatter.setDirectories(ensemble.logDirProps().keySet());
@@ -436,8 +471,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
return;
}
formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion());
- formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME,
-
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName(controllerListenerName);
@@ -446,18 +479,43 @@ public class KafkaClusterTestKit implements AutoCloseable
{
} else {
formatter.setMetadataLogDirectory(Optional.empty());
}
- if
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
- StringBuilder dynamicVotersBuilder = new StringBuilder();
- String prefix = "";
- for (TestKitNode controllerNode :
nodes.controllerNodes().values()) {
- int port = socketFactoryManager.
- getOrCreatePortForListener(controllerNode.id(),
controllerListenerName);
+ StringBuilder dynamicVotersBuilder = new StringBuilder();
+ String prefix = "";
+ if (standalone) {
+ if (nodeId == TestKitDefaults.BROKER_ID_OFFSET +
TestKitDefaults.CONTROLLER_ID_OFFSET) {
+ final var controllerNode =
nodes.controllerNodes().get(nodeId);
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ controllerNode.id(),
+ socketFactoryManager.
+
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+ controllerNode.metadataDirectoryId()
+ )
+ );
+
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ }
+ // when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET,
the node is formatting with
+ // the --no-initial-controllers flag
+ formatter.setHasDynamicQuorum(true);
+ } else if (initialVoterSet.isPresent()) {
+ for (final var controllerNode :
initialVoterSet.get().entrySet()) {
+ final var voterId = controllerNode.getKey();
+ final var voterDirectoryId = controllerNode.getValue();
dynamicVotersBuilder.append(prefix);
prefix = ",";
-
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
- controllerNode.id(), port,
controllerNode.metadataDirectoryId()));
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ voterId,
+ socketFactoryManager.
+ getOrCreatePortForListener(voterId,
controllerListenerName),
+ voterDirectoryId
+ )
+ );
}
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+ formatter.setHasDynamicQuorum(true);
}
formatter.run();
} catch (Exception e) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 759e86c200b..cd8879a84db 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -94,11 +94,6 @@ public class TestKitNodes {
return this;
}
- public Builder setFeature(String featureName, short level) {
- this.bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
- return this;
- }
-
public Builder setCombined(boolean combined) {
this.combined = combined;
return this;