This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2b2b3cd355c KAFKA-18062: use feature version to enable ELR (#17867)
2b2b3cd355c is described below
commit 2b2b3cd355c2bb226a869988a6b1a59a7f2db055
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Nov 26 14:40:23 2024 -0800
KAFKA-18062: use feature version to enable ELR (#17867)
Replace the ELR static config with feature version.
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/server/ControllerServer.scala | 3 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 -
.../kafka/server/QuorumTestHarness.scala | 8 ++-
.../src/test/scala/kafka/utils/TestInfoUtils.scala | 8 +++
.../server/AbstractApiVersionsRequestTest.scala | 9 ++-
.../scala/unit/kafka/tools/StorageToolTest.scala | 2 +-
.../kafka/controller/FeatureControlManager.java | 9 +++
.../apache/kafka/controller/QuorumController.java | 20 +-----
.../controller/ReplicationControlManager.java | 18 +----
.../QuorumControllerIntegrationTestUtils.java | 30 +++++++-
.../kafka/controller/QuorumControllerTest.java | 21 ++++--
.../kafka/controller/QuorumControllerTestEnv.java | 12 +++-
.../controller/ReplicationControlManagerTest.java | 14 +++-
.../kafka/metadata/storage/FormatterTest.java | 3 +-
.../common/EligibleLeaderReplicasVersion.java | 80 ++++++++++++++++++++++
.../org/apache/kafka/server/common/Features.java | 3 +-
.../apache/kafka/server/config/KRaftConfigs.java | 4 --
.../apache/kafka/server/BrokerFeaturesTest.java | 2 +
.../org/apache/kafka/tools/FeatureCommandTest.java | 23 ++++---
19 files changed, 201 insertions(+), 70 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 90deff7ed86..8cc14516126 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -242,8 +242,7 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
- setInterBrokerListenerName(config.interBrokerListenerName.value()).
- setEligibleLeaderReplicasEnabled(config.elrEnabled)
+ setInterBrokerListenerName(config.interBrokerListenerName.value())
}
controller = controllerBuilder.build()
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fec938758c4..18327a59b55 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -340,8 +340,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val migrationEnabled: Boolean =
getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
val migrationMetadataMinBatchSize: Int =
getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)
- val elrEnabled: Boolean = getBoolean(KRaftConfigs.ELR_ENABLED_CONFIG)
-
private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
diff --git
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index cc0c2bb3662..45712240ecd 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -43,7 +43,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
-import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs,
ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
@@ -376,6 +376,12 @@ abstract class QuorumTestHarness extends Logging {
} else TransactionVersion.TV_1.featureLevel()
formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME,
transactionVersion)
+ val elrVersion =
+ if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
+ } else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
+ formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME,
elrVersion)
+
addFormatterSettings(formatter)
formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index 3b7732374d5..cd22727839e 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -67,4 +67,12 @@ object TestInfoUtils {
def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
!testInfo.getDisplayName.contains("isTV2Enabled=false")
}
+
+ /**
+ * Returns whether eligible leader replicas version 1 is enabled.
+ * When no parameter is provided, the default returned is false.
+ */
+ def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
+ testInfo.getDisplayName.contains("isELRV1Enabled=true")
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 4f442d3194e..296e285bdeb 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{GroupVersion, MetadataVersion,
TransactionVersion}
+import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
@@ -65,11 +65,11 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
- assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
+ assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(),
apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
- assertEquals(4, apiVersionsResponse.data().supportedFeatures().size())
+ assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1,
apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -83,6 +83,9 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
assertEquals(GroupVersion.GV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())
+
+ assertEquals(0,
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
+ assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(),
apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if
(cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 5a213e6c186..b38a2178bae 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -336,7 +336,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported
features are: " +
- "group.version, kraft.version, transaction.version",
+ "eligible.leader.replicas.version, group.version, kraft.version,
transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index f114d594ae5..436c9d868cf 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -396,6 +396,15 @@ public class FeatureControlManager {
return new FinalizedControllerFeatures(features, epoch);
}
+ FinalizedControllerFeatures latestFinalizedFeatures() {
+ Map<String, Short> features = new HashMap<>();
+ features.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get().featureLevel());
+ for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
+ features.put(entry.getKey(), entry.getValue());
+ }
+ return new FinalizedControllerFeatures(features, -1);
+ }
+
public void replay(FeatureLevelRecord record) {
VersionRange range =
quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 71e43b55b25..bd98f7c8096 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -214,7 +214,6 @@ public final class QuorumController implements Controller {
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
- private boolean eligibleLeaderReplicasEnabled = false;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
@@ -337,11 +336,6 @@ public final class QuorumController implements Controller {
return this;
}
- public Builder setEligibleLeaderReplicasEnabled(boolean
eligibleLeaderReplicasEnabled) {
- this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
- return this;
- }
-
public Builder setDelegationTokenCache(DelegationTokenCache
tokenCache) {
this.tokenCache = tokenCache;
return this;
@@ -432,7 +426,6 @@ public final class QuorumController implements Controller {
delegationTokenMaxLifeMs,
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
- eligibleLeaderReplicasEnabled,
uncleanLeaderElectionCheckIntervalMs,
interBrokerListenerName
);
@@ -1436,11 +1429,6 @@ public final class QuorumController implements
Controller {
*/
private final BootstrapMetadata bootstrapMetadata;
- /**
- * True if the KIP-966 eligible leader replicas feature is enabled.
- */
- private final boolean eligibleLeaderReplicasEnabled;
-
/**
* The maximum number of records per batch to allow.
*/
@@ -1480,7 +1468,6 @@ public final class QuorumController implements Controller
{
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
- boolean eligibleLeaderReplicasEnabled,
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName
) {
@@ -1549,7 +1536,6 @@ public final class QuorumController implements Controller
{
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
- setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
@@ -1580,7 +1566,6 @@ public final class QuorumController implements Controller
{
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
- this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
@@ -1599,10 +1584,7 @@ public final class QuorumController implements
Controller {
setMetrics(controllerMetrics).
setTime(time).
build();
-
- log.info("Creating new QuorumController with clusterId {}.{}",
- clusterId,
- eligibleLeaderReplicasEnabled ? " Eligible leader replicas
enabled." : "");
+ log.info("Creating new QuorumController with clusterId {}", clusterId);
this.raftClient.register(metaLogListener);
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 16cc762ebc5..a995068eccd 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -95,6 +95,7 @@ import org.apache.kafka.metadata.placement.PlacementSpec;
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -165,7 +166,6 @@ public class ReplicationControlManager {
private ClusterControlManager clusterControl = null;
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
private FeatureControlManager featureControl = null;
- private boolean eligibleLeaderReplicasEnabled = false;
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
@@ -187,11 +187,6 @@ public class ReplicationControlManager {
return this;
}
- Builder setEligibleLeaderReplicasEnabled(boolean
eligibleLeaderReplicasEnabled) {
- this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
- return this;
- }
-
Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
@@ -233,7 +228,6 @@ public class ReplicationControlManager {
defaultReplicationFactor,
defaultNumPartitions,
maxElectionsPerImbalance,
- eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
@@ -305,11 +299,6 @@ public class ReplicationControlManager {
*/
private final int defaultNumPartitions;
- /**
- * True if eligible leader replicas is enabled.
- */
- private final boolean eligibleLeaderReplicasEnabled;
-
/**
* Maximum number of leader elections to perform during one partition
leader balancing operation.
*/
@@ -399,7 +388,6 @@ public class ReplicationControlManager {
short defaultReplicationFactor,
int defaultNumPartitions,
int maxElectionsPerImbalance,
- boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional<CreateTopicPolicy> createTopicPolicy,
@@ -410,7 +398,6 @@ public class ReplicationControlManager {
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
- this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
@@ -1029,7 +1016,8 @@ public class ReplicationControlManager {
}
boolean isElrEnabled() {
- return eligibleLeaderReplicasEnabled &&
featureControl.metadataVersion().isElrSupported();
+ return featureControl.metadataVersion().isElrSupported() &&
featureControl.latestFinalizedFeatures().
+ versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME,
(short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}
ControllerResult<AlterPartitionResponseData> alterPartition(
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index a8331fe9f23..2cca9ef7cc6 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -29,6 +29,7 @@ import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicRe
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
@@ -75,6 +76,32 @@ public class QuorumControllerIntegrationTestUtils {
return features;
}
+ /**
+ * Create a broker features collection for use in a registration request.
MV and given features are included.
+ *
+ * @param minVersion The minimum supported MV.
+ * @param maxVersion The maximum supported MV.
+ * @param featureMaxVersions The features and their max supported
versions.
+ */
+ static BrokerRegistrationRequestData.FeatureCollection
brokerFeaturesPlusFeatureVersions(
+ MetadataVersion minVersion,
+ MetadataVersion maxVersion,
+ Map<String, Short> featureMaxVersions
+ ) {
+ BrokerRegistrationRequestData.FeatureCollection features = new
BrokerRegistrationRequestData.FeatureCollection();
+ features.add(new BrokerRegistrationRequestData.Feature()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setMinSupportedVersion(minVersion.featureLevel())
+ .setMaxSupportedVersion(maxVersion.featureLevel()));
+ featureMaxVersions.entrySet().forEach(entry -> {
+ features.add(new BrokerRegistrationRequestData.Feature()
+ .setName(entry.getKey())
+ .setMaxSupportedVersion(entry.getValue())
+ .setMinSupportedVersion((short) 0));
+ });
+ return features;
+ }
+
/**
* Register the given number of brokers.
*
@@ -94,7 +121,8 @@ public class QuorumControllerIntegrationTestUtils {
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
- .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting()))
+
.setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting(),
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel())))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB"
+ brokerId))
.setLogDirs(Collections.singletonList(
Uuid.fromString("TESTBROKER" + Integer.toString(100000
+ brokerId).substring(1) + "DIRAAAA")
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 867957513b4..6e64483a311 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -106,6 +106,7 @@ import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.Features;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -157,6 +158,7 @@ import static
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static
org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor;
import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeaturesPlusFeatureVersions;
import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeatToUnfenceBrokers;
@@ -188,7 +190,8 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting())).
+
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting(),
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
setClusterId(logEnv.clusterId())).get();
@@ -229,7 +232,8 @@ public class QuorumControllerTest {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting())).
+
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting(),
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
setClusterId(logEnv.clusterId())).get();
@@ -367,14 +371,16 @@ public class QuorumControllerTest {
listeners.add(new
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
QuorumController active = controlEnv.activeController();
Map<Integer, Long> brokerEpochs = new HashMap<>();
-
+ BrokerRegistrationRequestData.FeatureCollection features =
+ brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.IBP_4_0_IV1,
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()));
for (Integer brokerId : allBrokers) {
CompletableFuture<BrokerRegistrationReply> reply =
active.registerBroker(
anonymousContextFor(ApiKeys.BROKER_REGISTRATION),
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
-
setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.IBP_4_0_IV1)).
+ setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners));
@@ -442,7 +448,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(brokerToUncleanShutdown).
setClusterId(active.clusterId()).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.IBP_4_0_IV1)).
+ setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
@@ -455,7 +461,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(lastKnownElr[0]).
setClusterId(active.clusterId()).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.IBP_4_0_IV1)).
+ setFeatures(features).
setIncarnationId(Uuid.randomUuid()).
setLogDirs(Collections.singletonList(Uuid.randomUuid())).
setListeners(listeners)).get();
@@ -737,7 +743,8 @@ public class QuorumControllerTest {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting())).
+
setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1,
MetadataVersion.latestTesting(),
+ Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()))).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 8bece2bb86c..70ae12c96cd 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,10 +17,12 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.metadata.FakeKafkaConfigSchema;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
@@ -115,11 +117,17 @@ public class QuorumControllerTestEnv implements
AutoCloseable {
fatalFaultHandlers.put(nodeId, fatalFaultHandler);
MockFaultHandler nonFatalFaultHandler = new
MockFaultHandler("nonFatalFaultHandler");
builder.setNonFatalFaultHandler(nonFatalFaultHandler);
-
builder.setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled);
builder.setConfigSchema(FakeKafkaConfigSchema.INSTANCE);
nonFatalFaultHandlers.put(nodeId, fatalFaultHandler);
controllerBuilderInitializer.accept(builder);
- this.controllers.add(builder.build());
+ QuorumController controller = builder.build();
+ if (eligibleLeaderReplicasEnabled) {
+ controller.featureControl().replay(new FeatureLevelRecord()
+ .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
+
.setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel())
+ );
+ }
+ this.controllers.add(controller);
}
} catch (Exception e) {
close();
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index c53f068e9d6..55ed04c15a7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -63,6 +63,7 @@ import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.On
import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -92,6 +93,7 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -218,6 +220,7 @@ public class ReplicationControlManagerTest {
final ClusterControlManager clusterControl;
final ConfigurationControlManager configurationControl;
final ReplicationControlManager replicationControl;
+ final OffsetControlManager offsetControlManager;
void replay(List<ApiMessageAndVersion> records) {
RecordTestUtils.replayAll(clusterControl, records);
@@ -245,6 +248,12 @@ public class ReplicationControlManagerTest {
Collections.singletonList(0))).
setMetadataVersion(metadataVersion).
build();
+ featureControl.replay(new FeatureLevelRecord()
+ .setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
+ .setFeatureLevel(isElrEnabled ?
+ EligibleLeaderReplicasVersion.ELRV_1.featureLevel() :
+ EligibleLeaderReplicasVersion.ELRV_0.featureLevel())
+ );
this.clusterControl = new ClusterControlManager.Builder().
setLogContext(logContext).
setTime(time).
@@ -254,7 +263,9 @@ public class ReplicationControlManagerTest {
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
build();
-
+ this.offsetControlManager = new OffsetControlManager.Builder().
+ setSnapshotRegistry(snapshotRegistry).
+ build();
this.replicationControl = new ReplicationControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
@@ -263,7 +274,6 @@ public class ReplicationControlManagerTest {
setClusterControl(clusterControl).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
- setEligibleLeaderReplicasEnabled(isElrEnabled).
build();
clusterControl.activate();
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index c0d9cd4ee95..39f5925a0d2 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -360,7 +360,8 @@ public class FormatterTest {
formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values()));
formatter1.formatter.setFeatureLevel("nonexistent.feature",
(short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported
features " +
- "are: group.version, kraft.version, test.feature.version,
transaction.version",
+ "are: eligible.leader.replicas.version, group.version,
kraft.version, " +
+ "test.feature.version, transaction.version",
assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
new file mode 100644
index 00000000000..fd10690a663
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum EligibleLeaderReplicasVersion implements FeatureVersion {
+
+ // Version 0 is the version that disables ELR.
+ ELRV_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+
+ // Version 1 enables the ELR (KIP-966).
+ ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());
+
+ public static final String FEATURE_NAME =
"eligible.leader.replicas.version";
+
+ private final short featureLevel;
+ private final MetadataVersion bootstrapMetadataVersion;
+ private final Map<String, Short> dependencies;
+
+ EligibleLeaderReplicasVersion(
+ int featureLevel,
+ MetadataVersion bootstrapMetadataVersion,
+ Map<String, Short> dependencies
+ ) {
+ this.featureLevel = (short) featureLevel;
+ this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+ this.dependencies = dependencies;
+ }
+
+ @Override
+ public short featureLevel() {
+ return featureLevel;
+ }
+
+ @Override
+ public String featureName() {
+ return FEATURE_NAME;
+ }
+
+ @Override
+ public MetadataVersion bootstrapMetadataVersion() {
+ return bootstrapMetadataVersion;
+ }
+
+ @Override
+ public Map<String, Short> dependencies() {
+ return dependencies;
+ }
+
+ public boolean isEligibleLeaderReplicasFeatureEnabled() {
+ return featureLevel >= ELRV_1.featureLevel;
+ }
+
+ public static EligibleLeaderReplicasVersion fromFeatureLevel(short
version) {
+ switch (version) {
+ case 0:
+ return ELRV_0;
+ case 1:
+ return ELRV_1;
+ default:
+ throw new RuntimeException("Unknown eligible leader replicas
feature level: " + (int) version);
+ }
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
index 51f3d78e868..bd4fa0c8615 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
@@ -44,7 +44,8 @@ public enum Features {
TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
KRAFT_VERSION("kraft.version", KRaftVersion.values()),
TRANSACTION_VERSION("transaction.version", TransactionVersion.values()),
- GROUP_VERSION("group.version", GroupVersion.values());
+ GROUP_VERSION("group.version", GroupVersion.values()),
+ ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version",
EligibleLeaderReplicasVersion.values());
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;
diff --git
a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index a29c71ad4be..d2cf35f33a5 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -123,9 +123,6 @@ public class KRaftConfigs {
public static final int MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT = 200;
public static final String MIGRATION_METADATA_MIN_BATCH_SIZE_DOC = "Soft
minimum batch size to use when migrating metadata from ZooKeeper to KRaft";
- /** Enable eligible leader replicas configs */
- public static final String ELR_ENABLED_CONFIG =
"eligible.leader.replicas.enable";
- public static final String ELR_ENABLED_DOC = "Enable the Eligible leader
replicas";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG,
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH,
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG,
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH,
METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
@@ -145,7 +142,6 @@ public class KRaftConfigs {
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT,
METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW,
METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(SERVER_MAX_STARTUP_TIME_MS_CONFIG, LONG,
SERVER_MAX_STARTUP_TIME_MS_DEFAULT, atLeast(0), MEDIUM,
SERVER_MAX_STARTUP_TIME_MS_DOC)
.define(MIGRATION_ENABLED_CONFIG, BOOLEAN, false, HIGH,
MIGRATION_ENABLED_DOC)
- .define(ELR_ENABLED_CONFIG, BOOLEAN, false, HIGH, ELR_ENABLED_DOC)
.defineInternal(MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG, INT,
MIGRATION_METADATA_MIN_BATCH_SIZE_DEFAULT, atLeast(1),
MEDIUM, MIGRATION_METADATA_MIN_BATCH_SIZE_DOC);
}
diff --git
a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
index 6ce2b3a7e65..8963b40fceb 100644
--- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
+++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
+import static
org.apache.kafka.server.common.Features.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Features.GROUP_VERSION;
import static org.apache.kafka.server.common.Features.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -111,6 +112,7 @@ public class BrokerFeaturesTest {
expectedFeatures.put(MetadataVersion.FEATURE_NAME,
MetadataVersion.latestTesting().featureLevel());
expectedFeatures.put(TRANSACTION_VERSION.featureName(),
TRANSACTION_VERSION.latestTesting());
expectedFeatures.put(GROUP_VERSION.featureName(),
GROUP_VERSION.latestTesting());
+ expectedFeatures.put(ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting());
expectedFeatures.put("kraft.version", (short) 0);
expectedFeatures.put("test_feature_1", (short) 4);
expectedFeatures.put("test_feature_2", (short) 3);
diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
index 452f74d5c19..1aafc7db994 100644
--- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
@@ -60,14 +60,16 @@ public class FeatureCommandTest {
List<String> features =
Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
// Change expected message to reflect latest MetadataVersion
(SupportedMaxVersion increases when adding a new version)
- assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(0)));
- assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.0-IV1\t" +
- "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel:
3.3-IV1\t", outputWithoutEpoch(features.get(2)));
+ "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel:
3.3-IV1\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
}
// Use the first MetadataVersion that supports KIP-919
@@ -80,14 +82,16 @@ public class FeatureCommandTest {
List<String> features =
Arrays.stream(commandOutput.split("\n")).sorted().collect(Collectors.toList());
// Change expected message to reflect latest MetadataVersion
(SupportedMaxVersion increases when adding a new version)
- assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
+ assertEquals("Feature:
eligible.leader.replicas.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(0)));
- assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ assertEquals("Feature: group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(1)));
+ assertEquals("Feature: kraft.version\tSupportedMinVersion: 0\t" +
+ "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(2)));
assertEquals("Feature: metadata.version\tSupportedMinVersion:
3.0-IV1\t" +
- "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel:
3.7-IV0\t", outputWithoutEpoch(features.get(2)));
+ "SupportedMaxVersion: 4.0-IV3\tFinalizedVersionLevel:
3.7-IV0\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
- "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(3)));
+ "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t",
outputWithoutEpoch(features.get(4)));
}
@ClusterTest(types = {Type.KRAFT}, metadataVersion =
MetadataVersion.IBP_3_3_IV1)
@@ -172,7 +176,8 @@ public class FeatureCommandTest {
"downgrade", "--release-version", "3.7-IV3"))
);
- assertEquals("group.version was downgraded to 0.\n" +
+ assertEquals("eligible.leader.replicas.version was downgraded to 0.\n"
+
+ "group.version was downgraded to 0.\n" +
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
"transaction.version was downgraded to 0.", commandOutput);