This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 075401e2e1d KAFKA-19964; ApiVersions response return unknown features
if the cluster metadata HWM is unknown (#21122)
075401e2e1d is described below
commit 075401e2e1dd6ad5be85e75d81030542c3a840bb
Author: mannoopj <[email protected]>
AuthorDate: Mon Jun 1 06:02:44 2026 -0400
KAFKA-19964; ApiVersions response return unknown features if the cluster
metadata HWM is unknown (#21122)
FeaturesPublisher's default finalizedFeatures assumes a metadata version
of 7 when there is no finalized level for metadata.version. Instead of
defaulting to 7, it should report no finalized value and an epoch of -1,
indicating that the finalized features are unknown.
The FinalizedFeatures type now supports three different kinds of values:
1. FinalizedFeatures#unknown() returns an instance representing an unknown
metadata version (no finalized features, epoch -1)
2. FinalizedFeatures#fromMetadataVersion(MetadataVersion) creates an
instance with only the metadata version known
3. FinalizedFeatures#of(MetadataVersion, Map<String, Short>, long)
creates an instance with full feature information
With this implementation the ApiVersions response will return a finalized
epoch of -1 and no finalized versions when the finalized features are
unknown.
Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
<[email protected]>
---
.../main/scala/kafka/server/ControllerApis.scala | 5 +-
.../main/scala/kafka/server/ControllerServer.scala | 12 +-
.../server/metadata/BrokerMetadataPublisher.scala | 4 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 2 +-
.../TransactionCoordinatorConcurrencyTest.scala | 2 +-
.../transaction/TransactionStateManagerTest.scala | 4 +-
.../scala/unit/kafka/network/ProcessorTest.scala | 2 +-
.../unit/kafka/network/SocketServerTest.scala | 2 +-
.../unit/kafka/server/ControllerApisTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../apache/kafka/metadata/KRaftMetadataCache.java | 6 +-
.../metadata/publisher/FeaturesPublisher.java | 8 +-
.../kafka/server/common/FinalizedFeatures.java | 148 +++++++++++++++++++--
.../kafka/server/common/FinalizedFeaturesTest.java | 74 ++++++++++-
.../kafka/server/SimpleApiVersionManagerTest.java | 45 +++++++
16 files changed, 284 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index a36672311fd..4ba6c4069a0 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -1060,7 +1060,10 @@ class ControllerApis(
def handleDescribeCluster(request: Request): CompletableFuture[Unit] = {
// Nearly all RPCs should check MetadataVersion inside the
QuorumController. However, this
// RPC is consulting a cache which lives outside the QC. So we check
MetadataVersion here.
- if
(!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported)
{
+ if (apiVersionManager.features.isUnknown) {
+ throw new UnsupportedVersionException("There is no finalized
MetadataVersion, so " +
+ "direct-to-controller communication is not supported.")
+ } else if
(!apiVersionManager.features.metadataVersionOrThrow.isControllerRegistrationSupported)
{
throw new UnsupportedVersionException("Direct-to-controller
communication is not " +
"supported with the current MetadataVersion.")
}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 902252233ad..e0e991cfbb5 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -162,9 +162,15 @@ class ControllerServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
config.unstableApiVersionsEnabled,
- () => featuresPublisher.features().setFinalizedLevel(
- KRaftVersion.FEATURE_NAME,
- raftManager.client.kraftVersion().featureLevel())
+ () => {
+ val features = featuresPublisher.features()
+ if (!features.isUnknown)
+ features.setFinalizedLevel(
+ KRaftVersion.FEATURE_NAME,
+ raftManager.client.kraftVersion().featureLevel())
+ else
+ features
+ }
)
// metrics will be set to null when closing a controller, so we should
recreate it for testing
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 770dacb868f..472ccdf0f37 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -104,7 +104,7 @@ class BrokerMetadataPublisher(
/**
* The share version being used in the broker metadata.
*/
- private var finalizedShareVersion: Short =
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
+ private var finalizedShareVersion: Short =
FinalizedFeatures.fromMetadataVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
override def name(): String = "BrokerMetadataPublisher"
@@ -225,7 +225,7 @@ class BrokerMetadataPublisher(
if (delta.featuresDelta != null) {
try {
- val newFinalizedFeatures = new
FinalizedFeatures(newImage.features.metadataVersionOrThrow,
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
+ val newFinalizedFeatures =
FinalizedFeatures.of(newImage.features.metadataVersionOrThrow,
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
val newFinalizedShareVersion =
newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
0.toShort)
// Share version feature has been toggled.
if (newFinalizedShareVersion != finalizedShareVersion) {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 4690fabf7e3..419cbe397d6 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -83,7 +83,7 @@ class TestRaftServer(
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
- () =>
FinalizedFeatures.fromKRaftVersion(MetadataVersion.MINIMUM_VERSION))
+ () =>
FinalizedFeatures.fromMetadataVersion(MetadataVersion.MINIMUM_VERSION))
socketServer = new SocketServer(config, metrics, time, credentialProvider,
apiVersionManager)
val endpoints = Endpoints.fromInetSocketAddresses(
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 26317d8535d..0be4317134a 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -86,7 +86,7 @@ class TransactionCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurren
any[ListenerName])
).thenReturn(Optional.of(brokerNode))
when(metadataCache.features()).thenReturn {
- new FinalizedFeatures(
+ FinalizedFeatures.of(
MetadataVersion.latestTesting(),
util.Map.of(TransactionVersion.FEATURE_NAME,
TransactionVersion.TV_2.featureLevel()),
0)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 84c58f40c87..ea7e44b4149 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -71,7 +71,7 @@ class TransactionStateManagerTest {
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
when(metadataCache.features()).thenReturn {
- new FinalizedFeatures(
+ FinalizedFeatures.of(
MetadataVersion.latestTesting(),
util.Map.of(TransactionVersion.FEATURE_NAME,
TransactionVersion.TV_2.featureLevel()),
0)
@@ -1356,7 +1356,7 @@ class TransactionStateManagerTest {
def testTransactionVersionInTransactionManager(transactionVersion:
TransactionVersion): Unit = {
val metadataCache = mock(classOf[MetadataCache])
when(metadataCache.features()).thenReturn {
- new FinalizedFeatures(
+ FinalizedFeatures.of(
MetadataVersion.latestTesting(),
util.Map.of(TransactionVersion.FEATURE_NAME,
transactionVersion.featureLevel()),
0)
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 980ca2cadb4..51d3de41250 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -42,7 +42,7 @@ class ProcessorTest {
val requestHeader = RequestTestUtils.serializeRequestHeader(
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
val apiVersionManager = new
SimpleApiVersionManager(ListenerType.CONTROLLER, true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
+ () => FinalizedFeatures.of(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
val e = assertThrows(classOf[InvalidRequestException],
(() => Processor.parseRequestHeader(apiVersionManager, requestHeader)):
Executable,
"INIT_PRODUCER_ID with listener type CONTROLLER should throw
InvalidRequestException exception")
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 978fc39715f..5dc29bf5576 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -86,7 +86,7 @@ class SocketServerTest {
ServerTestUtils.clearYammerMetrics()
private val apiVersionManager = new
SimpleApiVersionManager(ListenerType.BROKER, true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
+ () => FinalizedFeatures.of(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
var server: SocketServer = _
val sockets = new ArrayBuffer[Socket]
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7f30d8f2971..bc79fd11735 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -176,7 +176,7 @@ class ControllerApisTest {
new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
- () =>
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting())),
+ () =>
FinalizedFeatures.fromMetadataVersion(MetadataVersion.latestTesting())),
metadataCache
)
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6ab30fce2fd..5471c410293 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -190,7 +190,7 @@ class KafkaApisTest extends Logging {
val apiVersionManager = new SimpleApiVersionManager(
ListenerType.BROKER,
true,
- () => new FinalizedFeatures(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
+ () => FinalizedFeatures.of(MetadataVersion.latestTesting(),
util.Map.of[String, java.lang.Short], 0))
setupFeatures(featureVersions)
@@ -224,7 +224,7 @@ class KafkaApisTest extends Logging {
if (featureVersions.isEmpty) return
when(metadataCache.features()).thenReturn {
- new FinalizedFeatures(
+ FinalizedFeatures.of(
MetadataVersion.latestTesting,
featureVersions.map { featureVersion =>
featureVersion.featureName ->
featureVersion.featureLevel.asInstanceOf[java.lang.Short]
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index aba475e86cc..5e7de246783 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -209,7 +209,7 @@ public class KRaftMetadataRequestBenchmark {
setApiVersionManager(new SimpleApiVersionManager(
ApiMessageType.ListenerType.BROKER,
false,
- () ->
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
+ () ->
FinalizedFeatures.fromMetadataVersion(MetadataVersion.latestTesting()))).
setGroupConfigManager(groupConfigManager).
build();
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
index 283c062a9ff..f05e34e408d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
@@ -518,6 +518,10 @@ public class KRaftMetadataCache implements MetadataCache {
if (kraftVersionLevel > 0) {
finalizedFeatures.put(KRaftVersion.FEATURE_NAME,
kraftVersionLevel);
}
- return new
FinalizedFeatures(image.features().metadataVersionOrThrow(), finalizedFeatures,
image.highestOffsetAndEpoch().offset());
+ return FinalizedFeatures.of(
+ image.features().metadataVersionOrThrow(),
+ finalizedFeatures,
+ image.highestOffsetAndEpoch().offset()
+ );
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
index 678a855ceaa..f4cbb4faddc 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
@@ -27,13 +27,10 @@ import org.apache.kafka.server.fault.FaultHandler;
import org.slf4j.Logger;
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
-
-
public class FeaturesPublisher implements MetadataPublisher {
private final Logger log;
private final FaultHandler faultHandler;
- private volatile FinalizedFeatures finalizedFeatures =
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
+ private volatile FinalizedFeatures finalizedFeatures =
FinalizedFeatures.unknown();
public FeaturesPublisher(
LogContext logContext,
@@ -60,7 +57,8 @@ public class FeaturesPublisher implements MetadataPublisher {
) {
try {
if (delta.featuresDelta() != null) {
- FinalizedFeatures newFinalizedFeatures = new
FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
+ FinalizedFeatures newFinalizedFeatures = FinalizedFeatures.of(
+ newImage.features().metadataVersionOrThrow(),
newImage.features().finalizedVersions(),
newImage.provenance().lastContainedOffset()
);
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 9133f0db798..dd39984fbcd 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -19,28 +19,128 @@ package org.apache.kafka.server.common;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
-public record FinalizedFeatures(
- MetadataVersion metadataVersion,
- Map<String, Short> finalizedFeatures,
- long finalizedFeaturesEpoch
-) {
- public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
- return new FinalizedFeatures(version, Map.of(), -1);
- }
+/**
+ * Represents the finalized feature levels for a Kafka cluster.
+ * <p>
+ * This class can be in one of three states:
+ * <ul>
+ * <li>Unknown - the metadata version has not been committed yet, e.g.
before a quorum is
+ * formed. Used as the initial state in {@code FeaturesPublisher}. (use
{@link #unknown()})</li>
+ * <li>Metadata version only - the metadata version is known but no
additional features or
+ * epoch have been set. Used when only the metadata version needs to be
represented
+ * without a full set of finalized features. (use {@link
#fromMetadataVersion(MetadataVersion)})</li>
+ * <li>Full features - metadata version, features map, and epoch are all
known. Used after
+ * the controller has committed feature records. (use {@link
#of(MetadataVersion, Map, long)})</li>
+ * </ul>
+ */
+public final class FinalizedFeatures {
+ private static final FinalizedFeatures UNKNOWN = new
FinalizedFeatures(Optional.empty(), Map.of(), -1);
+
+ private final Optional<MetadataVersion> metadataVersion;
+ private final Map<String, Short> finalizedFeatures;
+ private final long finalizedFeaturesEpoch;
- public FinalizedFeatures(
- MetadataVersion metadataVersion,
+ private FinalizedFeatures(
+ Optional<MetadataVersion> metadataVersion,
Map<String, Short> finalizedFeatures,
long finalizedFeaturesEpoch
) {
- this.metadataVersion = Objects.requireNonNull(metadataVersion);
+ this.metadataVersion = metadataVersion;
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
- this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME,
metadataVersion.featureLevel());
+ metadataVersion.ifPresent(mv ->
+ this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME,
mv.featureLevel()));
+ }
+
+ /**
+ * Returns a sentinel value representing unknown finalized features.
+ *
+ * @return the unknown finalized features instance
+ */
+ public static FinalizedFeatures unknown() {
+ return UNKNOWN;
+ }
+
+ /**
+ * Creates a new instance from the given KRaft metadata version.
+ *
+ * @param version the metadata version
+ * @return a new FinalizedFeatures instance
+ * @throws NullPointerException if version is null
+ */
+ public static FinalizedFeatures fromMetadataVersion(MetadataVersion
version) {
+ Objects.requireNonNull(version, "version cannot be null");
+ return new FinalizedFeatures(Optional.of(version), Map.of(), -1);
+ }
+
+ /**
+ * Creates a new instance with the given metadata version, features map,
and epoch.
+ *
+ * @param metadataVersion the metadata version
+ * @param finalizedFeatures the map of feature names to their finalized
levels
+ * @param epoch the epoch of the finalized features
+ * @return a new FinalizedFeatures instance
+ * @throws NullPointerException if metadataVersion or finalizedFeatures is
null
+ */
+ public static FinalizedFeatures of(MetadataVersion metadataVersion,
Map<String, Short> finalizedFeatures, long epoch) {
+ Objects.requireNonNull(metadataVersion, "metadataVersion cannot be
null");
+ Objects.requireNonNull(finalizedFeatures, "finalizedFeatures cannot be
null");
+ return new FinalizedFeatures(Optional.of(metadataVersion),
finalizedFeatures, epoch);
+ }
+
+ /**
+ * Returns whether the finalized features are unknown.
+ *
+ * @return true if the finalized features are unknown, false otherwise
+ */
+ public boolean isUnknown() {
+ return this == UNKNOWN;
+ }
+
+ /**
+ * Returns the metadata version, throwing an exception if unknown.
+ *
+ * @return the metadata version
+ * @throws IllegalStateException if the metadata version is unknown
+ */
+ public MetadataVersion metadataVersionOrThrow() {
+ return metadataVersion.orElseThrow(() ->
+ new IllegalStateException("Metadata version is unknown"));
+ }
+
+ /**
+ * Returns the map of feature names to their finalized levels.
+ *
+ * @return the finalized features map
+ */
+ public Map<String, Short> finalizedFeatures() {
+ return finalizedFeatures;
}
+ /**
+ * Returns the epoch of the finalized features.
+ *
+ * @return the finalized features epoch
+ */
+ public long finalizedFeaturesEpoch() {
+ return finalizedFeaturesEpoch;
+ }
+
+ /**
+ * Creates a new instance with the specified feature level set or removed.
+ * If level is 0, the feature is removed. Otherwise, the feature is set to
the given level.
+ *
+ * @param key the feature name
+ * @param level the feature level (0 to remove)
+ * @return a new FinalizedFeatures instance with the updated feature level
+ * @throws IllegalStateException if this is the unknown instance
+ */
public FinalizedFeatures setFinalizedLevel(String key, short level) {
+ if (isUnknown()) {
+ throw new IllegalStateException("Cannot set finalized level on
unknown FinalizedFeatures");
+ }
if (level == (short) 0) {
if (finalizedFeatures.containsKey(key)) {
Map<String, Short> newFinalizedFeatures = new
HashMap<>(finalizedFeatures);
@@ -61,4 +161,28 @@ public record FinalizedFeatures(
finalizedFeaturesEpoch);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FinalizedFeatures that = (FinalizedFeatures) o;
+ return finalizedFeaturesEpoch == that.finalizedFeaturesEpoch &&
+ Objects.equals(metadataVersion, that.metadataVersion) &&
+ Objects.equals(finalizedFeatures, that.finalizedFeatures);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(metadataVersion, finalizedFeatures,
finalizedFeaturesEpoch);
+ }
+
+ @Override
+ public String toString() {
+ return "FinalizedFeatures(" +
+ "metadataVersion=" + metadataVersion +
+ ", finalizedFeatures=" + finalizedFeatures +
+ ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
+ ')';
+ }
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
index 1c4c9547cf0..c0dd9c58a83 100644
---
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
@@ -24,12 +24,54 @@ import java.util.Map;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class FinalizedFeaturesTest {
+
+ @Test
+ public void testUnknownFeatures() {
+ FinalizedFeatures features = FinalizedFeatures.unknown();
+
+ assertTrue(features.isUnknown());
+ assertTrue(features.finalizedFeatures().isEmpty());
+ assertEquals(-1, features.finalizedFeaturesEpoch());
+ }
+
+ @Test
+ public void testUnknownFeaturesMetadataVersionThrows() {
+ FinalizedFeatures features = FinalizedFeatures.unknown();
+
+ assertThrows(IllegalStateException.class,
features::metadataVersionOrThrow);
+ }
+
+ @Test
+ public void testUnknownFeaturesIsSingleton() {
+ assertSame(FinalizedFeatures.unknown(), FinalizedFeatures.unknown());
+ }
+
+ @Test
+ public void testFromKRaftVersion() {
+ FinalizedFeatures features =
FinalizedFeatures.fromMetadataVersion(MINIMUM_VERSION);
+
+ assertFalse(features.isUnknown());
+ assertEquals(MINIMUM_VERSION, features.metadataVersionOrThrow());
+ assertEquals(MINIMUM_VERSION.featureLevel(),
features.finalizedFeatures().get(FEATURE_NAME));
+ assertEquals(1, features.finalizedFeatures().size());
+ assertEquals(-1, features.finalizedFeaturesEpoch());
+ }
+
+ @Test
+ public void testFromKRaftVersionNullThrows() {
+ assertThrows(NullPointerException.class, () ->
FinalizedFeatures.fromMetadataVersion(null));
+ }
+
@Test
public void testKRaftModeFeatures() {
- FinalizedFeatures finalizedFeatures = new
FinalizedFeatures(MINIMUM_VERSION,
+ FinalizedFeatures finalizedFeatures =
FinalizedFeatures.of(MINIMUM_VERSION,
Map.of("foo", (short) 2), 123);
assertEquals(MINIMUM_VERSION.featureLevel(),
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
@@ -38,20 +80,44 @@ class FinalizedFeaturesTest {
assertEquals(2, finalizedFeatures.finalizedFeatures().size());
}
+ @Test
+ public void testOfNullMetadataVersionThrows() {
+ assertThrows(NullPointerException.class,
+ () -> FinalizedFeatures.of(null, Map.of(), 0));
+ }
+
+ @Test
+ public void testOfNullFeaturesMapThrows() {
+ assertThrows(NullPointerException.class,
+ () -> FinalizedFeatures.of(MINIMUM_VERSION, null, 0));
+ }
+
@Test
public void testSetFinalizedLevel() {
- FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
+ FinalizedFeatures finalizedFeatures = FinalizedFeatures.of(
MINIMUM_VERSION,
Map.of("foo", (short) 2),
123
);
- // Override an existing finalized feature version to 0
FinalizedFeatures removedFeatures =
finalizedFeatures.setFinalizedLevel("foo", (short) 0);
assertNull(removedFeatures.finalizedFeatures().get("foo"));
- // Override a missing finalized feature version to 0
FinalizedFeatures sameFeatures =
removedFeatures.setFinalizedLevel("foo", (short) 0);
assertEquals(sameFeatures.finalizedFeatures(),
removedFeatures.finalizedFeatures());
}
+
+ @Test
+ public void testSetFinalizedLevelAdd() {
+ FinalizedFeatures features = FinalizedFeatures.of(MINIMUM_VERSION,
Map.of(), 123);
+
+ FinalizedFeatures updatedFeatures = features.setFinalizedLevel("bar",
(short) 5);
+ assertEquals((short) 5,
updatedFeatures.finalizedFeatures().get("bar"));
+ }
+
+ @Test
+ public void testSetFinalizedLevelOnUnknownThrows() {
+ assertThrows(IllegalStateException.class,
+ () -> FinalizedFeatures.unknown().setFinalizedLevel("foo", (short)
1));
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java
b/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java
new file mode 100644
index 00000000000..7da6bd612fc
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SimpleApiVersionManagerTest {
+
+ @Test
+ public void testUnknownFeaturesHasNoMetadataVersion() {
+ SimpleApiVersionManager apiVersionManager = new
SimpleApiVersionManager(
+ ApiMessageType.ListenerType.CONTROLLER,
+ true,
+ FinalizedFeatures::unknown
+ );
+ ApiVersionsResponse response = apiVersionManager.apiVersionResponse(0,
false);
+
+ assertTrue(response.data().finalizedFeatures().isEmpty(),
+ "Finalized features should be empty when no quorum exists");
+
+ assertEquals(-1, response.data().finalizedFeaturesEpoch(),
+ "Finalized features epoch should be -1 when no quorum exists");
+ }
+}