This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 832627fc784 MINOR: Various cleanups in metadata (#14734)
832627fc784 is described below

commit 832627fc78484fdc7c8d6da8a2d20e7691dbf882
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Nov 14 09:25:09 2023 +0100

    MINOR: Various cleanups in metadata (#14734)
    
    - Remove unused code, suppression
    - Simplify/fix test assertions
    - Javadoc cleanups
    
    Reviewers: Josep Prat <[email protected]>
---
 .../metadata/BrokerRegistrationFencingChange.java  |  2 +-
 ...okerRegistrationInControlledShutdownChange.java |  2 +-
 .../apache/kafka/metadata/DelegationTokenData.java |  4 +-
 .../apache/kafka/metadata/LeaderRecoveryState.java |  2 +-
 .../kafka/metadata/PartitionRegistration.java      |  4 +-
 .../metadata/authorizer/StandardAuthorizer.java    |  2 +-
 .../metadata/placement/StripedReplicaPlacer.java   | 82 ++++++++++------------
 .../kafka/metadata/util/BatchFileReader.java       |  5 +-
 .../apache/kafka/metadata/util/RecordRedactor.java |  2 +-
 .../controller/ClusterControlManagerTest.java      |  8 +--
 .../controller/ProducerIdControlManagerTest.java   |  2 -
 .../QuorumControllerMetricsIntegrationTest.java    |  6 +-
 .../kafka/controller/QuorumControllerTest.java     | 14 ++--
 .../kafka/controller/QuorumControllerTestEnv.java  |  2 -
 .../controller/ReplicationControlManagerTest.java  | 33 +++++----
 .../java/org/apache/kafka/image/AclsDeltaTest.java |  2 +-
 .../org/apache/kafka/image/ClusterImageTest.java   |  3 -
 .../kafka/image/DelegationTokenImageTest.java      |  2 +-
 .../org/apache/kafka/image/FakeSnapshotWriter.java |  4 +-
 .../org/apache/kafka/image/ImageDowngradeTest.java |  2 +-
 .../org/apache/kafka/image/MetadataImageTest.java  |  4 --
 .../kafka/image/MetadataVersionChangeTest.java     |  9 +--
 .../loader/metrics/MetadataLoaderMetricsTest.java  |  4 --
 .../kafka/image/publisher/SnapshotEmitterTest.java |  2 +-
 .../org/apache/kafka/metadata/BrokerStateTest.java |  3 -
 .../apache/kafka/metadata/ListenerInfoTest.java    | 16 ++---
 .../kafka/metadata/PartitionRegistrationTest.java  | 16 +++--
 .../org/apache/kafka/metadata/RecordTestUtils.java |  1 -
 .../apache/kafka/metadata/VersionRangeTest.java    |  1 -
 .../metadata/bootstrap/BootstrapDirectoryTest.java |  4 +-
 .../metadata/bootstrap/BootstrapMetadataTest.java  | 10 +--
 .../CapturingDelegationTokenMigrationClient.java   |  2 +-
 .../metadata/util/BatchFileWriterReaderTest.java   |  2 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  | 13 ++--
 .../apache/kafka/metalog/LocalLogManagerTest.java  |  2 +-
 .../kafka/metalog/MockMetaLogManagerListener.java  | 26 ++-----
 36 files changed, 128 insertions(+), 170 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
index ab0bfc6a81f..ee007dd5378 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationFencingChange.java
@@ -35,7 +35,7 @@ public enum BrokerRegistrationFencingChange {
 
     private final static Map<Byte, BrokerRegistrationFencingChange> 
VALUE_TO_ENUM =
         Arrays.stream(BrokerRegistrationFencingChange.values()).
-                collect(Collectors.toMap(v -> Byte.valueOf(v.value()), 
Function.identity()));
+                collect(Collectors.toMap(v -> v.value(), Function.identity()));
 
     public static Optional<BrokerRegistrationFencingChange> fromValue(byte 
value) {
         return Optional.ofNullable(VALUE_TO_ENUM.get(value));
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
index 39f8abf595e..3d49b495da5 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationInControlledShutdownChange.java
@@ -36,7 +36,7 @@ public enum BrokerRegistrationInControlledShutdownChange {
 
     private final static Map<Byte, 
BrokerRegistrationInControlledShutdownChange> VALUE_TO_ENUM =
         Arrays.stream(BrokerRegistrationInControlledShutdownChange.values()).
-            collect(Collectors.toMap(v -> Byte.valueOf(v.value()), 
Function.identity()));
+            collect(Collectors.toMap(v -> v.value(), Function.identity()));
 
     public static Optional<BrokerRegistrationInControlledShutdownChange> 
fromValue(byte value) {
         return Optional.ofNullable(VALUE_TO_ENUM.get(value));
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java 
b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
index 269cdd2458c..df853169e19 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java
@@ -33,7 +33,7 @@ import java.util.Objects;
  */
 public final class DelegationTokenData {
 
-    private TokenInformation tokenInformation;
+    private final TokenInformation tokenInformation;
 
     public static DelegationTokenData fromRecord(DelegationTokenRecord record) 
{
         List<KafkaPrincipal> renewers = new ArrayList<>();
@@ -62,7 +62,7 @@ public final class DelegationTokenData {
         return new DelegationTokenRecord()
             .setOwner(tokenInformation.ownerAsString())
             .setRequester(tokenInformation.tokenRequesterAsString())
-            .setRenewers(new 
ArrayList<String>(tokenInformation.renewersAsString()))
+            .setRenewers(new ArrayList<>(tokenInformation.renewersAsString()))
             .setIssueTimestamp(tokenInformation.issueTimestamp())
             .setMaxTimestamp(tokenInformation.maxTimestamp())
             .setExpirationTimestamp(tokenInformation.expiryTimestamp())
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java 
b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
index 08086751b70..432226e0d41 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/LeaderRecoveryState.java
@@ -56,7 +56,7 @@ public enum LeaderRecoveryState {
 
     private final byte value;
 
-    private LeaderRecoveryState(byte value) {
+    LeaderRecoveryState(byte value) {
         this.value = value;
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 635bff2f501..f5a8b2133bc 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -358,8 +358,8 @@ public class PartitionRegistration {
         if (options.metadataVersion().isDirectoryAssignmentSupported()) {
             record.setDirectories(Uuid.toList(directories));
         } else {
-            for (int i = 0; i < directories.length; i++) {
-                if (!DirectoryId.UNASSIGNED.equals(directories[i])) {
+            for (Uuid directory : directories) {
+                if (!DirectoryId.UNASSIGNED.equals(directory)) {
                     options.handleLoss("the directory assignment state of one 
or more replicas");
                     break;
                 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 47f066c34b4..7e9f779093e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -122,7 +122,7 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer {
         Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
         for (Endpoint endpoint : serverInfo.endpoints()) {
             if (serverInfo.earlyStartListeners().contains(
-                    endpoint.listenerName().orElseGet(() -> ""))) {
+                    endpoint.listenerName().orElse(""))) {
                 result.put(endpoint, CompletableFuture.completedFuture(null));
             } else {
                 result.put(endpoint, initialLoadFuture);
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index a3ac776cded..f13e8ce9897 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -33,89 +33,86 @@ import org.apache.kafka.metadata.OptionalStringComparator;
 
 /**
  * The striped replica placer.
- *
- *
- * GOALS
- * The design of this placer attempts to satisfy a few competing goals.  
Firstly, we want
- * to spread the replicas as evenly as we can across racks.  In the simple 
case where
- * broker racks have not been configured, this goal is a no-op, of course.  
But it is the
+ * <p>
+ * <h3>Goals</h3>
+ * <p>The design of this placer attempts to satisfy a few competing goals. 
Firstly, we want
+ * to spread the replicas as evenly as we can across racks. In the simple case 
where
+ * broker racks have not been configured, this goal is a no-op, of course. But 
it is the
  * highest priority goal in multi-rack clusters.
  *
- * Our second goal is to spread the replicas evenly across brokers.  Since we 
are placing
+ * <p>Our second goal is to spread the replicas evenly across brokers. Since 
we are placing
  * multiple partitions, we try to avoid putting each partition on the same set 
of
- * replicas, even if it does satisfy the rack placement goal.  If any specific 
broker is
+ * replicas, even if it does satisfy the rack placement goal. If any specific 
broker is
 * fenced, we would like the new leaders to be distributed evenly across the 
remaining
  * brokers.
  *
- * However, we treat the rack placement goal as higher priority than this 
goal-- if you
+ * <p>However, we treat the rack placement goal as higher priority than this 
goal-- if you
  * configure 10 brokers in rack A and B, and 1 broker in rack C, you will end 
up with a
  * lot of partitions on that one broker in rack C.  If you were to place a lot 
of
  * partitions with replication factor 3, each partition would try to get a 
replica there.
  * In general racks are supposed to be about the same size -- if they aren't, 
this is a
  * user error.
  *
- * Finally, we would prefer to place replicas on unfenced brokers, rather than 
on fenced
+ * <p>Finally, we would prefer to place replicas on unfenced brokers, rather 
than on fenced
  * brokers.
- *
- *
- * CONSTRAINTS
- * In addition to these goals, we have two constraints.  Unlike the goals, 
these are not
- * optional -- they are mandatory.  Placement will fail if a constraint cannot 
be
- * satisfied.  The first constraint is that we can't place more than one 
replica on the
- * same broker.  This imposes an upper limit on replication factor-- for 
example, a 3-node
- * cluster can't have any topics with replication factor 4.  This constraint 
comes from
+ * <p>
+ * <h3>Constraints</h3>
+ * In addition to these goals, we have two constraints. Unlike the goals, 
these are not
+ * optional -- they are mandatory. Placement will fail if a constraint cannot 
be
+ * satisfied. The first constraint is that we can't place more than one 
replica on the
+ * same broker. This imposes an upper limit on replication factor-- for 
example, a 3-node
+ * cluster can't have any topics with replication factor 4. This constraint 
comes from
  * Kafka's internal design.
  *
- * The second constraint is that the leader of each partition must be an 
unfenced broker.
- * This constraint is a bit arbitrary.  In theory, we could allow people to 
create
- * new topics even if every broker were fenced.  However, this would be 
confusing for
+ * <p>The second constraint is that the leader of each partition must be an 
unfenced broker.
+ * This constraint is a bit arbitrary. In theory, we could allow people to 
create
+ * new topics even if every broker were fenced. However, this would be 
confusing for
  * users.
- *
- *
- * ALGORITHM
- * The StripedReplicaPlacer constructor loads the broker data into rack 
objects.  Each
+ * <p>
+ * <h3>Algorithm</h3>
+ * <p>The StripedReplicaPlacer constructor loads the broker data into rack 
objects. Each
  * rack object contains a sorted list of fenced brokers, and a separate sorted 
list of
- * unfenced brokers.  The racks themselves are organized into a sorted list, 
stored inside
+ * unfenced brokers. The racks themselves are organized into a sorted list, 
stored inside
  * the top-level RackList object.
  *
- * The general idea is that we place replicas on to racks in a round-robin 
fashion.  So if
+ * <p>The general idea is that we place replicas on to racks in a round-robin 
fashion. So if
  * we had racks A, B, C, and D, and we were creating a new partition with 
replication
  * factor 3, our first replica might come from A, our second from B, and our 
third from C.
  * Of course our placement would not be very fair if we always started with 
rack A.
- * Therefore, we generate a random starting offset when the RackList is 
created.  So one
- * time we might go B, C, D.  Another time we might go C, D, A.  And so forth.
+ * Therefore, we generate a random starting offset when the RackList is 
created. So one
+ * time we might go B, C, D. Another time we might go C, D, A. And so forth.
  *
- * Note that each partition we generate advances the starting offset by one.
+ * <p>Note that each partition we generate advances the starting offset by one.
  * So in our 4-rack cluster, with 3 partitions, we might choose these racks:
- *
+ * <pre>
  * partition 1: A, B, C
  * partition 2: B, C, A
  * partition 3: C, A, B
- *
+ * </pre>
  * This is what generates the characteristic "striped" pattern of this placer.
  *
- * So far I haven't said anything about how we choose a replica from within a 
rack.  In
- * fact, this is also done in a round-robin fashion.  So if rack A had replica 
A0, A1, A2,
+ * <p>So far I haven't said anything about how we choose a replica from within 
a rack.  In
+ * fact, this is also done in a round-robin fashion. So if rack A had replica 
A0, A1, A2,
 * and A3, we might return A0 the first time, A1 the second, A2 the third, 
and so on.
  * Just like with the racks, we add a random starting offset to mix things up 
a bit.
  *
- * So let's say you had a cluster with racks A, B, and C, and each rack had 3 
replicas,
+ * <p>So let's say you had a cluster with racks A, B, and C, and each rack had 
3 replicas,
  * for 9 nodes in total.
  * If all the offsets were 0, you'd get placements like this:
- *
+ * <pre>
  * partition 1: A0, B0, C0
  * partition 2: B1, C1, A1
  * partition 3: C2, A2, B2
- *
- * One additional complication with choosing a replica within a rack is that 
we want to
- * choose the unfenced replicas first.  In a big cluster with lots of nodes 
available,
- * we'd prefer not to place a new partition on a node that is fenced.  
Therefore, we
+ * </pre>
+ * <p>One additional complication with choosing a replica within a rack is 
that we want to
+ * choose the unfenced replicas first. In a big cluster with lots of nodes 
available,
+ * we'd prefer not to place a new partition on a node that is fenced. 
Therefore, we
  * actually maintain two lists, rather than the single list I described above.
  * We only start using the fenced node list when the unfenced node list is 
totally
  * exhausted.
  *
- * Furthermore, we cannot place the first replica (the leader) of a new 
partition on a
- * fenced replica.  Therefore, we have some special logic to ensure that this 
doesn't
+ * <p>Furthermore, we cannot place the first replica (the leader) of a new 
partition on a
+ * fenced replica. Therefore, we have some special logic to ensure that this 
doesn't
  * happen.
  */
 public class StripedReplicaPlacer implements ReplicaPlacer {
@@ -272,7 +269,6 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
 
         /**
          * The names of all the racks in the cluster.
-         *
          * Racks which have at least one unfenced broker come first (in sorted 
order),
          * followed by racks which have only fenced brokers (also in sorted 
order).
          */
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index e82d37cf78d..174fb416add 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -113,8 +113,7 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
 
     private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
         List<ApiMessageAndVersion> messages = new ArrayList<>();
-        for (Iterator<Record> iter = input.iterator(); iter.hasNext(); ) {
-            Record record = iter.next();
+        for (Record record : input) {
             try {
                 short typeId = ControlRecordType.parseTypeId(record.key());
                 ControlRecordType type = ControlRecordType.fromTypeId(typeId);
@@ -179,6 +178,6 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
         } catch (Exception e) {
             log.error("Error closing fileRecords", e);
         }
-        this.batchIterator = 
Collections.<FileChannelRecordBatch>emptyList().iterator();
+        this.batchIterator = Collections.emptyIterator();
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
index 51c160cfa78..4373fd1de6d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/RecordRedactor.java
@@ -49,7 +49,7 @@ public final class RecordRedactor {
             case USER_SCRAM_CREDENTIAL_RECORD: {
                 UserScramCredentialRecord record = (UserScramCredentialRecord) 
message;
                 return "UserScramCredentialRecord("
-                        + "name=" + ((record.name() == null) ? "null" : "'" + 
record.name().toString() + "'")
+                        + "name=" + ((record.name() == null) ? "null" : "'" + 
record.name() + "'")
                         + ", mechanism=" + record.mechanism()
                         + ", salt=(redacted)"
                         + ", storedKey=(redacted)"
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 69a9d67e3d8..6a66bd3c6db 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -240,7 +240,7 @@ public class ClusterControlManagerTest {
     }
 
     @Test
-    public void testRegistrationWithIncorrectClusterId() throws Exception {
+    public void testRegistrationWithIncorrectClusterId() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
@@ -317,7 +317,7 @@ public class ClusterControlManagerTest {
     }
 
     @Test
-    public void testUnregister() throws Exception {
+    public void testUnregister() {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
             setBrokerId(1).
             setBrokerEpoch(100).
@@ -365,7 +365,7 @@ public class ClusterControlManagerTest {
 
     @ParameterizedTest
     @ValueSource(ints = {3, 10})
-    public void testPlaceReplicas(int numUsableBrokers) throws Exception {
+    public void testPlaceReplicas(int numUsableBrokers) {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
@@ -418,7 +418,7 @@ public class ClusterControlManagerTest {
 
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", 
"IBP_3_3_IV3"})
-    public void testRegistrationsToRecords(MetadataVersion metadataVersion) 
throws Exception {
+    public void testRegistrationsToRecords(MetadataVersion metadataVersion) {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         FeatureControlManager featureControl = new 
FeatureControlManager.Builder().
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 900e8d2c7af..1d3ed8bf00e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -122,8 +122,6 @@ public class ProducerIdControlManagerTest {
 
     @Test
     public void testUnknownBrokerOrEpoch() {
-        ControllerResult<ProducerIdsBlock> result;
-
         assertThrows(StaleBrokerEpochException.class, () ->
             producerIdControlManager.generateNextProducerId(99, 0));
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
index 52049b5626d..f48063cda55 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -30,8 +30,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -46,13 +44,13 @@ import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.f
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
 import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 @Timeout(value = 40)
 public class QuorumControllerMetricsIntegrationTest {
-    private final static Logger log = 
LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
 
     static class MockControllerMetrics extends QuorumControllerMetrics {
         final AtomicBoolean closed = new AtomicBoolean(false);
@@ -179,7 +177,7 @@ public class QuorumControllerMetricsIntegrationTest {
             for (QuorumController controller : controlEnv.controllers()) {
                 // Inactive controllers don't set these metrics.
                 if (!controller.isActive()) {
-                    assertEquals(false, 
controller.controllerMetrics().active());
+                    assertFalse(controller.controllerMetrics().active());
                     assertEquals(0L, 
controller.controllerMetrics().timedOutHeartbeats());
                     assertEquals(0L, 
controller.controllerMetrics().operationsTimedOut());
                 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 298cf0dece9..ef7162014fe 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1102,8 +1102,8 @@ public class QuorumControllerTest {
     }
 
     static class InitialSnapshot implements AutoCloseable {
-        File tempDir = null;
-        BatchFileWriter writer = null;
+        File tempDir;
+        BatchFileWriter writer;
 
         public InitialSnapshot(List<ApiMessageAndVersion> records) throws 
Exception {
             tempDir = TestUtils.tempDirectory();
@@ -1292,7 +1292,7 @@ public class QuorumControllerTest {
                     controllerBuilder.setZkMigrationEnabled(migrationEnabled);
                 }).
                 
setBootstrapMetadata(BootstrapMetadata.fromVersion(metadataVersion, "test")).
-                build();
+                build()
         ) {
             QuorumController active = controlEnv.activeController();
             ZkMigrationState zkMigrationState = active.appendReadEvent("read 
migration state", OptionalLong.empty(),
@@ -1317,7 +1317,7 @@ public class QuorumControllerTest {
                         controllerBuilder.setZkMigrationEnabled(true);
                     }).
                     setBootstrapMetadata(bootstrapMetadata).
-                    build();
+                    build()
             ) {
                 QuorumController active = controlEnv.activeController();
                 assertEquals(active.featureControl().zkMigrationState(), 
ZkMigrationState.MIGRATION);
@@ -1453,7 +1453,7 @@ public class QuorumControllerTest {
     @Test
     public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws 
Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build();
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new 
QuorumControllerTestEnv.Builder(logEnv).
                     setControllerBuilderInitializer(controllerBuilder -> {
@@ -1571,7 +1571,7 @@ public class QuorumControllerTest {
     @Test
     public void testFailoverDuringMigrationTransaction() throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).build();
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new 
QuorumControllerTestEnv.Builder(logEnv).
                 setControllerBuilderInitializer(controllerBuilder -> 
controllerBuilder.setZkMigrationEnabled(true)).
@@ -1616,7 +1616,7 @@ public class QuorumControllerTest {
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_4_IV0", 
"IBP_3_5_IV0", "IBP_3_6_IV0", "IBP_3_6_IV1"})
     public void testBrokerHeartbeatDuringMigration(MetadataVersion 
metadataVersion) throws Exception {
         try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build();
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).build()
         ) {
             QuorumControllerTestEnv.Builder controlEnvBuilder = new 
QuorumControllerTestEnv.Builder(logEnv).
                 setControllerBuilderInitializer(controllerBuilder ->
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4750fb61fae..cc7ed106774 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
@@ -98,7 +97,6 @@ public class QuorumControllerTestEnv implements AutoCloseable 
{
         int numControllers = logEnv.logManagers().size();
         this.controllers = new ArrayList<>(numControllers);
         try {
-            ApiVersions apiVersions = new ApiVersions();
             List<Integer> nodeIds = IntStream.range(0, 
numControllers).boxed().collect(Collectors.toList());
             for (int nodeId = 0; nodeId < numControllers; nodeId++) {
                 QuorumController.Builder builder = new 
QuorumController.Builder(nodeId, logEnv.clusterId());
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d9f848d8e4d..8c48024a764 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -921,8 +921,8 @@ public class ReplicationControlManagerTest {
             shrinkIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, shrinkIsrResponse);
         PartitionRegistration partition = 
replicationControl.getPartition(topicIdPartition.topicId(), 
topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 2}, partition.elr), 
partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), 
partition.toString());
+        assertArrayEquals(new int[]{1, 2}, partition.elr, 
partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, 
partition.toString());
 
         PartitionData expandIsrRequest = newAlterPartition(
             replicationControl, topicIdPartition, isrWithDefaultEpoch(0, 1), 
LeaderRecoveryState.RECOVERED);
@@ -932,8 +932,8 @@ public class ReplicationControlManagerTest {
             expandIsrResult, topicIdPartition, NONE);
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, expandIsrResponse);
         partition = 
replicationControl.getPartition(topicIdPartition.topicId(), 
topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{}, partition.elr), 
partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), 
partition.toString());
+        assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, 
partition.toString());
     }
 
     @Test
@@ -952,19 +952,19 @@ public class ReplicationControlManagerTest {
         ctx.fenceBrokers(Utils.mkSet(2, 3));
 
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
 
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.unfenceBrokers(0, 1, 2, 3);
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
     }
 
     @Test
@@ -1000,16 +1000,16 @@ public class ReplicationControlManagerTest {
         ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
 
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{2, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{2, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         ctx.unfenceBrokers(2);
         ctx.fenceBrokers(Utils.mkSet(0, 1));
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{0, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{2}, partition.isr), partition.toString());
+        assertArrayEquals(new int[]{0, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{2}, partition.isr, partition.toString());
         assertEquals(2, partition.leader, partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
     }
 
     @ParameterizedTest
@@ -1152,7 +1152,7 @@ public class ReplicationControlManagerTest {
         long brokerEpoch,
         Uuid topicId,
         AlterPartitionRequestData.PartitionData partitionData
-    ) throws Exception {
+    ) {
         AlterPartitionRequestData request = new AlterPartitionRequestData()
             .setBrokerId(brokerId)
             .setBrokerEpoch(brokerEpoch);
@@ -1424,7 +1424,6 @@ public class ReplicationControlManagerTest {
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
             replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
-        CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
         CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
         assertEquals(NONE.code(), createdTopic.errorCode());
         ctx.replay(createResult.records());
diff --git a/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java b/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
index 13d3aceb5c8..08f67c4ff6d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/AclsDeltaTest.java
@@ -37,7 +37,7 @@ import java.util.Optional;
 @Timeout(40)
 public class AclsDeltaTest {
 
-    private Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
+    private final Uuid aclId = Uuid.fromString("iOZpss6VQUmD6blnqzl50g");
 
     @Test
     public void testRemovesDeleteIfNotInImage() {
diff --git a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
index 1bde2c0a42c..309d5cef7e6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java
@@ -37,8 +37,6 @@ import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class ClusterImageTest {
-    private static final Logger log = LoggerFactory.getLogger(ClusterImageTest.class);
 
     public final static ClusterImage IMAGE1;
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
index 84781eb8832..7d33323b53a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java
@@ -56,7 +56,7 @@ public class DelegationTokenImageTest {
             tokenId,
             SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
             SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
-            new ArrayList<KafkaPrincipal>(),
+            new ArrayList<>(),
             0,
             1000,
             expireTimestamp);
diff --git a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
index d76b1c81f21..f3dabe8d793 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
     private final OffsetAndEpoch snapshotId;
-    private List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+    private final List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
     private boolean frozen = false;
     private boolean closed = false;
 
@@ -79,7 +79,7 @@ public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion>
     @Override
     public long freeze() {
         frozen = true;
-        return batches.size() * 100;
+        return batches.size() * 100L;
     }
 
     @Override
diff --git a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
index d781280e151..213e3c63393 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ImageDowngradeTest.java
@@ -103,7 +103,7 @@ public class ImageDowngradeTest {
      * Test downgrading to a MetadataVersion that doesn't support inControlledShutdown.
      */
     @Test
-    public void testPreControlledShutdownStateVersion() throws Throwable {
+    public void testPreControlledShutdownStateVersion() {
         writeWithExpectedLosses(MetadataVersion.IBP_3_3_IV2,
                 Arrays.asList(
                         "the inControlledShutdown state of one or more brokers"),
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index e6c1ec1cee2..0a0d97dd558 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -118,10 +118,6 @@ public class MetadataImageTest {
             .build(), Optional.empty());
     }
 
-    private static void testToImage(MetadataImage image, ImageWriterOptions options) {
-        testToImage(image, options, Optional.empty());
-    }
-
     static void testToImage(MetadataImage image, ImageWriterOptions options, Optional<List<ApiMessageAndVersion>> fromRecords) {
         testToImage(image, fromRecords.orElseGet(() -> getImageRecords(image, options)));
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
index bef49adc37d..5356f59b82e 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataVersionChangeTest.java
@@ -19,8 +19,6 @@ package org.apache.kafka.image;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
@@ -31,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class MetadataVersionChangeTest {
-    private static final Logger log = LoggerFactory.getLogger(MetadataVersionChangeTest.class);
 
     private final static MetadataVersionChange CHANGE_3_0_IV1_TO_3_3_IV0 =
         new MetadataVersionChange(IBP_3_0_IV1, IBP_3_3_IV0);
@@ -40,19 +37,19 @@ public class MetadataVersionChangeTest {
         new MetadataVersionChange(IBP_3_3_IV0, IBP_3_0_IV1);
 
     @Test
-    public void testIsUpgrade() throws Throwable {
+    public void testIsUpgrade() {
         assertTrue(CHANGE_3_0_IV1_TO_3_3_IV0.isUpgrade());
         assertFalse(CHANGE_3_3_IV0_TO_3_0_IV1.isUpgrade());
     }
 
     @Test
-    public void testIsDowngrade() throws Throwable {
+    public void testIsDowngrade() {
         assertFalse(CHANGE_3_0_IV1_TO_3_3_IV0.isDowngrade());
         assertTrue(CHANGE_3_3_IV0_TO_3_0_IV1.isDowngrade());
     }
 
     @Test
-    public void testMetadataVersionChangeExceptionToString() throws Throwable {
+    public void testMetadataVersionChangeExceptionToString() {
         assertEquals("org.apache.kafka.image.MetadataVersionChangeException: The metadata " +
             "version is changing from 3.0-IV1 to 3.3-IV0",
                 new MetadataVersionChangeException(CHANGE_3_0_IV1_TO_3_3_IV0).toString());
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
index 5e3c6c6f571..3e518f5f56c 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -45,10 +45,6 @@ public class MetadataLoaderMetricsTest {
             new AtomicReference<>(MetadataProvenance.EMPTY);
         final MetadataLoaderMetrics metrics;
 
-        FakeMetadataLoaderMetrics() {
-            this(Optional.empty());
-        }
-
         FakeMetadataLoaderMetrics(MetricsRegistry registry) {
             this(Optional.of(registry));
         }
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index 3c73eda3631..088b6c921c3 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -135,7 +135,7 @@ public class SnapshotEmitterTest {
     }
 
     @Test
-    public void testEmit() throws Exception {
+    public void testEmit() {
         MockRaftClient mockRaftClient = new MockRaftClient();
         MockTime time = new MockTime(0, 10000L, 20000L);
         SnapshotEmitter emitter = new SnapshotEmitter.Builder().
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
index d590f01a5a0..788fe923ce5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerStateTest.java
@@ -21,12 +21,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Timeout(value = 40)
 public class BrokerStateTest {
-    private static final Logger log = LoggerFactory.getLogger(BrokerStateTest.class);
 
     @Test
     public void testFromValue() {
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
index 257d1984e55..f4f2a843c25 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
@@ -116,14 +116,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToControllerRegistrationRequestFailsOnNullHost() throws Exception {
+    public void testToControllerRegistrationRequestFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toControllerRegistrationRequest());
     }
 
     @Test
-    public void testToControllerRegistrationRequestFailsOnZeroPort() throws Exception {
+    public void testToControllerRegistrationRequestFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -141,14 +141,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToControllerRegistrationRecordFailsOnNullHost() throws Exception {
+    public void testToControllerRegistrationRecordFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toControllerRegistrationRecord());
     }
 
     @Test
-    public void testToControllerRegistrationRecordFailsOnZeroPort() throws Exception {
+    public void testToControllerRegistrationRecordFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -166,14 +166,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToBrokerRegistrationRequestFailsOnNullHost() throws Exception {
+    public void testToBrokerRegistrationRequestFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toBrokerRegistrationRequest());
     }
 
     @Test
-    public void testToBrokerRegistrationRequestFailsOnZeroPort() throws Exception {
+    public void testToBrokerRegistrationRequestFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
@@ -191,14 +191,14 @@ public class ListenerInfoTest {
     }
 
     @Test
-    public void testToBrokerRegistrationRecordFailsOnNullHost() throws Exception {
+    public void testToBrokerRegistrationRecordFailsOnNullHost() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 toBrokerRegistrationRecord());
     }
 
     @Test
-    public void testToBrokerRegistrationRecordFailsOnZeroPort() throws Exception {
+    public void testToBrokerRegistrationRecordFailsOnZeroPort() {
         assertThrows(RuntimeException.class,
             () -> ListenerInfo.create(Arrays.asList(INTERNAL)).
                 withWildcardHostnamesResolved().
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index c2934c77508..39b5ab8133a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -280,8 +280,8 @@ public class PartitionRegistrationTest {
         PartitionRecord expectRecord = new PartitionRecord().
             setTopicId(topicID).
             setPartitionId(0).
-            setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
-            setIsr(Arrays.asList(new Integer[]{0, 1})).
+            setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+            setIsr(Arrays.asList(0, 1)).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
             setLeaderEpoch(0).
@@ -290,8 +290,8 @@ public class PartitionRegistrationTest {
         when(metadataVersion.partitionRecordVersion()).thenReturn(version);
         if (version > 0) {
             expectRecord.
-                setEligibleLeaderReplicas(Arrays.asList(new Integer[]{2, 3})).
-                setLastKnownELR(Arrays.asList(new Integer[]{4}));
+                setEligibleLeaderReplicas(Arrays.asList(2, 3)).
+                setLastKnownELR(Arrays.asList(4));
         } else {
             when(metadataVersion.isElrSupported()).thenReturn(false);
         }
@@ -318,6 +318,7 @@ public class PartitionRegistrationTest {
         assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
     }
 
+    @Test
     public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() {
         PartitionRegistration.Builder builder = new PartitionRegistration.Builder().
             setReplicas(new int[]{0, 1, 2, 3, 4}).
@@ -331,8 +332,8 @@ public class PartitionRegistrationTest {
         PartitionRecord expectRecord = new PartitionRecord().
             setTopicId(topicID).
             setPartitionId(0).
-            setReplicas(Arrays.asList(new Integer[]{0, 1, 2, 3, 4})).
-            setIsr(Arrays.asList(new Integer[]{0, 1})).
+            setReplicas(Arrays.asList(0, 1, 2, 3, 4)).
+            setIsr(Arrays.asList(0, 1)).
             setLeader(0).
             setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()).
             setLeaderEpoch(0).
@@ -342,8 +343,9 @@ public class PartitionRegistrationTest {
             setMetadataVersion(MetadataVersion.latest()).
             setLossHandler(exceptions::add).
             build();
-        assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options));
+        assertEquals(new ApiMessageAndVersion(expectRecord, (short) 1), partitionRegistration.toRecord(topicID, 0, options));
         assertEquals(Replicas.toList(Replicas.NONE), Replicas.toList(partitionRegistration.addingReplicas));
+        assertTrue(exceptions.isEmpty());
     }
 
     @Property
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 2dcccb6dda2..b35f8075645 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -273,7 +273,6 @@ public class RecordTestUtils {
      *
      * @param o     The input object. It will be modified in-place.
      */
-    @SuppressWarnings("unchecked")
     public static void deepSortRecords(Object o) throws Exception {
         if (o == null) {
             return;
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
index d31e8f81396..4e8cec759ee 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/VersionRangeTest.java
@@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class VersionRangeTest {
-    @SuppressWarnings("unchecked")
     private static VersionRange v(int a, int b) {
         assertTrue(a <= Short.MAX_VALUE);
         assertTrue(a >= Short.MIN_VALUE);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
index 73ad2b07d9f..e52edd61fc3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
@@ -48,7 +48,7 @@ public class BootstrapDirectoryTest {
     static class BootstrapTestDirectory implements AutoCloseable {
         File directory = null;
 
-        synchronized BootstrapTestDirectory createDirectory() throws Exception {
+        synchronized BootstrapTestDirectory createDirectory() {
             directory = TestUtils.tempDirectory("BootstrapTestDirectory");
             return this;
         }
@@ -98,7 +98,7 @@ public class BootstrapDirectoryTest {
     }
 
     @Test
-    public void testMissingDirectory() throws Exception {
+    public void testMissingDirectory() {
         assertEquals("No such directory as ./non/existent/directory",
             assertThrows(RuntimeException.class, () ->
                 new BootstrapDirectory("./non/existent/directory", Optional.empty()).read()).getMessage());
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 4dbb90f7c70..3c75f4cadcb 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -48,7 +48,7 @@ public class BootstrapMetadataTest {
             setFeatureLevel((short) 6), (short) 0)));
 
     @Test
-    public void testFromVersion() throws Exception {
+    public void testFromVersion() {
         assertEquals(new BootstrapMetadata(Collections.singletonList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(FEATURE_NAME).
@@ -58,20 +58,20 @@ public class BootstrapMetadataTest {
     }
 
     @Test
-    public void testFromRecordsList() throws Exception {
+    public void testFromRecordsList() {
         assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1, IBP_3_3_IV2, "bar"),
             BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "bar"));
     }
 
     @Test
-    public void testFromRecordsListWithoutMetadataVersion() throws Exception {
+    public void testFromRecordsListWithoutMetadataVersion() {
         assertEquals("No FeatureLevelRecord for metadata.version was found in the bootstrap " +
             "metadata from quux", assertThrows(RuntimeException.class,
                 () -> BootstrapMetadata.fromRecords(emptyList(), "quux")).getMessage());
     }
 
     @Test
-    public void testCopyWithOnlyVersion() throws Exception {
+    public void testCopyWithOnlyVersion() {
         assertEquals(new BootstrapMetadata(SAMPLE_RECORDS1.subList(2, 3), IBP_3_3_IV2, "baz"),
                 BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, "baz").copyWithOnlyVersion());
     }
@@ -82,7 +82,7 @@ public class BootstrapMetadataTest {
                 setFeatureLevel(IBP_3_0_IV1.featureLevel()), (short) 0)));
 
     @Test
-    public void testFromRecordsListWithOldMetadataVersion() throws Exception {
+    public void testFromRecordsListWithOldMetadataVersion() {
         RuntimeException exception = assertThrows(RuntimeException.class,
             () -> BootstrapMetadata.fromRecords(RECORDS_WITH_OLD_METADATA_VERSION, "quux"));
         assertEquals("Bootstrap metadata versions before 3.3-IV0 are not supported. Can't load " +
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
index 67503e77ebb..0ee6168e99a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/migration/CapturingDelegationTokenMigrationClient.java
@@ -36,7 +36,7 @@ public class CapturingDelegationTokenMigrationClient implements DelegationTokenM
 
     @Override
     public List<String> getDelegationTokens() {
-        return new ArrayList<String>();
+        return new ArrayList<>();
     }
 
     @Override
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java b/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
index a68a434bf74..d2de01b824d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/util/BatchFileWriterReaderTest.java
@@ -102,7 +102,7 @@ final public class BatchFileWriterReaderTest {
             assertEquals(0, apiMessageAndVersion.version());
 
             SnapshotFooterRecord footerRecord = (SnapshotFooterRecord) apiMessageAndVersion.message();
-            assertEquals(0, headerRecord.version());
+            assertEquals(0, footerRecord.version());
         }
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c33cdf2a1b6..f2e9d22bb59 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -176,6 +176,11 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
          */
         private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
 
+        /**
+         * Maps committed offset to snapshot reader.
+         */
+        private final NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
+
         /**
          * The current leader.
          */
@@ -192,11 +197,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
          */
         private long initialMaxReadOffset = Long.MAX_VALUE;
 
-        /**
-         * Maps committed offset to snapshot reader.
-         */
-        private NavigableMap<Long, RawSnapshotReader> snapshots = new TreeMap<>();
-
         public SharedLogData(Optional<RawSnapshotReader> snapshot) {
             if (snapshot.isPresent()) {
                 RawSnapshotReader initialSnapshot = snapshot.get();
@@ -515,7 +515,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
      * result is half the records getting appended with leader election following that.
      * This is done to emulate having some of the records not getting committed.
      */
-    private AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
+    private final AtomicBoolean resignAfterNonAtomicCommit = new AtomicBoolean(false);
 
     public LocalLogManager(LogContext logContext,
                            int nodeId,
@@ -827,7 +827,6 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             // the leader epoch has already advanced. resign is a no op.
             log.debug("Ignoring call to resign from epoch {}. Either we are not the leader or the provided epoch is " +
                     "smaller than the current epoch {}", epoch, currentEpoch);
-            return;
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index dfa093478f1..b0745ecc424 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -108,7 +108,7 @@ public class LocalLogManagerTest {
             long highestOffset = -1;
             for (String event : listener.serializedEvents()) {
                 if (event.startsWith(LAST_COMMITTED_OFFSET)) {
-                    long offset = Long.valueOf(
+                    long offset = Long.parseLong(
                         event.substring(LAST_COMMITTED_OFFSET.length() + 1));
                     if (offset < highestOffset) {
                         throw new RuntimeException("Invalid offset: " + offset +
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index 3d7267d94a5..77183f5dcb6 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -54,13 +54,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
 
                 for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                     ApiMessage message = messageAndVersion.message();
-                    StringBuilder bld = new StringBuilder();
-                    bld.append(COMMIT).append(" ").append(message.toString());
-                    serializedEvents.add(bld.toString());
+                    serializedEvents.add(COMMIT + " " + message.toString());
                 }
-                StringBuilder bld = new StringBuilder();
-                bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
-                serializedEvents.add(bld.toString());
+                serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
             }
         } finally {
             reader.close();
@@ -76,13 +72,9 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
 
                 for (ApiMessageAndVersion messageAndVersion : batch.records()) {
                     ApiMessage message = messageAndVersion.message();
-                    StringBuilder bld = new StringBuilder();
-                    bld.append(SNAPSHOT).append(" ").append(message.toString());
-                    serializedEvents.add(bld.toString());
+                    serializedEvents.add(SNAPSHOT + " " + message.toString());
                 }
-                StringBuilder bld = new StringBuilder();
-                bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
-                serializedEvents.add(bld.toString());
+                serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
             }
         } finally {
             reader.close();
@@ -95,14 +87,10 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
         this.leaderAndEpoch = newLeaderAndEpoch;
 
         if (newLeaderAndEpoch.isLeader(nodeId)) {
-            StringBuilder bld = new StringBuilder();
-            bld.append(NEW_LEADER).append(" ").
-                append(nodeId).append(" ").append(newLeaderAndEpoch.epoch());
-            serializedEvents.add(bld.toString());
+            String bld = NEW_LEADER + " " + nodeId + " " + newLeaderAndEpoch.epoch();
+            serializedEvents.add(bld);
         } else if (oldLeaderAndEpoch.isLeader(nodeId)) {
-            StringBuilder bld = new StringBuilder();
-            bld.append(RENOUNCE).append(" ").append(newLeaderAndEpoch.epoch());
-            serializedEvents.add(bld.toString());
+            serializedEvents.add(RENOUNCE + " " + newLeaderAndEpoch.epoch());
         }
     }
 

Reply via email to