This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 0297ba2c67e KAFKA-19192; Old bootstrap checkpoint files cause problems 
on updated servers (#19545)
0297ba2c67e is described below

commit 0297ba2c67e42b11cfaa932d9d3f201e63c3a2a1
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Apr 24 12:43:35 2025 -0700

    KAFKA-19192; Old bootstrap checkpoint files cause problems on updated
    servers (#19545)
    
    Old bootstrap.metadata files cause problems with servers that include
    KAFKA-18601. When the server tries to read the bootstrap.checkpoint
    file, it will fail if the metadata.version is older than 3.3-IV3
    (feature level 7). This causes problems when these clusters are
    upgraded.
    
    This PR makes it possible to represent older MVs in BootstrapMetadata
    objects without causing an exception. An exception is thrown only if we
    attempt to access the BootstrapMetadata's metadata version. This ensures
    that only the code path in which we start with an empty metadata log
    metadata version is 7 or newer.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>, Ismael Juma
     <ism...@juma.me.uk>, PoAn Yang <pay...@apache.org>, Liu Zeyu
     <zeyu.l...@gmail.com>, Alyssa Huang <ahu...@confluent.io>
---
 .../kafka/server/KRaftClusterTest.scala            | 46 ++++++++++++++++++++++
 .../metadata/bootstrap/BootstrapMetadata.java      | 37 +++++++++--------
 .../metadata/bootstrap/BootstrapMetadataTest.java  | 12 +++---
 .../kafka/common/test/KafkaClusterTestKit.java     | 23 ++++++++---
 4 files changed, 88 insertions(+), 30 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 17a75080ba1..4fe4fb48cd8 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -65,6 +65,7 @@ import java.util.{Collections, Optional, OptionalLong, 
Properties}
 import scala.collection.{Seq, mutable}
 import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 
 @Timeout(120)
 @Tag("integration")
@@ -1621,6 +1622,51 @@ class KRaftClusterTest {
     }
   }
 
+  /**
+   * Test that once a cluster is formatted, a bootstrap.metadata file that 
contains an unsupported
+   * MetadataVersion is not a problem. This is a regression test for 
KAFKA-19192.
+   */
+  @Test
+  def testOldBootstrapMetadataFile(): Unit = {
+    val baseDirectory = TestUtils.tempDir().toPath()
+    Using.resource(new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).
+        setBaseDirectory(baseDirectory).
+          build()).
+      setDeleteOnClose(false).
+        build()
+    ) { cluster =>
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+    }
+    val oldBootstrapMetadata = BootstrapMetadata.fromRecords(
+      util.Arrays.asList(
+        new ApiMessageAndVersion(
+          new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(1),
+          0.toShort)
+      ),
+      "oldBootstrapMetadata")
+    // Re-create the cluster using the same directory structure as above.
+    // Since we do not need to use the bootstrap metadata, the fact that
+    // it specifies an obsolete metadata.version should not be a problem.
+    Using.resource(new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).
+        setBaseDirectory(baseDirectory).
+        setBootstrapMetadata(oldBootstrapMetadata).
+          build()).build()
+    ) { cluster =>
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+    }
+  }
+
   @Test
   def testIncreaseNumIoThreads(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 2f6d8a08c0d..375c60f64d4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -37,7 +37,7 @@ import java.util.Optional;
  */
 public class BootstrapMetadata {
     private final List<ApiMessageAndVersion> records;
-    private final MetadataVersion metadataVersion;
+    private final short metadataVersionLevel;
     private final String source;
 
     public static BootstrapMetadata fromVersions(
@@ -67,7 +67,7 @@ public class BootstrapMetadata {
                     setFeatureLevel(level), (short) 0));
             }
         }
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersion.featureLevel(), 
source);
     }
 
     public static BootstrapMetadata fromVersion(MetadataVersion 
metadataVersion, String source) {
@@ -75,29 +75,28 @@ public class BootstrapMetadata {
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(MetadataVersion.FEATURE_NAME).
                 setFeatureLevel(metadataVersion.featureLevel()), (short) 0));
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersion.featureLevel(), 
source);
     }
 
     public static BootstrapMetadata fromRecords(List<ApiMessageAndVersion> 
records, String source) {
-        MetadataVersion metadataVersion = null;
+        Optional<Short> metadataVersionLevel = Optional.empty();
         for (ApiMessageAndVersion record : records) {
-            Optional<MetadataVersion> version = 
recordToMetadataVersion(record.message());
-            if (version.isPresent()) {
-                metadataVersion = version.get();
+            Optional<Short> level = 
recordToMetadataVersionLevel(record.message());
+            if (level.isPresent()) {
+                metadataVersionLevel = level;
             }
         }
-        if (metadataVersion == null) {
+        if (metadataVersionLevel.isEmpty()) {
             throw new RuntimeException("No FeatureLevelRecord for " + 
MetadataVersion.FEATURE_NAME +
                     " was found in the bootstrap metadata from " + source);
         }
-        return new BootstrapMetadata(records, metadataVersion, source);
+        return new BootstrapMetadata(records, metadataVersionLevel.get(), 
source);
     }
 
-    public static Optional<MetadataVersion> recordToMetadataVersion(ApiMessage 
record) {
-        if (record instanceof FeatureLevelRecord) {
-            FeatureLevelRecord featureLevel = (FeatureLevelRecord) record;
+    public static Optional<Short> recordToMetadataVersionLevel(ApiMessage 
record) {
+        if (record instanceof FeatureLevelRecord featureLevel) {
             if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
-                return 
Optional.of(MetadataVersion.fromFeatureLevel(featureLevel.featureLevel()));
+                return Optional.of(featureLevel.featureLevel());
             }
         }
         return Optional.empty();
@@ -105,11 +104,11 @@ public class BootstrapMetadata {
 
     BootstrapMetadata(
         List<ApiMessageAndVersion> records,
-        MetadataVersion metadataVersion,
+        short metadataVersionLevel,
         String source
     ) {
         this.records = Objects.requireNonNull(records);
-        this.metadataVersion = metadataVersion;
+        this.metadataVersionLevel = metadataVersionLevel;
         Objects.requireNonNull(source);
         this.source = source;
     }
@@ -119,7 +118,7 @@ public class BootstrapMetadata {
     }
 
     public MetadataVersion metadataVersion() {
-        return metadataVersion;
+        return MetadataVersion.fromFeatureLevel(metadataVersionLevel);
     }
 
     public String source() {
@@ -167,7 +166,7 @@ public class BootstrapMetadata {
 
     @Override
     public int hashCode() {
-        return Objects.hash(records, metadataVersion, source);
+        return Objects.hash(records, metadataVersionLevel, source);
     }
 
     @Override
@@ -175,14 +174,14 @@ public class BootstrapMetadata {
         if (o == null || !o.getClass().equals(this.getClass())) return false;
         BootstrapMetadata other = (BootstrapMetadata) o;
         return Objects.equals(records, other.records) &&
-            metadataVersion.equals(other.metadataVersion) &&
+            metadataVersionLevel == other.metadataVersionLevel &&
             source.equals(other.source);
     }
 
     @Override
     public String toString() {
         return "BootstrapMetadata(records=" + records.toString() +
-            ", metadataVersion=" + metadataVersion +
+            ", metadataVersionLevel=" + metadataVersionLevel +
             ", source=" + source +
             ")";
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 549032989bd..20688b502cf 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -56,13 +56,13 @@ public class BootstrapMetadataTest {
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(FEATURE_NAME).
                 setFeatureLevel((short) 7), (short) 0)),
-                    IBP_3_3_IV3, "foo"),
+                    IBP_3_3_IV3.featureLevel(), "foo"),
             BootstrapMetadata.fromVersion(IBP_3_3_IV3, "foo"));
     }
 
     @Test
     public void testFromRecordsList() {
-        assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV3, 
"bar"),
+        assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, 
IBP_3_3_IV3.featureLevel(), "bar"),
             BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
     }
 
@@ -133,10 +133,10 @@ public class BootstrapMetadataTest {
 
     @Test
     public void testFromRecordsListWithOldMetadataVersion() {
-        RuntimeException exception = assertThrows(RuntimeException.class,
-            () -> 
BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
+        BootstrapMetadata bootstrapMetadata = 
BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux");
         assertEquals("No MetadataVersion with feature level 1. Valid feature 
levels are from " + MetadataVersion.MINIMUM_VERSION.featureLevel()
-                + " to " + MetadataVersion.latestTesting().featureLevel() + 
".",
-            exception.getMessage());
+            + " to " + MetadataVersion.latestTesting().featureLevel() + ".",
+                assertThrows(RuntimeException.class,
+                    () -> bootstrapMetadata.metadataVersion()).getMessage());
     }
 }
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 097f8c3e26d..20963665440 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -114,6 +114,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         private final String controllerListenerName;
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
+        private boolean deleteOnClose;
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
@@ -121,6 +122,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             this.controllerListenerName = 
nodes.controllerListenerName().value();
             this.brokerSecurityProtocol = nodes.brokerListenerProtocol().name;
             this.controllerSecurityProtocol = 
nodes.controllerListenerProtocol().name;
+            this.deleteOnClose = true;
         }
 
         public Builder setConfigProp(String key, Object value) {
@@ -206,6 +208,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             }
         }
 
+        public Builder setDeleteOnClose(boolean deleteOnClose) {
+            this.deleteOnClose = deleteOnClose;
+            return this;
+        }
+
         public KafkaClusterTestKit build() throws Exception {
             Map<Integer, ControllerServer> controllers = new HashMap<>();
             Map<Integer, BrokerServer> brokers = new HashMap<>();
@@ -308,7 +315,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     baseDirectory,
                     faultHandlerFactory,
                     socketFactoryManager,
-                    jaasFile == null ? Optional.empty() : 
Optional.of(jaasFile));
+                    Optional.ofNullable(jaasFile),
+                    deleteOnClose);
         }
 
         private String listeners(int node) {
@@ -353,6 +361,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
     private final Optional<File> jaasFile;
+    private final boolean deleteOnClose;
 
     private KafkaClusterTestKit(
         TestKitNodes nodes,
@@ -361,7 +370,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         File baseDirectory,
         SimpleFaultHandlerFactory faultHandlerFactory,
         PreboundSocketFactoryManager socketFactoryManager,
-        Optional<File> jaasFile
+        Optional<File> jaasFile,
+        boolean deleteOnClose
     ) {
         /*
           Number of threads = Total number of brokers + Total number of 
controllers + Total number of Raft Managers
@@ -378,6 +388,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
         this.jaasFile = jaasFile;
+        this.deleteOnClose = deleteOnClose;
     }
 
     public void format() throws Exception {
@@ -640,9 +651,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             }
             waitForAllFutures(futureEntries);
             futureEntries.clear();
-            Utils.delete(baseDirectory);
-            if (jaasFile.isPresent()) {
-                Utils.delete(jaasFile.get());
+            if (deleteOnClose) {
+                Utils.delete(baseDirectory);
+                if (jaasFile.isPresent()) {
+                    Utils.delete(jaasFile.get());
+                }
             }
         } catch (Exception e) {
             for (Entry<String, Future<?>> entry : futureEntries) {

Reply via email to