This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4cec158b6f MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java 
(#22116)
a4cec158b6f is described below

commit a4cec158b6ff69f3304625815ea59b7746dc7f63
Author: JiayĆ”o Sun <[email protected]>
AuthorDate: Fri May 15 00:01:50 2026 +1200

    MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java (#22116)
    
    Refactor RaftClusterSnapshotTest to use @ClusterTest and Java
    RaftManager
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-server.xml               |   2 +
 .../kafka/server/RaftClusterSnapshotTest.scala     | 105 -----------------
 .../kafka/server/RaftClusterSnapshotTest.java      | 126 +++++++++++++++++++++
 3 files changed, 128 insertions(+), 105 deletions(-)

diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index 1f020ebe717..e1d33395b9b 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -84,6 +84,7 @@
 
   <subpackage name="server">
     <allow class="kafka.server.KafkaBroker" />
+    <allow class="kafka.server.BrokerServer" />
     <allow pkg="javax.crypto" />
     <allow class="org.apache.kafka.controller.ReplicationControlManager" />
     <allow pkg="org.apache.kafka.server" />
@@ -91,6 +92,7 @@
     <allow pkg="org.apache.kafka.network" />
     <allow pkg="org.apache.kafka.storage.internals.log" />
     <allow pkg="org.apache.kafka.storage.internals.checkpoint" />
+    <allow pkg="org.apache.kafka.snapshot" />
     <allow pkg="org.apache.logging.log4j" />
     <subpackage name="metrics">
       <allow 
class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
diff --git 
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala 
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
deleted file mode 100644
index 155ecf44207..00000000000
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.common.utils.internals.LogContext
-import org.apache.kafka.common.utils.internals.BufferSupplier
-import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.raft.MetadataLogConfig
-import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertNotEquals
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Timeout
-
-import scala.jdk.CollectionConverters._
-import scala.util.Using
-
-@Timeout(120)
-class RaftClusterSnapshotTest {
-
-  @Test
-  def testSnapshotsGenerated(): Unit = {
-    val numberOfBrokers = 3
-    val numberOfControllers = 3
-
-    Using.resource(
-      new KafkaClusterTestKit
-        .Builder(
-          new TestKitNodes.Builder()
-            .setNumBrokerNodes(numberOfBrokers)
-            .setNumControllerNodes(numberOfControllers)
-            .build()
-        )
-        
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, 
"10")
-        .setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, 
"0")
-        .build()
-    ) { cluster =>
-      cluster.format()
-      cluster.startup()
-
-      // Check that every controller and broker has a snapshot
-      TestUtils.waitUntilTrue(
-        () => {
-          cluster.raftManagers().asScala.forall { case (_, raftManager) =>
-            raftManager.raftLog.latestSnapshotId.isPresent
-          }
-        },
-        s"Expected for every controller and broker to generate a snapshot: ${
-          cluster.raftManagers().asScala.map { case (id, raftManager) =>
-            (id, raftManager.raftLog.latestSnapshotId)
-          }
-        }"
-      )
-
-      assertEquals(numberOfControllers + numberOfBrokers, 
cluster.raftManagers.size())
-
-      // For every controller and broker perform some sanity checks against 
the latest snapshot
-      for ((_, raftManager) <- cluster.raftManagers().asScala) {
-        Using.resource(
-          RecordsSnapshotReader.of(
-            raftManager.raftLog.latestSnapshot.get(),
-            new MetadataRecordSerde(),
-            BufferSupplier.create(),
-            1,
-            true,
-            new LogContext()
-          )
-        ) { snapshot =>
-          // Check that the snapshot is non-empty
-          assertTrue(snapshot.hasNext)
-
-          // Check that we can read the entire snapshot
-          while (snapshot.hasNext) {
-            val batch = snapshot.next
-            assertTrue(batch.sizeInBytes > 0)
-            // A batch must have at least one control records or at least one 
data records, but not both
-            assertNotEquals(
-              batch.records.isEmpty,
-              batch.controlRecords.isEmpty,
-              s"data records = ${batch.records}; control records = 
${batch.controlRecords}"
-            )
-          }
-        }
-      }
-    }
-  }
-}
diff --git 
a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java 
b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
new file mode 100644
index 00000000000..616405103e2
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import kafka.server.BrokerServer;
+
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.utils.internals.BufferSupplier;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.raft.RaftManager;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.raft.MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.raft.MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(120)
+@ClusterTestDefaults(
+        types = Type.KRAFT,
+        brokers = RaftClusterSnapshotTest.BROKER_COUNT,
+        controllers = RaftClusterSnapshotTest.CONTROLLER_COUNT,
+        serverProperties = {
+            @ClusterConfigProperty(key = 
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, value = "10"),
+            @ClusterConfigProperty(key = METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, 
value = "0"),
+        })
+public class RaftClusterSnapshotTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final int CONTROLLER_COUNT = 3;
+
+    private final ClusterInstance clusterInstance;
+
+    public RaftClusterSnapshotTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    private Map<Integer, RaftManager<ApiMessageAndVersion>> raftManagers() {
+        Map<Integer, RaftManager<ApiMessageAndVersion>> results = new 
HashMap<>();
+        clusterInstance.brokers().values().forEach(broker -> {
+            if (broker instanceof BrokerServer brokerServer) {
+                results.put(brokerServer.config().brokerId(), 
brokerServer.sharedServer().raftManager());
+            }
+        });
+
+        clusterInstance.controllers().values().forEach(controller ->
+                results.putIfAbsent(controller.config().nodeId(), 
controller.sharedServer().raftManager())
+        );
+        return results;
+    }
+
+    @ClusterTest
+    public void testSnapshotsGenerated() throws Exception {
+
+        var raftManagers = raftManagers();
+        // Check that every controller and broker has a snapshot
+        TestUtils.waitForCondition(
+                () -> raftManagers.values().stream()
+                        .allMatch(raftManager -> 
raftManager.raftLog().latestSnapshotId().isPresent()),
+                () -> "Expected for every controller and broker to generate a 
snapshot: " +
+                        raftManagers.entrySet().stream()
+                                .collect(Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        e -> 
e.getValue().raftLog().latestSnapshotId()
+                                ))
+        );
+
+        assertEquals(BROKER_COUNT + CONTROLLER_COUNT, raftManagers.size());
+
+        // For every controller and broker perform some sanity checks against 
the latest snapshot
+        for (var raftManager : raftManagers.values()) {
+            try (var snapshot = RecordsSnapshotReader.of(
+                    raftManager.raftLog().latestSnapshot().get(),
+                    new MetadataRecordSerde(),
+                    BufferSupplier.create(),
+                    1,
+                    true,
+                    new LogContext()
+            )) {
+                // Check that the snapshot is non-empty
+                assertTrue(snapshot.hasNext());
+
+                // Check that we can read the entire snapshot
+                while (snapshot.hasNext()) {
+                    var batch = snapshot.next();
+                    assertTrue(batch.sizeInBytes() > 0);
+                    // A batch must have at least one control records or at 
least one data records, but not both
+                    assertNotEquals(
+                            batch.records().isEmpty(),
+                            batch.controlRecords().isEmpty(),
+                            "data records = " + batch.records() + "; control 
records = " + batch.controlRecords()
+                    );
+                }
+            }
+        }
+    }
+}

Reply via email to