This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4cec158b6f MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java
(#22116)
a4cec158b6f is described below
commit a4cec158b6ff69f3304625815ea59b7746dc7f63
Author: JiayƔo Sun <[email protected]>
AuthorDate: Fri May 15 00:01:50 2026 +1200
MINOR: Rewrite RaftClusterSnapshotTest from Scala to Java (#22116)
Refactor RaftClusterSnapshotTest to use @ClusterTest and Java
RaftManager
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-server.xml | 2 +
.../kafka/server/RaftClusterSnapshotTest.scala | 105 -----------------
.../kafka/server/RaftClusterSnapshotTest.java | 126 +++++++++++++++++++++
3 files changed, 128 insertions(+), 105 deletions(-)
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index 1f020ebe717..e1d33395b9b 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -84,6 +84,7 @@
<subpackage name="server">
<allow class="kafka.server.KafkaBroker" />
+ <allow class="kafka.server.BrokerServer" />
<allow pkg="javax.crypto" />
<allow class="org.apache.kafka.controller.ReplicationControlManager" />
<allow pkg="org.apache.kafka.server" />
@@ -91,6 +92,7 @@
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" />
<allow pkg="org.apache.kafka.storage.internals.checkpoint" />
+ <allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.logging.log4j" />
<subpackage name="metrics">
<allow
class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
diff --git
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
deleted file mode 100644
index 155ecf44207..00000000000
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.utils.TestUtils
-import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
-import org.apache.kafka.common.utils.internals.LogContext
-import org.apache.kafka.common.utils.internals.BufferSupplier
-import org.apache.kafka.metadata.MetadataRecordSerde
-import org.apache.kafka.raft.MetadataLogConfig
-import org.apache.kafka.snapshot.RecordsSnapshotReader
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertNotEquals
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Timeout
-
-import scala.jdk.CollectionConverters._
-import scala.util.Using
-
-@Timeout(120)
-class RaftClusterSnapshotTest {
-
- @Test
- def testSnapshotsGenerated(): Unit = {
- val numberOfBrokers = 3
- val numberOfControllers = 3
-
- Using.resource(
- new KafkaClusterTestKit
- .Builder(
- new TestKitNodes.Builder()
- .setNumBrokerNodes(numberOfBrokers)
- .setNumControllerNodes(numberOfControllers)
- .build()
- )
-
.setConfigProp(MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG,
"10")
- .setConfigProp(MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG,
"0")
- .build()
- ) { cluster =>
- cluster.format()
- cluster.startup()
-
- // Check that every controller and broker has a snapshot
- TestUtils.waitUntilTrue(
- () => {
- cluster.raftManagers().asScala.forall { case (_, raftManager) =>
- raftManager.raftLog.latestSnapshotId.isPresent
- }
- },
- s"Expected for every controller and broker to generate a snapshot: ${
- cluster.raftManagers().asScala.map { case (id, raftManager) =>
- (id, raftManager.raftLog.latestSnapshotId)
- }
- }"
- )
-
- assertEquals(numberOfControllers + numberOfBrokers,
cluster.raftManagers.size())
-
- // For every controller and broker perform some sanity checks against
the latest snapshot
- for ((_, raftManager) <- cluster.raftManagers().asScala) {
- Using.resource(
- RecordsSnapshotReader.of(
- raftManager.raftLog.latestSnapshot.get(),
- new MetadataRecordSerde(),
- BufferSupplier.create(),
- 1,
- true,
- new LogContext()
- )
- ) { snapshot =>
- // Check that the snapshot is non-empty
- assertTrue(snapshot.hasNext)
-
- // Check that we can read the entire snapshot
- while (snapshot.hasNext) {
- val batch = snapshot.next
- assertTrue(batch.sizeInBytes > 0)
- // A batch must have at least one control records or at least one
data records, but not both
- assertNotEquals(
- batch.records.isEmpty,
- batch.controlRecords.isEmpty,
- s"data records = ${batch.records}; control records =
${batch.controlRecords}"
- )
- }
- }
- }
- }
- }
-}
diff --git
a/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
new file mode 100644
index 00000000000..616405103e2
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/RaftClusterSnapshotTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+import kafka.server.BrokerServer;
+
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.utils.internals.BufferSupplier;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.metadata.MetadataRecordSerde;
+import org.apache.kafka.raft.RaftManager;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.RecordsSnapshotReader;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.raft.MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.raft.MetadataLogConfig.METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(120)
+@ClusterTestDefaults(
+ types = Type.KRAFT,
+ brokers = RaftClusterSnapshotTest.BROKER_COUNT,
+ controllers = RaftClusterSnapshotTest.CONTROLLER_COUNT,
+ serverProperties = {
+ @ClusterConfigProperty(key =
METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, value = "10"),
+ @ClusterConfigProperty(key = METADATA_MAX_IDLE_INTERVAL_MS_CONFIG,
value = "0"),
+ })
+public class RaftClusterSnapshotTest {
+
+ public static final int BROKER_COUNT = 3;
+ public static final int CONTROLLER_COUNT = 3;
+
+ private final ClusterInstance clusterInstance;
+
+ public RaftClusterSnapshotTest(ClusterInstance clusterInstance) {
+ this.clusterInstance = clusterInstance;
+ }
+
+ private Map<Integer, RaftManager<ApiMessageAndVersion>> raftManagers() {
+ Map<Integer, RaftManager<ApiMessageAndVersion>> results = new
HashMap<>();
+ clusterInstance.brokers().values().forEach(broker -> {
+ if (broker instanceof BrokerServer brokerServer) {
+ results.put(brokerServer.config().brokerId(),
brokerServer.sharedServer().raftManager());
+ }
+ });
+
+ clusterInstance.controllers().values().forEach(controller ->
+ results.putIfAbsent(controller.config().nodeId(),
controller.sharedServer().raftManager())
+ );
+ return results;
+ }
+
+ @ClusterTest
+ public void testSnapshotsGenerated() throws Exception {
+
+ var raftManagers = raftManagers();
+ // Check that every controller and broker has a snapshot
+ TestUtils.waitForCondition(
+ () -> raftManagers.values().stream()
+ .allMatch(raftManager ->
raftManager.raftLog().latestSnapshotId().isPresent()),
+ () -> "Expected for every controller and broker to generate a
snapshot: " +
+ raftManagers.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e ->
e.getValue().raftLog().latestSnapshotId()
+ ))
+ );
+
+ assertEquals(BROKER_COUNT + CONTROLLER_COUNT, raftManagers.size());
+
+ // For every controller and broker perform some sanity checks against
the latest snapshot
+ for (var raftManager : raftManagers.values()) {
+ try (var snapshot = RecordsSnapshotReader.of(
+ raftManager.raftLog().latestSnapshot().get(),
+ new MetadataRecordSerde(),
+ BufferSupplier.create(),
+ 1,
+ true,
+ new LogContext()
+ )) {
+ // Check that the snapshot is non-empty
+ assertTrue(snapshot.hasNext());
+
+ // Check that we can read the entire snapshot
+ while (snapshot.hasNext()) {
+ var batch = snapshot.next();
+ assertTrue(batch.sizeInBytes() > 0);
+ // A batch must have at least one control records or at
least one data records, but not both
+ assertNotEquals(
+ batch.records().isEmpty(),
+ batch.controlRecords().isEmpty(),
+ "data records = " + batch.records() + "; control
records = " + batch.controlRecords()
+ );
+ }
+ }
+ }
+ }
+}