This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d3e366bc24 KAFKA-16772: Introduce kraft.version to support KIP-853
(#16230)
4d3e366bc24 is described below
commit 4d3e366bc2487ca0c2321e6e6d7786fb6ed0efa5
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Tue Jul 16 09:31:10 2024 -0700
KAFKA-16772: Introduce kraft.version to support KIP-853 (#16230)
Introduce the KRaftVersion enum to describe the current value of
kraft.version. Change a bunch of places in the code that were using raw shorts
over to using this new enum.
In BrokerServer.scala, fix a bug that could cause null pointer exceptions
during shutdown if we tried to shut down before fully coming up.
Do not send finalized features that are finalized as level 0, since it is a
no-op.
Reviewers: dengziming <[email protected]>, José Armando García
Sancio <[email protected]>
---
.../kafka/clients/admin/SupportedVersionRange.java | 2 +-
.../kafka/common/requests/ApiVersionsResponse.java | 10 +-
.../main/scala/kafka/server/BrokerFeatures.scala | 3 -
.../src/main/scala/kafka/server/BrokerServer.scala | 12 +-
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../main/scala/kafka/server/MetadataCache.scala | 10 +-
.../kafka/server/metadata/KRaftMetadataCache.scala | 13 +-
.../DescribeTopicPartitionsRequestHandlerTest.java | 5 +-
.../kafka/server/KRaftClusterTest.scala | 9 +-
.../unit/kafka/server/ControllerApisTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 80 +++---
.../unit/kafka/server/MetadataCacheTest.scala | 16 +-
.../server/ReplicaManagerConcurrencyTest.scala | 4 +-
.../metadata/BrokerMetadataPublisherTest.scala | 4 +-
.../unit/kafka/tools/DumpLogSegmentsTest.scala | 4 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 3 +-
.../kafka/image/publisher/SnapshotEmitterTest.java | 6 +
.../org/apache/kafka/metalog/LocalLogManager.java | 6 +
.../apache/kafka/raft/FileQuorumStateStore.java | 17 +-
.../org/apache/kafka/raft/KafkaRaftClient.java | 6 +
.../java/org/apache/kafka/raft/LeaderState.java | 9 +-
.../org/apache/kafka/raft/QuorumStateStore.java | 4 +-
.../java/org/apache/kafka/raft/RaftClient.java | 8 +
.../internals/KRaftControlRecordStateMachine.java | 20 +-
.../kafka/snapshot/RecordsSnapshotWriter.java | 11 +-
.../kafka/raft/FileQuorumStateStoreTest.java | 24 +-
.../org/apache/kafka/raft/LeaderStateTest.java | 3 +-
.../apache/kafka/raft/MockQuorumStateStore.java | 17 +-
.../org/apache/kafka/raft/QuorumStateTest.java | 269 +++++++++++----------
.../apache/kafka/raft/RaftClientTestContext.java | 16 +-
.../KRaftControlRecordStateMachineTest.java | 21 +-
.../kafka/raft/internals/KafkaRaftMetricsTest.java | 43 ++--
.../kafka/raft/internals/RecordsIteratorTest.java | 5 +-
.../kafka/snapshot/RecordsSnapshotWriterTest.java | 9 +-
.../org/apache/kafka/server/common/Features.java | 3 +-
.../apache/kafka/server/common/KRaftVersion.java | 86 +++++++
.../kafka/server/common/TestFeatureVersion.java | 2 +-
.../apache/kafka/server/common/FeaturesTest.java | 14 +-
.../kafka/server/common/KRaftVersionTest.java | 62 +++++
39 files changed, 519 insertions(+), 323 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
index 2eef560d264..c8f5715f593 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/SupportedVersionRange.java
@@ -35,7 +35,7 @@ public class SupportedVersionRange {
*
* @throws IllegalArgumentException Raised when the condition described
above is not met.
*/
- SupportedVersionRange(final short minVersion, final short maxVersion) {
+ public SupportedVersionRange(final short minVersion, final short
maxVersion) {
if (minVersion < 0 || maxVersion < 0 || maxVersion < minVersion) {
throw new IllegalArgumentException(
String.format(
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index a7e4a27c443..aaa87a817f0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -313,10 +313,12 @@ public class ApiVersionsResponse extends AbstractResponse
{
for (Map.Entry<String, Short> feature : finalizedFeatures.entrySet()) {
final FinalizedFeatureKey key = new FinalizedFeatureKey();
final short versionLevel = feature.getValue();
- key.setName(feature.getKey());
- key.setMinVersionLevel(versionLevel);
- key.setMaxVersionLevel(versionLevel);
- converted.add(key);
+ if (versionLevel != 0) {
+ key.setName(feature.getKey());
+ key.setMinVersionLevel(versionLevel);
+ key.setMaxVersionLevel(versionLevel);
+ converted.add(key);
+ }
}
return converted;
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala
b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index f900783e158..793d2e2a0fe 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -98,9 +98,6 @@ object BrokerFeatures extends Logging {
feature.latestProduction
}))
}
- if (unstableFeatureVersionsEnabled) {
- features.put("kraft.version", new SupportedVersionRange(0, 1))
- }
Features.supportedFeatures(features)
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 074c9b12f4c..b2a7b482596 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -192,7 +192,7 @@ class BrokerServer(
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
- metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+ metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () =>
raftManager.client.kraftVersion())
// Create log manager, but don't start it because we need to delay any
potential unclean shutdown log recovery
// until we catch up on the metadata log and have up-to-date topic and
broker configs.
@@ -379,11 +379,6 @@ class BrokerServer(
featuresRemapped,
logManager.readBrokerEpochFromCleanShutdownFiles()
)
- // If the BrokerLifecycleManager's initial catch-up future fails, it
means we timed out
- // or are shutting down before we could catch up. Therefore, also fail
the firstPublishFuture.
- lifecycleManager.initialCatchUpFuture.whenComplete((_, e) => {
- if (e != null)
brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
- })
// Create and initialize an authorizer if one is configured.
authorizer = config.createNewAuthorizer()
@@ -483,6 +478,11 @@ class BrokerServer(
sharedServer.initialBrokerMetadataLoadFaultHandler,
sharedServer.metadataPublishingFaultHandler
)
+ // If the BrokerLifecycleManager's initial catch-up future fails, it
means we timed out
+ // or are shutting down before we could catch up. Therefore, also fail
the firstPublishFuture.
+ lifecycleManager.initialCatchUpFuture.whenComplete((_, e) => {
+ if (e != null)
brokerMetadataPublisher.firstPublishFuture.completeExceptionally(e)
+ })
metadataPublishers.add(brokerMetadataPublisher)
brokerRegistrationTracker = new
BrokerRegistrationTracker(config.brokerId,
() => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index d362928583d..1008fe2af72 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -162,7 +162,7 @@ class ControllerServer(
authorizer = config.createNewAuthorizer()
authorizer.foreach(_.configure(config.originals))
- metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
+ metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId, () =>
raftManager.client.kraftVersion())
metadataCachePublisher = new KRaftMetadataCachePublisher(metadataCache)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 2636264a57d..68afc0b3d35 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,9 +22,10 @@ import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData,
UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
+import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import java.util
+import java.util.function.Supplier
import scala.collection._
/**
@@ -121,7 +122,10 @@ object MetadataCache {
new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
zkMigrationEnabled)
}
- def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
- new KRaftMetadataCache(brokerId)
+ def kRaftMetadataCache(
+ brokerId: Int,
+ kraftVersionSupplier: Supplier[KRaftVersion]
+ ): KRaftMetadataCache = {
+ new KRaftMetadataCache(brokerId, kraftVersionSupplier)
}
}
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index a058c435e57..56a90e4c94c 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -33,10 +33,11 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration,
Replicas}
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
+import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import java.util
import java.util.concurrent.ThreadLocalRandom
+import java.util.function.Supplier
import java.util.{Collections, Properties}
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, Set, mutable}
@@ -45,7 +46,10 @@ import scala.jdk.CollectionConverters._
import scala.util.control.Breaks._
-class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging
with ConfigRepository {
+class KRaftMetadataCache(
+ val brokerId: Int,
+ val kraftVersionSupplier: Supplier[KRaftVersion]
+) extends MetadataCache with Logging with ConfigRepository {
this.logIdent = s"[MetadataCache brokerId=$brokerId] "
// This is the cache state. Every MetadataImage instance is immutable, and
updates
@@ -541,8 +545,11 @@ class KRaftMetadataCache(val brokerId: Int) extends
MetadataCache with Logging w
override def features(): FinalizedFeatures = {
val image = _currentImage
+ val finalizedFeatures = new java.util.HashMap[String,
java.lang.Short](image.features().finalizedVersions())
+ finalizedFeatures.put(KRaftVersion.FEATURE_NAME,
kraftVersionSupplier.get().featureLevel())
+
new FinalizedFeatures(image.features().metadataVersion(),
- image.features().finalizedVersions(),
+ finalizedFeatures,
image.highestOffsetAndEpoch().offset,
true)
}
diff --git
a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
index d8ee999a2d6..4fcb7a5654c 100644
---
a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
+++
b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
@@ -63,6 +63,7 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.KRaftConfigs;
@@ -197,7 +198,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
.setPartitionEpoch(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
);
- KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+ KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () ->
KRaftVersion.KRAFT_VERSION_1);
updateKraftMetadataCache(metadataCache, records);
DescribeTopicPartitionsRequestHandler handler =
new DescribeTopicPartitionsRequestHandler(metadataCache, new
AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
@@ -399,7 +400,7 @@ class DescribeTopicPartitionsRequestHandlerTest {
.setPartitionEpoch(2)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
);
- KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
+ KRaftMetadataCache metadataCache = new KRaftMetadataCache(0, () ->
KRaftVersion.KRAFT_VERSION_1);
updateKraftMetadataCache(metadataCache, records);
DescribeTopicPartitionsRequestHandler handler =
new DescribeTopicPartitionsRequestHandler(metadataCache, new
AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 8be2b994116..7e74ced9a02 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.authorizer._
-import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
MetadataVersion}
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.quota
@@ -1018,7 +1018,6 @@ class KRaftClusterTest {
}
}
-
@Test
def testUpdateMetadataVersion(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
@@ -1036,11 +1035,13 @@ class KRaftClusterTest {
Map(MetadataVersion.FEATURE_NAME ->
new FeatureUpdate(MetadataVersion.latestTesting().featureLevel(),
FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
)
+ assertEquals(new SupportedVersionRange(0, 1),
admin.describeFeatures().featureMetadata().get().
+ supportedFeatures().get(KRaftVersion.FEATURE_NAME))
} finally {
admin.close()
}
- TestUtils.waitUntilTrue(() =>
cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latestTesting()),
- "Timed out waiting for metadata.version update.")
+ TestUtils.waitUntilTrue(() =>
cluster.brokers().get(0).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latestTesting()),
+ "Timed out waiting for metadata.version update")
} finally {
cluster.close()
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 997a711125a..4fd51040a3b 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.controller.{Controller,
ControllerRequestContext, Result
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext,
AuthorizationResult, Authorizer}
-import org.apache.kafka.server.common.{ApiMessageAndVersion,
FinalizedFeatures, MetadataVersion, ProducerIdsBlock}
+import org.apache.kafka.server.common.{ApiMessageAndVersion,
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
import org.apache.kafka.server.util.FutureUtils
import org.apache.kafka.storage.internals.log.CleanerConfig
@@ -123,7 +123,7 @@ class ControllerApisTest {
)
private val replicaQuotaManager: ReplicationQuotaManager =
mock(classOf[ReplicationQuotaManager])
private val raftManager: RaftManager[ApiMessageAndVersion] =
mock(classOf[RaftManager[ApiMessageAndVersion]])
- private val metadataCache: KRaftMetadataCache =
MetadataCache.kRaftMetadataCache(0)
+ private val metadataCache: KRaftMetadataCache =
MetadataCache.kRaftMetadataCache(0, () => KRaftVersion.KRAFT_VERSION_0)
private val quotasNeverThrottleControllerMutations = QuotaManagers(
clientQuotaManager,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 73ed8a3844e..9ed5ba2ce38 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -80,7 +80,7 @@ import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult,
Authorizer}
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0,
IBP_2_2_IV1}
-import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
+import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import org.apache.kafka.server.config.{ConfigType, KRaftConfigs,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareGroupConfig}
import org.apache.kafka.server.metrics.ClientMetricsTestUtils
import org.apache.kafka.server.util.{FutureUtils, MockTime}
@@ -631,7 +631,7 @@ class KafkaApisTest extends Logging {
def testDescribeQuorumForwardedForKRaftClusters(): Unit = {
val requestData =
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
testForwardableApi(kafkaApis = kafkaApis,
ApiKeys.DESCRIBE_QUORUM,
@@ -643,7 +643,7 @@ class KafkaApisTest extends Logging {
apiKey: ApiKeys,
requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
testForwardableApi(kafkaApis = kafkaApis,
apiKey,
@@ -918,7 +918,7 @@ class KafkaApisTest extends Logging {
controllerThrottleTimeMs: Int,
requestThrottleTimeMs: Int
): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
val topicToCreate = new CreatableTopic()
.setName("topic")
@@ -6805,77 +6805,77 @@ class KafkaApisTest extends Logging {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_,
RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_,
RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls)
}
@@ -6883,7 +6883,7 @@ class KafkaApisTest extends Logging {
@Test
def testEmptyLegacyAlterConfigsRequestWithKRaft(): Unit = {
val request = buildRequest(new AlterConfigsRequest(new
AlterConfigsRequestData(), 1.toShort))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(raftSupport = true)
@@ -6903,7 +6903,7 @@ class KafkaApisTest extends Logging {
setConfigs(new LAlterableConfigCollection(asList(new
LAlterableConfig().
setName("foo").
setValue(null)).iterator()))).iterator())), 1.toShort))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(raftSupport = true)
@@ -6920,7 +6920,7 @@ class KafkaApisTest extends Logging {
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
}
@@ -6928,7 +6928,7 @@ class KafkaApisTest extends Logging {
@Test
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
val request = buildRequest(new IncrementalAlterConfigsRequest(new
IncrementalAlterConfigsRequestData(), 1.toShort))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(raftSupport = true)
@@ -6948,7 +6948,7 @@ class KafkaApisTest extends Logging {
setName(Log4jController.ROOT_LOGGER).
setValue("TRACE")).iterator()))).iterator())),
1.toShort))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(raftSupport = true)
@@ -6967,7 +6967,7 @@ class KafkaApisTest extends Logging {
// Test that in KRaft mode, a request that isn't forwarded gets the correct
error message.
// We skip the pre-forward checks in handleCreateTokenRequest
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
}
@@ -6976,7 +6976,7 @@ class KafkaApisTest extends Logging {
// Test that in KRaft mode, a request that isn't forwarded gets the correct
error message.
// We skip the pre-forward checks in handleRenewTokenRequest
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
}
@@ -6985,42 +6985,42 @@ class KafkaApisTest extends Logging {
// Test that in KRaft mode, a request that isn't forwarded gets the correct
error message.
// We skip the pre-forward checks in handleExpireTokenRequest
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
}
@Test
def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
}
@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
}
@@ -7254,7 +7254,7 @@ class KafkaApisTest extends Logging {
any[RequestContext]())).thenReturn(new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(request, RequestLocal.NoCaching)
@@ -7273,7 +7273,7 @@ class KafkaApisTest extends Logging {
when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](),
any[RequestContext]())).thenThrow(new RuntimeException("test"))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(request, RequestLocal.NoCaching)
@@ -7303,7 +7303,7 @@ class KafkaApisTest extends Logging {
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](),
any[RequestContext]()))
.thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData()))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
@@ -7320,7 +7320,7 @@ class KafkaApisTest extends Logging {
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](),
any[RequestContext]()))
.thenThrow(new RuntimeException("test"))
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[PushTelemetryResponse](request)
@@ -7342,7 +7342,7 @@ class KafkaApisTest extends Logging {
@Test
def testListClientMetricsResources(): Unit = {
val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
val resources = new mutable.HashSet[String]
resources.add("test1")
@@ -7359,7 +7359,7 @@ class KafkaApisTest extends Logging {
@Test
def testListClientMetricsResourcesEmptyResponse(): Unit = {
val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
val resources = new mutable.HashSet[String]
when(clientMetricsManager.listClientMetricsResources).thenReturn(resources.asJava)
@@ -7373,7 +7373,7 @@ class KafkaApisTest extends Logging {
@Test
def testListClientMetricsResourcesWithException(): Unit = {
val request = buildRequest(new
ListClientMetricsResourcesRequest.Builder(new
ListClientMetricsResourcesRequestData()).build())
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
when(clientMetricsManager.listClientMetricsResources).thenThrow(new
RuntimeException("test"))
kafkaApis = createKafkaApis(raftSupport = true)
@@ -7389,7 +7389,7 @@ class KafkaApisTest extends Logging {
val shareGroupHeartbeatRequest = new
ShareGroupHeartbeatRequestData().setGroupId("group")
val requestChannelRequest = buildRequest(new
ShareGroupHeartbeatRequest.Builder(shareGroupHeartbeatRequest, true).build())
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
kafkaApis.handle(requestChannelRequest, RequestLocal.NoCaching)
@@ -7410,7 +7410,7 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
shareGroupHeartbeatRequest
)).thenReturn(future)
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
@@ -7434,7 +7434,7 @@ class KafkaApisTest extends Logging {
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava)
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
authorizer = Some(authorizer),
@@ -7457,7 +7457,7 @@ class KafkaApisTest extends Logging {
requestChannelRequest.context,
shareGroupHeartbeatRequest
)).thenReturn(future)
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties =
Map(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
raftSupport = true
@@ -7539,7 +7539,7 @@ class KafkaApisTest extends Logging {
any[RequestContext],
any[util.List[String]]
)).thenReturn(future)
- metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 02cba057ba8..00d042a0b6b 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage,
MetadataProvenance}
import org.apache.kafka.metadata.LeaderRecoveryState
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -53,7 +53,7 @@ object MetadataCacheTest {
def cacheProvider(): util.stream.Stream[MetadataCache] =
util.stream.Stream.of[MetadataCache](
MetadataCache.zkMetadataCache(1, MetadataVersion.latestTesting()),
- MetadataCache.kRaftMetadataCache(1)
+ MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
)
def updateCache(cache: MetadataCache, request: UpdateMetadataRequest,
records: Seq[ApiMessage] = List()): Unit = {
@@ -647,7 +647,7 @@ class MetadataCacheTest {
@Test
def testIsBrokerFenced(): Unit = {
- val metadataCache = MetadataCache.kRaftMetadataCache(0)
+ val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
val delta = new MetadataDelta.Builder().build()
delta.replay(new RegisterBrokerRecord()
@@ -669,7 +669,7 @@ class MetadataCacheTest {
@Test
def testGetAliveBrokersWithBrokerFenced(): Unit = {
- val metadataCache = MetadataCache.kRaftMetadataCache(0)
+ val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
val listenerName = "listener"
val endpoints = new BrokerEndpointCollection()
endpoints.add(new BrokerEndpoint().
@@ -705,7 +705,7 @@ class MetadataCacheTest {
@Test
def testIsBrokerInControlledShutdown(): Unit = {
- val metadataCache = MetadataCache.kRaftMetadataCache(0)
+ val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
val delta = new MetadataDelta.Builder().build()
delta.replay(new RegisterBrokerRecord()
@@ -727,7 +727,7 @@ class MetadataCacheTest {
@Test
def testGetLiveBrokerEpoch(): Unit = {
- val metadataCache = MetadataCache.kRaftMetadataCache(0)
+ val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
val delta = new MetadataDelta.Builder().build()
delta.replay(new RegisterBrokerRecord()
@@ -748,7 +748,7 @@ class MetadataCacheTest {
@Test
def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
- val metadataCache = MetadataCache.kRaftMetadataCache(0)
+ val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
val controllerId = 2
val controllerEpoch = 1
@@ -1106,7 +1106,7 @@ class MetadataCacheTest {
new PartitionRecord().setTopicId(topicId).setPartitionId(partition.id).
setReplicas(partition.replicas).setDirectories(partition.dirs).
setLeader(partition.replicas.get(0)).setIsr(partition.replicas)))
- val cache = MetadataCache.kRaftMetadataCache(1)
+ val cache = MetadataCache.kRaftMetadataCache(1, () =>
KRaftVersion.KRAFT_VERSION_0)
cache.setImage(delta.apply(MetadataProvenance.EMPTY))
val topicMetadata = cache.getTopicMetadata(Set("foo"),
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).head
topicMetadata.partitions().asScala.map(p => (p.partitionIndex(),
p.offlineReplicas())).toMap
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index cf9ac1f812c..d3963cfaadb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesVersion}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs,
ServerLogConfigs}
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.apache.kafka.server.util.{MockTime, ShutdownableThread}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation,
FetchParams, FetchPartitionData, LogConfig, LogDirFailureChannel}
import org.junit.jupiter.api.Assertions._
@@ -84,7 +84,7 @@ class ReplicaManagerConcurrencyTest extends Logging {
def testIsrExpandAndShrinkWithConcurrentProduce(): Unit = {
val localId = 0
val remoteId = 1
- val metadataCache = MetadataCache.kRaftMetadataCache(localId)
+ val metadataCache = MetadataCache.kRaftMetadataCache(localId, () =>
KRaftVersion.KRAFT_VERSION_0)
channel = new ControllerChannel
replicaManager = buildReplicaManager(localId, channel, metadataCache)
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 5657cd37ec4..c8088a1244e 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataImageTest,
import org.apache.kafka.image.loader.LogDeltaManifest
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
-import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.apache.kafka.server.fault.FaultHandler
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -181,7 +181,7 @@ class BrokerMetadataPublisherTest {
@Test
def testNewImagePushedToGroupCoordinator(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
- val metadataCache = new KRaftMetadataCache(0)
+ val metadataCache = new KRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_1)
val logManager = mock(classOf[LogManager])
val replicaManager = mock(classOf[ReplicaManager])
val groupCoordinator = mock(classOf[GroupCoordinator])
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 7c3f4aa2765..5ddb0399029 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -44,7 +44,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch}
import org.apache.kafka.raft.internals.VoterSetTest
-import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
import org.apache.kafka.server.config.ServerLogConfigs
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId,
RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate,
RemoteLogSegmentState, RemotePartitionDeleteMetadata,
RemotePartitionDeleteState}
@@ -553,7 +553,7 @@ class DumpLogSegmentsTest {
.setTime(new MockTime)
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new
OffsetAndEpoch(0, 0)).get)
- .setKraftVersion(1)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1,
2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
) { snapshotWriter =>
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 6b31224cf4c..28616f3828a 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -61,6 +61,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.KRaftConfigs;
@@ -107,7 +108,7 @@ public class KRaftMetadataRequestBenchmark {
private final Metrics metrics = new Metrics();
private final int brokerId = 1;
private final ForwardingManager forwardingManager =
Mockito.mock(ForwardingManager.class);
- private final KRaftMetadataCache metadataCache =
MetadataCache.kRaftMetadataCache(brokerId);
+ private final KRaftMetadataCache metadataCache =
MetadataCache.kRaftMetadataCache(brokerId, () -> KRaftVersion.KRAFT_VERSION_1);
private final ClientQuotaManager clientQuotaManager =
Mockito.mock(ClientQuotaManager.class);
private final ClientRequestQuotaManager clientRequestQuotaManager =
Mockito.mock(ClientRequestQuotaManager.class);
private final ControllerMutationQuotaManager
controllerMutationQuotaManager =
Mockito.mock(ControllerMutationQuotaManager.class);
diff --git
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index bcf603b577f..3c07b1447ef 100644
---
a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.junit.jupiter.api.Test;
@@ -124,6 +125,11 @@ public class SnapshotEmitterTest {
return 0;
}
+ @Override
+ public KRaftVersion kraftVersion() {
+ return KRaftVersion.KRAFT_VERSION_0;
+ }
+
@Override
public void close() throws Exception {
// nothing to do
diff --git
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 97d56da9e21..2cfaa83d357 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -33,6 +33,7 @@ import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.snapshot.MockRawSnapshotReader;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
@@ -881,4 +882,9 @@ public final class LocalLogManager implements
RaftClient<ApiMessageAndVersion>,
public void resignAfterNonAtomicCommit() {
resignAfterNonAtomicCommit.set(true);
}
+
+ @Override
+ public KRaftVersion kraftVersion() {
+ return KRaftVersion.KRAFT_VERSION_0;
+ }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java
b/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java
index 8633d1eda39..50b519bf3b1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FileQuorumStateStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.raft.generated.QuorumStateDataJsonConverter;
+import org.apache.kafka.server.common.KRaftVersion;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -138,8 +139,8 @@ public class FileQuorumStateStore implements
QuorumStateStore {
}
@Override
- public void writeElectionState(ElectionState latest, short kraftVersion) {
- short quorumStateVersion =
quorumStateVersionFromKRaftVersion(kraftVersion);
+ public void writeElectionState(ElectionState latest, KRaftVersion
kraftVersion) {
+ short quorumStateVersion = kraftVersion.quorumStateVersion();
writeElectionStateToFile(
stateFile,
@@ -153,18 +154,6 @@ public class FileQuorumStateStore implements
QuorumStateStore {
return stateFile.toPath();
}
- private short quorumStateVersionFromKRaftVersion(short kraftVersion) {
- if (kraftVersion == 0) {
- return 0;
- } else if (kraftVersion == 1) {
- return 1;
- } else {
- throw new IllegalArgumentException(
- String.format("Unknown kraft.version %d", kraftVersion)
- );
- }
- }
-
private void writeElectionStateToFile(final File stateFile,
QuorumStateData state, short version) {
if (version > HIGHEST_SUPPORTED_VERSION) {
throw new IllegalArgumentException(
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0f632a29021..17bf64f4666 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -69,6 +69,7 @@ import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.NotifyingRawSnapshotWriter;
import org.apache.kafka.snapshot.RawSnapshotReader;
@@ -3044,6 +3045,11 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return log.endOffset().offset();
}
+ @Override
+ public KRaftVersion kraftVersion() {
+ return partitionState.lastKraftVersion();
+ }
+
@Override
public void close() {
log.flush(true);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 59c17cbcc9e..9cdfbbf3d84 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.server.common.KRaftVersion;
import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class LeaderState<T> implements EpochState {
private final VoterSet voterSetAtEpochStart;
// This field is non-empty if the voter set at epoch start came from a
snapshot or log segment
private final OptionalLong offsetOfVotersAtEpochStart;
- private final short kraftVersionAtEpochStart;
+ private final KRaftVersion kraftVersionAtEpochStart;
private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
private Map<Integer, ReplicaState> voterStates = new HashMap<>();
@@ -90,7 +91,7 @@ public class LeaderState<T> implements EpochState {
long epochStartOffset,
VoterSet voterSetAtEpochStart,
OptionalLong offsetOfVotersAtEpochStart,
- short kraftVersionAtEpochStart,
+ KRaftVersion kraftVersionAtEpochStart,
Set<Integer> grantingVoters,
BatchAccumulator<T> accumulator,
Endpoints endpoints,
@@ -239,7 +240,7 @@ public class LeaderState<T> implements EpochState {
if (offset == -1) {
// Latest voter set came from the bootstrap checkpoint
(0-0.checkpoint)
// rewrite the voter set to the log so that it is
replcated to the replicas.
- if (kraftVersionAtEpochStart < 1) {
+ if (!kraftVersionAtEpochStart.isReconfigSupported()) {
throw new IllegalStateException(
String.format(
"The bootstrap checkpoint contains a set
of voters %s at %s " +
@@ -254,7 +255,7 @@ public class LeaderState<T> implements EpochState {
currentTimeMs,
new KRaftVersionRecord()
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
- .setKRaftVersion(kraftVersionAtEpochStart)
+
.setKRaftVersion(kraftVersionAtEpochStart.featureLevel())
);
builder.appendVotersMessage(
currentTimeMs,
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumStateStore.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumStateStore.java
index e3d252e9aa0..d5d21cabfdd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumStateStore.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.raft;
+import org.apache.kafka.server.common.KRaftVersion;
+
import java.nio.file.Path;
import java.util.Optional;
@@ -39,7 +41,7 @@ public interface QuorumStateStore {
* @param latest the latest election state
* @param kraftVersion the finalized kraft.version
*/
- void writeElectionState(ElectionState latest, short kraftVersion);
+ void writeElectionState(ElectionState latest, KRaftVersion kraftVersion);
/**
* Path to the quorum state store
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 13006b7624a..c09297a7049 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.SnapshotWriter;
@@ -250,4 +251,11 @@ public interface RaftClient<T> extends AutoCloseable {
* or 0 if there have not been any records written.
*/
long logEndOffset();
+
+ /**
+ * Returns the latest kraft.version, even if it hasn't been committed
durably to Raft.
+ *
+ * @return the current kraft.version.
+ */
+ KRaftVersion kraftVersion();
}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index b088a63c381..59663b54204 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -25,6 +25,7 @@ import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LogFetchInfo;
import org.apache.kafka.raft.ReplicatedLog;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
@@ -56,7 +57,7 @@ public final class KRaftControlRecordStateMachine {
// are the KRaft driver when calling updateState and the RaftClient
callers when freezing
// snapshots
private final VoterSetHistory voterSetHistory;
- private final LogHistory<Short> kraftVersionHistory = new
TreeMapLogHistory<>();
+ private final LogHistory<KRaftVersion> kraftVersionHistory = new
TreeMapLogHistory<>();
// This synchronization is enough because
// 1. The write operation updateState only sets the value without reading
it and updates to
@@ -149,9 +150,10 @@ public final class KRaftControlRecordStateMachine {
/**
* Returns the last kraft version.
*/
- public short lastKraftVersion() {
+ public KRaftVersion lastKraftVersion() {
synchronized (kraftVersionHistory) {
- return
kraftVersionHistory.lastEntry().map(LogHistory.Entry::value).orElse((short) 0);
+ return
kraftVersionHistory.lastEntry().map(LogHistory.Entry::value).
+ orElse(KRaftVersion.KRAFT_VERSION_0);
}
}
@@ -175,11 +177,12 @@ public final class KRaftControlRecordStateMachine {
* @param offset the offset (inclusive)
* @return the finalized kraft version if one exist, otherwise 0
*/
- public short kraftVersionAtOffset(long offset) {
+ public KRaftVersion kraftVersionAtOffset(long offset) {
checkOffsetIsValid(offset);
synchronized (kraftVersionHistory) {
- return kraftVersionHistory.valueAtOrBefore(offset).orElse((short)
0);
+ return kraftVersionHistory.valueAtOrBefore(offset).
+ orElse(KRaftVersion.KRAFT_VERSION_0);
}
}
@@ -267,7 +270,12 @@ public final class KRaftControlRecordStateMachine {
case KRAFT_VERSION:
synchronized (kraftVersionHistory) {
- kraftVersionHistory.addAt(currentOffset,
((KRaftVersionRecord) record.message()).kRaftVersion());
+ kraftVersionHistory.addAt(
+ currentOffset,
+ KRaftVersion.fromFeatureLevel(
+ ((KRaftVersionRecord)
record.message()).kRaftVersion()
+ )
+ );
}
break;
diff --git
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index fde70c11c44..375fd6b494f 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -31,6 +31,7 @@ import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import java.util.List;
@@ -146,7 +147,7 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
private Time time = Time.SYSTEM;
private int maxBatchSize = 1024;
private MemoryPool memoryPool = MemoryPool.NONE;
- private short kraftVersion = 1;
+ private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
private Optional<VoterSet> voterSet = Optional.empty();
private Optional<RawSnapshotWriter> rawSnapshotWriter =
Optional.empty();
@@ -180,7 +181,7 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
return this;
}
- public Builder setKraftVersion(short kraftVersion) {
+ public Builder setKraftVersion(KRaftVersion kraftVersion) {
this.kraftVersion = kraftVersion;
return this;
}
@@ -197,7 +198,7 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
throw new IllegalStateException(
String.format("Initializing writer with a non-empty
snapshot: %s", rawSnapshotWriter.get().snapshotId())
);
- } else if (kraftVersion == 0 && voterSet.isPresent()) {
+ } else if (kraftVersion == KRaftVersion.KRAFT_VERSION_0 &&
voterSet.isPresent()) {
throw new IllegalStateException(
String.format("Voter set (%s) not expected when the
kraft.version is 0", voterSet.get())
);
@@ -237,12 +238,12 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
);
- if (kraftVersion > 0) {
+ if (kraftVersion.isReconfigSupported()) {
builder.appendKRaftVersionMessage(
now,
new KRaftVersionRecord()
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
- .setKRaftVersion(kraftVersion)
+ .setKRaftVersion(kraftVersion.featureLevel())
);
if (voterSet.isPresent()) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java
b/raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java
index 15146a932a3..1cf480e5ebf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileQuorumStateStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
import com.fasterxml.jackson.databind.JsonNode;
@@ -28,7 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.BufferedWriter;
import java.io.File;
@@ -41,6 +42,7 @@ import java.util.Collections;
import java.util.Optional;
import java.util.Set;
+import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -48,8 +50,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class FileQuorumStateStoreTest {
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- void testWriteReadElectedLeader(short kraftVersion) throws IOException {
+ @EnumSource(value = KRaftVersion.class)
+ void testWriteReadElectedLeader(KRaftVersion kraftVersion) throws
IOException {
FileQuorumStateStore stateStore = new
FileQuorumStateStore(TestUtils.tempFile());
final int epoch = 2;
@@ -64,7 +66,7 @@ public class FileQuorumStateStoreTest {
);
final Optional<ElectionState> expected;
- if (kraftVersion == 1) {
+ if (kraftVersion.isReconfigSupported()) {
expected = Optional.of(
ElectionState.withElectedLeader(epoch, voter1,
Collections.emptySet())
);
@@ -78,8 +80,8 @@ public class FileQuorumStateStoreTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- void testWriteReadVotedCandidate(short kraftVersion) throws IOException {
+ @EnumSource(value = KRaftVersion.class)
+ void testWriteReadVotedCandidate(KRaftVersion kraftVersion) throws
IOException {
FileQuorumStateStore stateStore = new
FileQuorumStateStore(TestUtils.tempFile());
final int epoch = 2;
@@ -95,7 +97,7 @@ public class FileQuorumStateStoreTest {
);
final Optional<ElectionState> expected;
- if (kraftVersion == 1) {
+ if (kraftVersion.isReconfigSupported()) {
expected = Optional.of(
ElectionState.withVotedCandidate(
epoch,
@@ -118,8 +120,8 @@ public class FileQuorumStateStoreTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- void testWriteReadUnknownLeader(short kraftVersion) throws IOException {
+ @EnumSource(value = KRaftVersion.class)
+ void testWriteReadUnknownLeader(KRaftVersion kraftVersion) throws
IOException {
FileQuorumStateStore stateStore = new
FileQuorumStateStore(TestUtils.tempFile());
final int epoch = 2;
@@ -131,7 +133,7 @@ public class FileQuorumStateStoreTest {
);
final Optional<ElectionState> expected;
- if (kraftVersion == 1) {
+ if (kraftVersion.isReconfigSupported()) {
expected = Optional.of(ElectionState.withUnknownLeader(epoch,
Collections.emptySet()));
} else {
expected = Optional.of(ElectionState.withUnknownLeader(epoch,
voters));
@@ -149,7 +151,7 @@ public class FileQuorumStateStoreTest {
final int epoch = 2;
Set<Integer> voters = Utils.mkSet(1, 2, 3);
- stateStore.writeElectionState(ElectionState.withUnknownLeader(epoch,
voters), (short) 1);
+ stateStore.writeElectionState(ElectionState.withUnknownLeader(epoch,
voters), KRAFT_VERSION_1);
// Check that state is persisted
FileQuorumStateStore reloadedStore = new
FileQuorumStateStore(stateFile);
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 0d0849d1aa0..a423d416c8b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.KRaftVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -59,7 +60,7 @@ public class LeaderStateTest {
private final int fetchTimeoutMs = 2000;
private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs *
CHECK_QUORUM_TIMEOUT_FACTOR);
private final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
- private final short kraftVersion = 1;
+ private final KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
private LeaderState<?> newLeaderState(
VoterSet voters,
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockQuorumStateStore.java
b/raft/src/test/java/org/apache/kafka/raft/MockQuorumStateStore.java
index b164fc2bf8e..fd6e4ebd496 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockQuorumStateStore.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockQuorumStateStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.raft.generated.QuorumStateData;
+import org.apache.kafka.server.common.KRaftVersion;
import java.nio.file.FileSystems;
import java.nio.file.Path;
@@ -31,9 +32,9 @@ public class MockQuorumStateStore implements QuorumStateStore
{
}
@Override
- public void writeElectionState(ElectionState update, short kraftVersion) {
+ public void writeElectionState(ElectionState update, KRaftVersion
kraftVersion) {
current = Optional.of(
-
update.toQuorumStateData(quorumStateVersionFromKRaftVersion(kraftVersion))
+ update.toQuorumStateData(kraftVersion.quorumStateVersion())
);
}
@@ -46,16 +47,4 @@ public class MockQuorumStateStore implements
QuorumStateStore {
public void clear() {
current = Optional.empty();
}
-
- private short quorumStateVersionFromKRaftVersion(short kraftVersion) {
- if (kraftVersion == 0) {
- return 0;
- } else if (kraftVersion == 1) {
- return 1;
- } else {
- throw new IllegalArgumentException(
- String.format("Unknown kraft.version %d", kraftVersion)
- );
- }
- }
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index ba1b179d3b5..236b858a046 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -25,9 +25,10 @@ import
org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.KRaftVersion;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import java.io.UncheckedIOException;
@@ -60,7 +61,7 @@ public class QuorumStateTest {
private QuorumState buildQuorumState(
OptionalInt localId,
VoterSet voterSet,
- short kraftVersion
+ KRaftVersion kraftVersion
) {
KRaftControlRecordStateMachine mockPartitionState =
Mockito.mock(KRaftControlRecordStateMachine.class);
@@ -69,7 +70,7 @@ public class QuorumStateTest {
.thenReturn(voterSet);
Mockito
.when(mockPartitionState.lastVoterSetOffset())
- .thenReturn(kraftVersion == 0 ? OptionalLong.empty() :
OptionalLong.of(0));
+ .thenReturn(kraftVersion.isReconfigSupported() ?
OptionalLong.of(0) : OptionalLong.empty());
Mockito
.when(mockPartitionState.lastKraftVersion())
.thenReturn(kraftVersion);
@@ -88,23 +89,23 @@ public class QuorumStateTest {
);
}
- private QuorumState initializeEmptyState(VoterSet voters, short
kraftVersion) {
+ private QuorumState initializeEmptyState(VoterSet voters, KRaftVersion
kraftVersion) {
QuorumState state = buildQuorumState(OptionalInt.of(localId), voters,
kraftVersion);
store.writeElectionState(ElectionState.withUnknownLeader(0,
voters.voterIds()), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
return state;
}
- private Set<Integer> persistedVoters(Set<Integer> voters, short
kraftVersion) {
- if (kraftVersion == 1) {
+ private Set<Integer> persistedVoters(Set<Integer> voters, KRaftVersion
kraftVersion) {
+ if (kraftVersion.featureLevel() == 1) {
return Collections.emptySet();
}
return voters;
}
- private ReplicaKey persistedVotedKey(ReplicaKey replicaKey, short
kraftVersion) {
- if (kraftVersion == 1) {
+ private ReplicaKey persistedVotedKey(ReplicaKey replicaKey, KRaftVersion
kraftVersion) {
+ if (kraftVersion.featureLevel() == 1) {
return replicaKey;
}
@@ -117,8 +118,8 @@ public class QuorumStateTest {
);
}
- private VoterSet localWithRemoteVoterSet(IntStream remoteIds, short
kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ private VoterSet localWithRemoteVoterSet(IntStream remoteIds, KRaftVersion
kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
Stream<ReplicaKey> remoteKeys = remoteIds
.boxed()
.map(id -> replicaKey(id, withDirectoryId));
@@ -126,8 +127,8 @@ public class QuorumStateTest {
return localWithRemoteVoterSet(remoteKeys, kraftVersion);
}
- private VoterSet localWithRemoteVoterSet(Stream<ReplicaKey> remoteKeys,
short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ private VoterSet localWithRemoteVoterSet(Stream<ReplicaKey> remoteKeys,
KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
ReplicaKey actualLocalVoter = withDirectoryId ?
localVoterKey :
@@ -144,8 +145,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializePrimordialEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializePrimordialEpoch(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
@@ -159,8 +160,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsUnattached(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsUnattached(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
int epoch = 5;
@@ -183,8 +184,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsFollower(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsFollower(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
int epoch = 5;
@@ -203,8 +204,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsUnattachedWhenMissingEndpoints(short
kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsUnattachedWhenMissingEndpoints(KRaftVersion
kraftVersion) {
int node1 = 1;
int node2 = 2;
int leader = 3;
@@ -222,8 +223,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsVoted(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsVoted(KRaftVersion kraftVersion) {
ReplicaKey nodeKey1 = ReplicaKey.of(1, Uuid.randomUuid());
ReplicaKey nodeKey2 = ReplicaKey.of(2, Uuid.randomUuid());
@@ -253,9 +254,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsResignedCandidate(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsResignedCandidate(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
ReplicaKey node1 = replicaKey(1, withDirectoryId);
ReplicaKey node2 = replicaKey(2, withDirectoryId);
int epoch = 5;
@@ -291,8 +292,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeAsResignedLeader(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeAsResignedLeader(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
int epoch = 5;
@@ -322,8 +323,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToCandidate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToCandidate(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -366,8 +367,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToResigned(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToResigned(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -386,8 +387,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToLeader(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToLeader(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
@@ -404,8 +405,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToLeaderWithoutGrantedVote(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToLeaderWithoutGrantedVote(KRaftVersion
kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -420,8 +421,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToFollower(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToFollower(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
@@ -445,8 +446,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToUnattached(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToUnattached(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -468,8 +469,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToVoted(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToVoted(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -495,8 +496,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCandidateToAnyStateLowerEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCandidateToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -527,8 +528,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToLeader(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToLeader(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
@@ -545,8 +546,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToResigned(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToResigned(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
@@ -569,8 +570,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToCandidate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToCandidate(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
@@ -587,8 +588,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToFollower(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToFollower(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
@@ -614,8 +615,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToUnattached(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToUnattached(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -638,8 +639,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToVoted(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToVoted(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -668,8 +669,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testLeaderToAnyStateLowerEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testLeaderToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -702,8 +703,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCannotFollowOrVoteForSelf(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCannotFollowOrVoteForSelf(KRaftVersion kraftVersion) {
VoterSet voters = localStandaloneVoterSet();
assertEquals(Optional.empty(), store.readElectionState());
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -720,8 +721,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToLeaderOrResigned(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToLeaderOrResigned(KRaftVersion kraftVersion) {
ReplicaKey leaderKey = ReplicaKey.of(1, Uuid.randomUuid());
int epoch = 5;
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
leaderKey));
@@ -737,8 +738,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToVotedSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToVotedSameEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -770,8 +771,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToVotedHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToVotedHigherEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -796,8 +797,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToCandidate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToCandidate(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -816,8 +817,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToUnattached(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToUnattached(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -837,8 +838,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToFollowerSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToFollowerSameEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -861,8 +862,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToFollowerHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToFollowerHigherEpoch(KRaftVersion kraftVersion)
{
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -885,8 +886,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testUnattachedToAnyStateLowerEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testUnattachedToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -915,8 +916,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToInvalidLeaderOrResigned(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToInvalidLeaderOrResigned(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -928,8 +929,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToCandidate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToCandidate(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -948,8 +949,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToVotedSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToVotedSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -968,8 +969,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToFollowerSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToFollowerSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1001,8 +1002,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToFollowerHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToFollowerHigherEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1034,8 +1035,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToUnattachedSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToUnattachedSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1046,8 +1047,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToUnattachedHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToUnattachedHigherEpoch(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -1067,8 +1068,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testVotedToAnyStateLowerEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testVotedToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -1098,8 +1099,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToFollowerSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToFollowerSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1146,8 +1147,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToFollowerHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToFollowerHigherEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1183,8 +1184,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToLeaderOrResigned(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToLeaderOrResigned(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1200,8 +1201,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToCandidate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToCandidate(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1224,8 +1225,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToUnattachedSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToUnattachedSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1240,8 +1241,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToUnattachedHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToUnattachedHigherEpoch(KRaftVersion kraftVersion)
{
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1264,8 +1265,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToVotedSameEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToVotedSameEpoch(KRaftVersion kraftVersion) {
int node1 = 1;
int node2 = 2;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
@@ -1292,8 +1293,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToVotedHigherEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToVotedHigherEpoch(KRaftVersion kraftVersion) {
ReplicaKey nodeKey1 = ReplicaKey.of(1, Uuid.randomUuid());
ReplicaKey nodeKey2 = ReplicaKey.of(2, Uuid.randomUuid());
@@ -1321,8 +1322,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testFollowerToAnyStateLowerEpoch(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
int otherNodeId = 1;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -1359,8 +1360,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testCanBecomeFollowerOfNonVoter(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testCanBecomeFollowerOfNonVoter(KRaftVersion kraftVersion) {
int otherNodeId = 1;
ReplicaKey nonVoterKey = ReplicaKey.of(2, Uuid.randomUuid());
VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId),
kraftVersion);
@@ -1393,9 +1394,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testObserverCannotBecomeCandidateOrLeader(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void testObserverCannotBecomeCandidateOrLeader(KRaftVersion
kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
int otherNodeId = 1;
VoterSet voters = VoterSetTest.voterSet(
VoterSetTest.voterMap(IntStream.of(otherNodeId), withDirectoryId)
@@ -1408,8 +1409,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testObserverWithIdCanVote(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testObserverWithIdCanVote(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(otherNodeKey));
@@ -1426,9 +1427,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testObserverFollowerToUnattached(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void testObserverFollowerToUnattached(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
int node1 = 1;
int node2 = 2;
VoterSet voters = VoterSetTest.voterSet(
@@ -1453,9 +1454,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testObserverUnattachedToFollower(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void testObserverUnattachedToFollower(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
int node1 = 1;
int node2 = 2;
VoterSet voters = VoterSetTest.voterSet(
@@ -1478,8 +1479,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeWithCorruptedStore(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeWithCorruptedStore(KRaftVersion kraftVersion) {
QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
Mockito.doThrow(UncheckedIOException.class).when(stateStore).readElectionState();
@@ -1497,8 +1498,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testHasRemoteLeader(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testHasRemoteLeader(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
@@ -1527,8 +1528,8 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testHighWatermarkRetained(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void testHighWatermarkRetained(KRaftVersion kraftVersion) {
ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey,
otherNodeKey));
@@ -1563,9 +1564,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void testInitializeWithEmptyLocalId(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void testInitializeWithEmptyLocalId(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
VoterSet voters = VoterSetTest.voterSet(
VoterSetTest.voterMap(IntStream.of(0, 1), withDirectoryId)
);
@@ -1590,9 +1591,9 @@ public class QuorumStateTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void
testNoLocalIdInitializationFailsIfElectionStateHasVotedCandidate(short
kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void
testNoLocalIdInitializationFailsIfElectionStateHasVotedCandidate(KRaftVersion
kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
int epoch = 5;
int votedId = 1;
VoterSet voters = VoterSetTest.voterSet(
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 600130b41ab..60147c07888 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -57,6 +57,7 @@ import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.ReplicaKey;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotReader;
@@ -115,7 +116,7 @@ public final class RaftClientTestContext {
final Uuid clusterId;
private final OptionalInt localId;
public final Uuid localDirectoryId;
- public final short kraftVersion;
+ public final KRaftVersion kraftVersion;
public final KafkaRaftClient<String> client;
final Metrics metrics;
public final MockLog log;
@@ -151,6 +152,7 @@ public final class RaftClientTestContext {
private final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID, logContext);
private final Uuid clusterId = Uuid.randomUuid();
private final OptionalInt localId;
+ private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_0;
private final Uuid localDirectoryId;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
@@ -159,7 +161,6 @@ public final class RaftClientTestContext {
private MemoryPool memoryPool = MemoryPool.NONE;
private List<InetSocketAddress> bootstrapServers =
Collections.emptyList();
private boolean kip853Rpc = false;
- private short kraftVersion = 0;
private Optional<VoterSet> startingVoters = Optional.empty();
private boolean isStartingVotersStatic = false;
@@ -253,7 +254,7 @@ public final class RaftClientTestContext {
Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
try (RecordsSnapshotWriter<?> snapshot = new
RecordsSnapshotWriter.Builder()
.setTime(time)
- .setKraftVersion((short) 0)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
.build(SERDE)
) {
@@ -312,7 +313,7 @@ public final class RaftClientTestContext {
Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
if (voters.isPresent()) {
- kraftVersion = 1;
+ kraftVersion = KRaftVersion.KRAFT_VERSION_1;
startingVoters = voters;
isStartingVotersStatic = false;
@@ -329,7 +330,7 @@ public final class RaftClientTestContext {
}
} else {
// Create an empty bootstrap snapshot if there is no voter set
- kraftVersion = 0;
+ kraftVersion = KRaftVersion.KRAFT_VERSION_0;
withEmptySnapshot(Snapshots.BOOTSTRAP_SNAPSHOT_ID);
}
@@ -431,7 +432,7 @@ public final class RaftClientTestContext {
Uuid clusterId,
OptionalInt localId,
Uuid localDirectoryId,
- short kraftVersion,
+ KRaftVersion kraftVersion,
KafkaRaftClient<String> client,
MockLog log,
MockNetworkChannel channel,
@@ -600,7 +601,8 @@ public final class RaftClientTestContext {
}
public void assertElectedLeader(int epoch, int leaderId) {
- Set<Integer> voters = kraftVersion == 0 ? startingVoters.voterIds() :
Collections.emptySet();
+ Set<Integer> voters = kraftVersion.isReconfigSupported() ?
+ Collections.emptySet() : startingVoters.voterIds();
assertEquals(
ElectionState.withElectedLeader(epoch, leaderId, voters),
quorumStateStore.readElectionState().get()
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
index 39b5ce3e589..93cd37fb250 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.MockLog;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
@@ -75,14 +76,14 @@ final class KRaftControlRecordStateMachineTest {
KRaftControlRecordStateMachine partitionState =
buildPartitionListener(log, Optional.of(staticVoterSet));
// Append the kraft.version control record
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
log.appendAsLeader(
MemoryRecords.withKRaftVersionRecord(
log.endOffset().offset(),
0,
epoch,
bufferSupplier.get(300),
- new KRaftVersionRecord().setKRaftVersion(kraftVersion)
+ new
KRaftVersionRecord().setKRaftVersion(kraftVersion.featureLevel())
),
epoch
);
@@ -126,14 +127,14 @@ final class KRaftControlRecordStateMachineTest {
log.truncateToLatestSnapshot();
// Append the kraft.version control record
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
log.appendAsLeader(
MemoryRecords.withKRaftVersionRecord(
log.endOffset().offset(),
0,
epoch,
bufferSupplier.get(300),
- new KRaftVersionRecord().setKRaftVersion(kraftVersion)
+ new
KRaftVersionRecord().setKRaftVersion(kraftVersion.featureLevel())
),
epoch
);
@@ -168,7 +169,7 @@ final class KRaftControlRecordStateMachineTest {
KRaftControlRecordStateMachine partitionState =
buildPartitionListener(log, Optional.of(staticVoterSet));
// Create a snapshot that has kraft.version and voter set control
records
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
VoterSet voterSet =
VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
@@ -198,7 +199,7 @@ final class KRaftControlRecordStateMachineTest {
KRaftControlRecordStateMachine partitionState =
buildPartitionListener(log, Optional.of(staticVoterSet));
// Create a snapshot that has kraft.version and voter set control
records
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
VoterSet snapshotVoterSet =
VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
OffsetAndEpoch snapshotId = new OffsetAndEpoch(10, epoch);
@@ -245,14 +246,14 @@ final class KRaftControlRecordStateMachineTest {
KRaftControlRecordStateMachine partitionState =
buildPartitionListener(log, Optional.of(staticVoterSet));
// Append the kraft.version control record
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
log.appendAsLeader(
MemoryRecords.withKRaftVersionRecord(
log.endOffset().offset(),
0,
epoch,
bufferSupplier.get(300),
- new KRaftVersionRecord().setKRaftVersion(kraftVersion)
+ new
KRaftVersionRecord().setKRaftVersion(kraftVersion.featureLevel())
),
epoch
);
@@ -314,14 +315,14 @@ final class KRaftControlRecordStateMachineTest {
// Append the kraft.version control record
long kraftVersionOffset = log.endOffset().offset();
- short kraftVersion = 1;
+ KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
log.appendAsLeader(
MemoryRecords.withKRaftVersionRecord(
kraftVersionOffset,
0,
epoch,
bufferSupplier.get(300),
- new KRaftVersionRecord().setKRaftVersion(kraftVersion)
+ new
KRaftVersionRecord().setKRaftVersion(kraftVersion.featureLevel())
),
epoch
);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index 4d9ef1c2574..2085c442087 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -26,10 +26,11 @@ import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.MockQuorumStateStore;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.QuorumState;
+import org.apache.kafka.server.common.KRaftVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import java.util.Collections;
@@ -63,7 +64,7 @@ public class KafkaRaftMetricsTest {
metrics.close();
}
- private QuorumState buildQuorumState(VoterSet voterSet, short
kraftVersion) {
+ private QuorumState buildQuorumState(VoterSet voterSet, KRaftVersion
kraftVersion) {
KRaftControlRecordStateMachine mockPartitionState =
Mockito.mock(KRaftControlRecordStateMachine.class);
Mockito
@@ -71,7 +72,7 @@ public class KafkaRaftMetricsTest {
.thenReturn(voterSet);
Mockito
.when(mockPartitionState.lastVoterSetOffset())
- .thenReturn(kraftVersion == 0 ? OptionalLong.empty() :
OptionalLong.of(0));
+ .thenReturn(kraftVersion.isReconfigSupported() ?
OptionalLong.of(0) : OptionalLong.empty());
Mockito
.when(mockPartitionState.lastKraftVersion())
.thenReturn(kraftVersion);
@@ -90,8 +91,8 @@ public class KafkaRaftMetricsTest {
);
}
- private VoterSet localStandaloneVoterSet(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ private VoterSet localStandaloneVoterSet(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
return VoterSetTest.voterSet(
Collections.singletonMap(
localId,
@@ -106,9 +107,9 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordVoterQuorumState(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordVoterQuorumState(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
Map<Integer, VoterSet.VoterNode> voterMap =
VoterSetTest.voterMap(IntStream.of(1, 2), withDirectoryId);
voterMap.put(
localId,
@@ -204,9 +205,9 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordNonVoterQuorumState(short kraftVersion) {
- boolean withDirectoryId = kraftVersion > 0;
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordNonVoterQuorumState(KRaftVersion kraftVersion) {
+ boolean withDirectoryId = kraftVersion.featureLevel() > 0;
VoterSet voters = VoterSetTest.voterSet(
VoterSetTest.voterMap(IntStream.of(1, 2, 3), withDirectoryId)
);
@@ -251,8 +252,8 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordLogEnd(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordLogEnd(KRaftVersion kraftVersion) {
QuorumState state =
buildQuorumState(localStandaloneVoterSet(kraftVersion), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@@ -267,8 +268,8 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordNumUnknownVoterConnections(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordNumUnknownVoterConnections(KRaftVersion
kraftVersion) {
QuorumState state =
buildQuorumState(localStandaloneVoterSet(kraftVersion), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@@ -281,8 +282,8 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordPollIdleRatio(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordPollIdleRatio(KRaftVersion kraftVersion) {
QuorumState state =
buildQuorumState(localStandaloneVoterSet(kraftVersion), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@@ -354,8 +355,8 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordLatency(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordLatency(KRaftVersion kraftVersion) {
QuorumState state =
buildQuorumState(localStandaloneVoterSet(kraftVersion), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
@@ -386,8 +387,8 @@ public class KafkaRaftMetricsTest {
}
@ParameterizedTest
- @ValueSource(shorts = {0, 1})
- public void shouldRecordRate(short kraftVersion) {
+ @EnumSource(value = KRaftVersion.class)
+ public void shouldRecordRate(KRaftVersion kraftVersion) {
QuorumState state =
buildQuorumState(localStandaloneVoterSet(kraftVersion), kraftVersion);
state.initialize(new OffsetAndEpoch(0L, 0));
raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index cee5d68bc99..3e990037bb8 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.ControlRecord;
import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.MockRawSnapshotWriter;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
@@ -162,7 +163,7 @@ public final class RecordsIteratorTest {
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
.setTime(new MockTime())
- .setKraftVersion((short) 0)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.empty())
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
snapshotBuf -> buffer.set(snapshotBuf))
@@ -212,7 +213,7 @@ public final class RecordsIteratorTest {
);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
.setTime(new MockTime())
- .setKraftVersion((short) 1)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(voterSet))
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
snapshotBuf -> buffer.set(snapshotBuf))
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
index ea25ce1d120..60752ccdbbd 100644
---
a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
+++
b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.raft.internals.VoterSet;
import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.junit.jupiter.api.Test;
@@ -51,7 +52,7 @@ final class RecordsSnapshotWriterTest {
int maxBatchSize = 1024;
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
- .setKraftVersion((short) 0)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.empty())
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
@@ -103,7 +104,7 @@ final class RecordsSnapshotWriterTest {
);
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
- .setKraftVersion((short) 0)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.of(voterSet))
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
@@ -123,7 +124,7 @@ final class RecordsSnapshotWriterTest {
);
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
- .setKraftVersion((short) 1)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(voterSet))
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
@@ -180,7 +181,7 @@ final class RecordsSnapshotWriterTest {
int maxBatchSize = 1024;
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder()
- .setKraftVersion((short) 1)
+ .setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.empty())
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
index a4f11b10ace..271bf6ae59b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
@@ -39,7 +39,8 @@ public enum Features {
*
* See {@link TestFeatureVersion} as an example. See {@link
FeatureVersion} when implementing a new feature.
*/
- TEST_VERSION("test.feature.version", TestFeatureVersion.values());
+ TEST_VERSION("test.feature.version", TestFeatureVersion.values()),
+ KRAFT_VERSION("kraft.version", KRaftVersion.values());
public static final Features[] FEATURES;
public static final List<Features> PRODUCTION_FEATURES;
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
new file mode 100644
index 00000000000..975ef666072
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum KRaftVersion implements FeatureVersion {
+ // Version 0 is the initial version of KRaft.
+ KRAFT_VERSION_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION),
+
+ // Version 1 enables KIP-853.
+ KRAFT_VERSION_1(1, MetadataVersion.IBP_3_9_IV0);
+
+ public static final String FEATURE_NAME = "kraft.version";
+
+ private final short featureLevel;
+ private final MetadataVersion bootstrapMetadataVersion;
+
+ KRaftVersion(
+ int featureLevel,
+ MetadataVersion bootstrapMetadataVersion
+ ) {
+ this.featureLevel = (short) featureLevel;
+ this.bootstrapMetadataVersion = bootstrapMetadataVersion;
+ }
+
+ @Override
+ public short featureLevel() {
+ return featureLevel;
+ }
+
+ public static KRaftVersion fromFeatureLevel(short version) {
+ switch (version) {
+ case 0:
+ return KRAFT_VERSION_0;
+ case 1:
+ return KRAFT_VERSION_1;
+ default:
+ throw new RuntimeException("Unknown KRaft feature level: " +
(int) version);
+ }
+ }
+
+ public boolean isReconfigSupported() {
+ return this != KRAFT_VERSION_0;
+ }
+
+ @Override
+ public String featureName() {
+ return FEATURE_NAME;
+ }
+
+ @Override
+ public MetadataVersion bootstrapMetadataVersion() {
+ return bootstrapMetadataVersion;
+ }
+
+ @Override
+ public Map<String, Short> dependencies() {
+ return Collections.emptyMap();
+ }
+
+ public short quorumStateVersion() {
+ switch (this) {
+ case KRAFT_VERSION_0:
+ return (short) 0;
+ case KRAFT_VERSION_1:
+ return (short) 1;
+ }
+ throw new RuntimeException("Unknown KRaft feature level: " + this);
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
index daed7bbc7ea..5d696a36482 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java
@@ -20,7 +20,7 @@ import java.util.Collections;
import java.util.Map;
public enum TestFeatureVersion implements FeatureVersion {
-
+ TEST_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
// TEST_1 released right before MV 3.7-IVO was released, and it has no
dependencies
TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
// TEST_2 released right before MV 3.9-IVO was released, and it depends on
this metadata version
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
index e52ae360cd0..3adf6a33986 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java
@@ -26,8 +26,16 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class FeaturesTest {
+ @ParameterizedTest
+ @EnumSource(Features.class)
+ public void testV0SupportedInEarliestMV(Features feature) {
+ assertTrue(feature.featureVersions().length >= 1);
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+ feature.featureVersions()[0].bootstrapMetadataVersion());
+ }
@ParameterizedTest
@EnumSource(Features.class)
@@ -36,13 +44,13 @@ public class FeaturesTest {
int numFeatures = featureImplementations.length;
short latestProductionLevel = feature.latestProduction();
- for (short i = 1; i < numFeatures; i++) {
+ for (short i = 0; i < numFeatures; i++) {
short level = i;
if (latestProductionLevel < i) {
- assertEquals(featureImplementations[i - 1],
feature.fromFeatureLevel(level, true));
+ assertEquals(featureImplementations[i],
feature.fromFeatureLevel(level, true));
assertThrows(IllegalArgumentException.class, () ->
feature.fromFeatureLevel(level, false));
} else {
- assertEquals(featureImplementations[i - 1],
feature.fromFeatureLevel(level, false));
+ assertEquals(featureImplementations[i],
feature.fromFeatureLevel(level, false));
}
}
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
new file mode 100644
index 00000000000..4c6d417bb8b
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class KRaftVersionTest {
+ @Test
+ public void testFeatureLevel() {
+ for (int i = 0; i < KRaftVersion.values().length; i++) {
+ assertEquals(i, KRaftVersion.values()[i].featureLevel());
+ }
+ }
+
+ @Test
+ public void testQuorumStateVersion() {
+ for (int i = 0; i < KRaftVersion.values().length; i++) {
+ assertEquals(i, KRaftVersion.values()[i].quorumStateVersion());
+ }
+ }
+
+ @Test
+ public void testFromFeatureLevel() {
+ for (int i = 0; i < KRaftVersion.values().length; i++) {
+ assertEquals(KRaftVersion.values()[i],
KRaftVersion.fromFeatureLevel((short) i));
+ }
+ }
+
+ @Test
+ public void testBootstrapMetadataVersion() {
+ for (int i = 0; i < KRaftVersion.values().length; i++) {
+ MetadataVersion metadataVersion =
KRaftVersion.values()[i].bootstrapMetadataVersion();
+ switch (i) {
+ case 0:
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
metadataVersion);
+ break;
+ case 1:
+ assertEquals(MetadataVersion.IBP_3_9_IV0, metadataVersion);
+ break;
+ default:
+ throw new RuntimeException("Unsupported value " + i);
+ }
+ }
+ }
+}