This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa2df3bca75 KAFKA-18559 Cleanup FinalizedFeatures (#18593)
fa2df3bca75 is described below

commit fa2df3bca75fdd72a94890bbef34aaa8a48e76c7
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Jan 24 19:39:01 2025 +0800

    KAFKA-18559 Cleanup FinalizedFeatures (#18593)
    
    Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../scala/kafka/server/metadata/KRaftMetadataCache.scala  |  3 +--
 .../TransactionCoordinatorConcurrencyTest.scala           |  4 +---
 .../transaction/TransactionStateManagerTest.scala         |  8 ++------
 .../src/test/scala/unit/kafka/network/ProcessorTest.scala |  6 +++---
 .../test/scala/unit/kafka/network/SocketServerTest.scala  |  2 +-
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala |  6 ++----
 .../kafka/metadata/publisher/FeaturesPublisher.java       |  4 ++--
 .../org/apache/kafka/server/common/FinalizedFeatures.java | 15 ++++-----------
 .../apache/kafka/server/common/FinalizedFeaturesTest.java | 13 +------------
 9 files changed, 17 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 51c05d68c32..be13635c1ab 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -540,8 +540,7 @@ class KRaftMetadataCache(
     }
     new FinalizedFeatures(image.features().metadataVersion(),
       finalizedFeatures,
-      image.highestOffsetAndEpoch().offset,
-      true)
+      image.highestOffsetAndEpoch().offset)
   }
 }
 
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 24000894fe9..621730bc65e 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -85,9 +85,7 @@ class TransactionCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurren
       new FinalizedFeatures(
         MetadataVersion.latestTesting(),
         Collections.singletonMap(TransactionVersion.FEATURE_NAME, 
TransactionVersion.TV_2.featureLevel()),
-        0,
-        true
-      )
+        0)
     }
 
     when(metadataCache.metadataVersion())
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 41e6b1a954a..522461e5485 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -70,9 +70,7 @@ class TransactionStateManagerTest {
     new FinalizedFeatures(
       MetadataVersion.latestTesting(),
       Collections.singletonMap(TransactionVersion.FEATURE_NAME, 
TransactionVersion.TV_2.featureLevel()),
-      0,
-      true
-    )
+      0)
   }
   
   val metrics = new Metrics()
@@ -1332,9 +1330,7 @@ class TransactionStateManagerTest {
       new FinalizedFeatures(
         MetadataVersion.latestTesting(),
         Collections.singletonMap(TransactionVersion.FEATURE_NAME, 
transactionVersion.featureLevel()),
-        0,
-        true
-      )
+        0)
     }
     val transactionManager = new TransactionStateManager(0, scheduler,
       replicaManager, metadataCache, txnConfig, time, metrics)
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 94e93d4d2a8..3a862678ca7 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -37,7 +37,7 @@ class ProcessorTest {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
       new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
     val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.CONTROLLER, true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "INIT_PRODUCER_ID with listener type CONTROLLER should throw 
InvalidRequestException exception")
@@ -55,7 +55,7 @@ class ProcessorTest {
       .setCorrelationId(0);
     val requestHeader = RequestTestUtils.serializeRequestHeader(new 
RequestHeader(requestHeaderData, headerVersion))
     val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, 
true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "LEADER_AND_ISR should throw InvalidRequestException exception")
@@ -67,7 +67,7 @@ class ProcessorTest {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
       new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
     val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, 
true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
     val e = assertThrows(classOf[UnsupportedVersionException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "PRODUCE v0 should throw UnsupportedVersionException exception")
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5ebcfd65cce..5b2196018e5 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -84,7 +84,7 @@ class SocketServerTest {
   TestUtils.clearYammerMetrics()
 
   private val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.BROKER, true,
-    () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
+    () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
   var server: SocketServer = _
   val sockets = new ArrayBuffer[Socket]
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index af26996c56e..901309b289d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -179,7 +179,7 @@ class KafkaApisTest extends Logging {
       enabledApis,
       BrokerFeatures.defaultSupportedFeatures(true),
       true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0, true))
+      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
 
     
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
     setupFeatures(featureVersions)
@@ -220,9 +220,7 @@ class KafkaApisTest extends Logging {
             featureVersions.map { featureVersion =>
               featureVersion.featureName -> 
featureVersion.featureLevel.asInstanceOf[java.lang.Short]
             }.toMap.asJava,
-            0,
-            true
-          )
+            0)
         }
 
       case _ => throw new IllegalStateException("Test must set an instance of 
KRaftMetadataCache")
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
index a03f08291b5..01572dd9411 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
@@ -57,8 +57,8 @@ public class FeaturesPublisher implements MetadataPublisher {
         if (delta.featuresDelta() != null) {
             FinalizedFeatures newFinalizedFeatures = new 
FinalizedFeatures(newImage.features().metadataVersion(),
                     newImage.features().finalizedVersions(),
-                    newImage.provenance().lastContainedOffset(),
-                    true);
+                    newImage.provenance().lastContainedOffset()
+            );
             if (!newFinalizedFeatures.equals(finalizedFeatures)) {
                 log.info("Loaded new metadata {}.", newFinalizedFeatures);
                 finalizedFeatures = newFinalizedFeatures;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index de78a3a72a8..1eb39466409 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -27,25 +27,18 @@ public final class FinalizedFeatures {
     private final long finalizedFeaturesEpoch;
 
     public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
-        return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+        return new FinalizedFeatures(version, Collections.emptyMap(), -1);
     }
 
     public FinalizedFeatures(
         MetadataVersion metadataVersion,
         Map<String, Short> finalizedFeatures,
-        long finalizedFeaturesEpoch,
-        boolean kraftMode
+        long finalizedFeaturesEpoch
     ) {
-        this.metadataVersion = metadataVersion;
+        this.metadataVersion = Objects.requireNonNull(metadataVersion);
         this.finalizedFeatures = new HashMap<>(finalizedFeatures);
         this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-        // In KRaft mode, we always include the metadata version in the 
features map.
-        // In ZK mode, we never include it.
-        if (kraftMode) {
-            this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.featureLevel());
-        } else {
-            this.finalizedFeatures.remove(MetadataVersion.FEATURE_NAME);
-        }
+        this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.featureLevel());
     }
 
     public MetadataVersion metadataVersion() {
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
index ae6ca998df2..31d57bedfe5 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
@@ -24,27 +24,16 @@ import java.util.Collections;
 import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
 import static 
org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
 
 class FinalizedFeaturesTest {
     @Test
     public void testKRaftModeFeatures() {
         FinalizedFeatures finalizedFeatures = new 
FinalizedFeatures(MINIMUM_KRAFT_VERSION,
-                Collections.singletonMap("foo", (short) 2), 123, true);
+                Collections.singletonMap("foo", (short) 2), 123);
         assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
                 finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
         assertEquals((short) 2,
                 finalizedFeatures.finalizedFeatures().get("foo"));
         assertEquals(2, finalizedFeatures.finalizedFeatures().size());
     }
-
-    @Test
-    public void testZkModeFeatures() {
-        FinalizedFeatures finalizedFeatures = new 
FinalizedFeatures(MINIMUM_KRAFT_VERSION,
-                Collections.singletonMap("foo", (short) 2), 123, false);
-        assertNull(finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-                finalizedFeatures.finalizedFeatures().get("foo"));
-        assertEquals(1, finalizedFeatures.finalizedFeatures().size());
-    }
 }

Reply via email to