This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df4ef5a6210 MINOR: Various cleanups in metadata (#15806)
df4ef5a6210 is described below

commit df4ef5a6210c01661a068123d52af7bf8ba6769d
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Thu Apr 25 23:50:40 2024 +0200

    MINOR: Various cleanups in metadata (#15806)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../apache/kafka/controller/AclControlManager.java |   3 +-
 .../controller/ClientQuotaControlManager.java      |   2 +-
 .../apache/kafka/controller/ControllerResult.java  |   2 +-
 .../controller/ControllerResultAndOffset.java      |   2 +-
 .../controller/DelegationTokenControlManager.java  |   4 +-
 .../controller/PartitionReassignmentReplicas.java  |   3 +-
 .../apache/kafka/controller/QuorumController.java  |  61 ++++----
 .../apache/kafka/image/DelegationTokenImage.java   |   7 +-
 .../java/org/apache/kafka/image/FeaturesImage.java |   4 +-
 .../java/org/apache/kafka/image/ScramImage.java    |  11 +-
 .../kafka/image/loader/LoaderManifestType.java     |   2 +-
 .../kafka/image/loader/MetadataBatchLoader.java    |   2 +-
 .../apache/kafka/image/loader/MetadataLoader.java  |   2 +-
 .../kafka/image/node/ScramCredentialDataNode.java  |   4 +-
 .../apache/kafka/image/node/TopicImageNode.java    |   2 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  14 +-
 .../kafka/metadata/ControllerRegistration.java     |  10 +-
 .../java/org/apache/kafka/metadata/Replicas.java   |  27 ++--
 .../metadata/placement/StripedReplicaPlacer.java   |   2 +-
 .../properties/MetaPropertiesEnsemble.java         |   9 +-
 .../kafka/metadata/util/SnapshotFileReader.java    |   3 +-
 .../apache/kafka/controller/BrokerToElrsTest.java  |   7 +-
 .../apache/kafka/controller/BrokersToIsrsTest.java |   7 +-
 .../controller/PartitionChangeBuilderTest.java     |  44 +++---
 .../controller/ProducerIdControlManagerTest.java   |   9 +-
 .../controller/ReplicationControlManagerTest.java  | 153 ++++++++++-----------
 .../kafka/metadata/PartitionRegistrationTest.java  |   4 +-
 .../kafka/metadata/authorizer/StandardAclTest.java |   4 +-
 .../authorizer/StandardAuthorizerTest.java         |   8 +-
 .../properties/MetaPropertiesEnsembleTest.java     |  14 +-
 .../metadata/properties/PropertiesUtilsTest.java   |   6 +-
 31 files changed, 200 insertions(+), 232 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index 7cd6a948ab8..3e1804cf43c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -47,7 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.controller.QuorumController.MAX_RECORDS_PER_USER_OP;
 
@@ -174,7 +173,7 @@ public class AclControlManager {
                 results.add(new AclDeleteResult(ApiError.fromThrowable(e).exception()));
             }
         }
-        return ControllerResult.atomicOf(records.stream().collect(Collectors.toList()), results);
+        return ControllerResult.atomicOf(new ArrayList<>(records), results);
     }
 
     AclDeleteResult deleteAclsForFilter(AclBindingFilter filter,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index bb518a06805..db6bf9c327c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -139,7 +139,7 @@ public class ClientQuotaControlManager {
         }
         if (record.remove()) {
             quotas.remove(record.key());
-            if (quotas.size() == 0) {
+            if (quotas.isEmpty()) {
                 clientQuotaData.remove(entity);
             }
             log.info("Replayed ClientQuotaRecord for {} removing {}.", entity, record.key());
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
index d130de59282..90954a500f1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java
@@ -69,7 +69,7 @@ class ControllerResult<T> {
     public String toString() {
         return String.format(
             "ControllerResult(records=%s, response=%s, isAtomic=%s)",
-            String.join(",", 
records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            
records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(",")),
             response,
             isAtomic
         );
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
index 8759dff5fa5..c5b6120dae7 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerResultAndOffset.java
@@ -56,7 +56,7 @@ final class ControllerResultAndOffset<T> extends 
ControllerResult<T> {
     public String toString() {
         return String.format(
             "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, 
offset=%d)",
-            String.join(",", 
records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            
records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.joining(",")),
             response(),
             isAtomic(),
             offset
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
index cee9ee0702c..a5fda589a39 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java
@@ -193,7 +193,7 @@ public class DelegationTokenControlManager {
 
         String tokenId = Uuid.randomUuid().toString();
 
-        List<KafkaPrincipal> renewers = new ArrayList<KafkaPrincipal>();
+        List<KafkaPrincipal> renewers = new ArrayList<>();
         for (CreatableRenewers renewer : requestData.renewers()) {
             if (renewer.principalType().equals(KafkaPrincipal.USER_TYPE)) {
                 renewers.add(new KafkaPrincipal(renewer.principalType(), 
renewer.principalName()));
@@ -335,7 +335,7 @@ public class DelegationTokenControlManager {
     // Periodic call to remove expired DelegationTokens
     public List<ApiMessageAndVersion> sweepExpiredDelegationTokens() {
         long now = time.milliseconds();
-        List<ApiMessageAndVersion> records = new 
ArrayList<ApiMessageAndVersion>();
+        List<ApiMessageAndVersion> records = new ArrayList<>();
 
         for (TokenInformation oldTokenInformation: tokenCache.tokens()) {
             if ((oldTokenInformation.maxTimestamp() < now) ||
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
index 56c3188f741..ee1a1d0631c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentReplicas.java
@@ -90,8 +90,7 @@ class PartitionReassignmentReplicas {
         List<Integer> removingReplicas,
         List<Integer> addingReplicas
     ) {
-        return removingReplicas.size() > 0
-            || addingReplicas.size() > 0;
+        return !removingReplicas.isEmpty() || !addingReplicas.isEmpty();
     }
 
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 557547be13c..789c369c9b2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -379,7 +379,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
             if (raftClient == null) {
                 throw new IllegalStateException("You must set a raft client.");
@@ -615,7 +614,7 @@ public final class QuorumController implements Controller {
 
         ControllerReadEvent(String name, Supplier<T> handler) {
             this.name = name;
-            this.future = new CompletableFuture<T>();
+            this.future = new CompletableFuture<>();
             this.handler = handler;
         }
 
@@ -678,7 +677,7 @@ public final class QuorumController implements Controller {
         OptionalLong deadlineNs,
         Supplier<T> handler
     ) {
-        ControllerReadEvent<T> event = new ControllerReadEvent<T>(name, 
handler);
+        ControllerReadEvent<T> event = new ControllerReadEvent<>(name, 
handler);
         if (deadlineNs.isPresent()) {
             queue.appendWithDeadline(deadlineNs.getAsLong(), event);
         } else {
@@ -758,7 +757,7 @@ public final class QuorumController implements Controller {
             EnumSet<ControllerOperationFlag> flags
         ) {
             this.name = name;
-            this.future = new CompletableFuture<T>();
+            this.future = new CompletableFuture<>();
             this.op = op;
             this.flags = flags;
             this.resultAndOffset = null;
@@ -815,36 +814,34 @@ public final class QuorumController implements Controller 
{
                 // Pass the records to the Raft layer. This will start the 
process of committing
                 // them to the log.
                 long offset = appendRecords(log, result, maxRecordsPerBatch,
-                    new Function<List<ApiMessageAndVersion>, Long>() {
-                        @Override
-                        public Long apply(List<ApiMessageAndVersion> records) {
-                            // Start by trying to apply the record to our 
in-memory state. This should always
-                            // succeed; if it does not, that's a fatal error. 
It is important to do this before
-                            // scheduling the record for Raft replication.
-                            int recordIndex = 0;
-                            long nextWriteOffset = 
offsetControl.nextWriteOffset();
-                            for (ApiMessageAndVersion message : records) {
-                                long recordOffset = nextWriteOffset + 
recordIndex;
-                                try {
-                                    replay(message.message(), 
Optional.empty(), recordOffset);
-                                } catch (Throwable e) {
-                                    String failureMessage = 
String.format("Unable to apply %s " +
-                                        "record at offset %d on active 
controller, from the " +
-                                        "batch with baseOffset %d",
-                                        
message.message().getClass().getSimpleName(),
-                                        recordOffset, nextWriteOffset);
-                                    throw 
fatalFaultHandler.handleFault(failureMessage, e);
-                                }
-                                recordIndex++;
+                    records -> {
+                        // Start by trying to apply the record to our 
in-memory state. This should always
+                        // succeed; if it does not, that's a fatal error. It 
is important to do this before
+                        // scheduling the record for Raft replication.
+                        int recordIndex = 0;
+                        long nextWriteOffset = offsetControl.nextWriteOffset();
+                        for (ApiMessageAndVersion message : records) {
+                            long recordOffset = nextWriteOffset + recordIndex;
+                            try {
+                                replay(message.message(), Optional.empty(), 
recordOffset);
+                            } catch (Throwable e) {
+                                String failureMessage = String.format("Unable 
to apply %s " +
+                                    "record at offset %d on active controller, 
from the " +
+                                    "batch with baseOffset %d",
+                                    
message.message().getClass().getSimpleName(),
+                                    recordOffset, nextWriteOffset);
+                                throw 
fatalFaultHandler.handleFault(failureMessage, e);
                             }
-                            long nextEndOffset = nextWriteOffset - 1 + 
recordIndex;
-                            raftClient.scheduleAtomicAppend(controllerEpoch,
-                                OptionalLong.of(nextWriteOffset),
-                                records);
-                            
offsetControl.handleScheduleAtomicAppend(nextEndOffset);
-                            return nextEndOffset;
+                            recordIndex++;
                         }
-                    });
+                        long nextEndOffset = nextWriteOffset - 1 + recordIndex;
+                        raftClient.scheduleAtomicAppend(controllerEpoch,
+                            OptionalLong.of(nextWriteOffset),
+                            records);
+                        
offsetControl.handleScheduleAtomicAppend(nextEndOffset);
+                        return nextEndOffset;
+                    }
+                );
                 op.processBatchEndOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java 
b/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
index 840f9bf05ab..7e139a5be8b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/DelegationTokenImage.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 
 /**
@@ -54,10 +53,8 @@ public final class DelegationTokenImage {
         } else {
             if (!tokens.isEmpty()) {
                 List<String> tokenIds = new ArrayList<>(tokens.keySet());
-                StringBuffer delegationTokenImageString = new 
StringBuffer("DelegationTokenImage(");
-                
delegationTokenImageString.append(tokenIds.stream().collect(Collectors.joining(",
 ")));
-                delegationTokenImageString.append(")");
-                options.handleLoss(delegationTokenImageString.toString());
+                String delegationTokenImageString = "DelegationTokenImage(" + 
String.join(", ", tokenIds) + ")";
+                options.handleLoss(delegationTokenImageString);
             } 
         }
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java 
b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 4f22588ed02..cbdb5c6489a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 
 /**
@@ -106,8 +105,7 @@ public final class FeaturesImage {
         if (!finalizedVersions.isEmpty()) {
             List<String> features = new 
ArrayList<>(finalizedVersions.keySet());
             features.sort(String::compareTo);
-            options.handleLoss("feature flag(s): " +
-                    features.stream().collect(Collectors.joining(", ")));
+            options.handleLoss("feature flag(s): " + String.join(", ", 
features));
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java 
b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index 19807a63f9e..0066390d868 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -35,7 +35,6 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 
 /**
@@ -61,12 +60,12 @@ public final class ScramImage {
             }
         } else {
             boolean isEmpty = true;
-            StringBuffer scramImageString = new StringBuffer("ScramImage({");
+            StringBuilder scramImageString = new StringBuilder("ScramImage({");
             for (Entry<ScramMechanism, Map<String, ScramCredentialData>> 
mechanismEntry : mechanisms.entrySet()) {
                 if (!mechanismEntry.getValue().isEmpty()) {
                     scramImageString.append(mechanismEntry.getKey() + ":");
                     List<String> users = new 
ArrayList<>(mechanismEntry.getValue().keySet());
-                    
scramImageString.append(users.stream().collect(Collectors.joining(", ")));
+                    scramImageString.append(String.join(", ", users));
                     scramImageString.append("},{");
                     isEmpty = false;
                 }
@@ -84,9 +83,9 @@ public final class ScramImage {
 
     public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
         List<UserName> users = request.users();
-        Map<String, Boolean> uniqueUsers = new HashMap<String, Boolean>();
+        Map<String, Boolean> uniqueUsers = new HashMap<>();
 
-        if ((users == null) || (users.size() == 0)) {
+        if ((users == null) || (users.isEmpty())) {
             // If there are no users listed then get all the users
             for (Map<String, ScramCredentialData> scramCredentialDataSet : 
mechanisms.values()) {
                 for (String user : scramCredentialDataSet.keySet()) {
@@ -111,7 +110,7 @@ public final class ScramImage {
 
             if (!user.getValue()) {
                 boolean datafound = false;
-                List<CredentialInfo> credentialInfos = new 
ArrayList<CredentialInfo>();
+                List<CredentialInfo> credentialInfos = new ArrayList<>();
                 for (Map.Entry<ScramMechanism, Map<String, 
ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
                     Map<String, ScramCredentialData> credentialDataSet = 
mechanismsEntry.getValue();
                     if (credentialDataSet.containsKey(user.getKey())) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
index 4f0eabf99c7..4e2313875b1 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/LoaderManifestType.java
@@ -23,5 +23,5 @@ package org.apache.kafka.image.loader;
  */
 public enum LoaderManifestType {
     LOG_DELTA,
-    SNAPSHOT;
+    SNAPSHOT
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
index cdf95723d42..2ca8cac4c46 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataBatchLoader.java
@@ -45,7 +45,7 @@ public class MetadataBatchLoader {
         STARTED_TRANSACTION,
         CONTINUED_TRANSACTION,
         ENDED_TRANSACTION,
-        ABORTED_TRANSACTION;
+        ABORTED_TRANSACTION
     }
 
     @FunctionalInterface
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index c6194f4e6dd..3c53ece0609 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -451,7 +451,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                         publisher.name(), e);
                 }
             }
-            
metrics.setCurrentControllerId(leaderAndEpoch.leaderId().orElseGet(() -> -1));
+            
metrics.setCurrentControllerId(leaderAndEpoch.leaderId().orElse(-1));
         });
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
 
b/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
index 2c3ca9faaf1..9fbb7ef5b05 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
@@ -34,8 +34,8 @@ public class ScramCredentialDataNode implements MetadataNode {
     }
 
     private static void arrayToHex(byte[] array, StringBuilder bld) {
-        for (int i = 0; i < array.length; i++) {
-            bld.append(String.format("%02x", array[i] & 0xff));
+        for (byte b : array) {
+            bld.append(String.format("%02x", b & 0xff));
         }
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java 
b/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java
index 97f184d2b76..ed2711dff21 100644
--- a/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java
@@ -52,7 +52,7 @@ public final class TopicImageNode implements MetadataNode {
         } else if (name.equals("id")) {
             return new MetadataLeafNode(image.id().toString());
         } else {
-            Integer partitionId;
+            int partitionId;
             try {
                 partitionId = Integer.parseInt(name);
             } catch (NumberFormatException e) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index a003ed670bd..bb915202292 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -43,15 +43,15 @@ import java.util.stream.Collectors;
  */
 public class BrokerRegistration {
     public static class Builder {
-        private int id = 0;
-        private long epoch = -1;
-        private Uuid incarnationId = null;
+        private int id;
+        private long epoch;
+        private Uuid incarnationId;
         private Map<String, Endpoint> listeners;
         private Map<String, VersionRange> supportedFeatures;
-        private Optional<String> rack = Optional.empty();
-        private boolean fenced = false;
-        private boolean inControlledShutdown = false;
-        private boolean isMigratingZkBroker = false;
+        private Optional<String> rack;
+        private boolean fenced;
+        private boolean inControlledShutdown;
+        private boolean isMigratingZkBroker;
         private List<Uuid> directories;
 
         public Builder() {
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
index 37246c3f252..c26880bfd15 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/ControllerRegistration.java
@@ -41,11 +41,11 @@ import java.util.stream.Collectors;
  */
 public class ControllerRegistration {
     public static class Builder {
-        private int id = 0;
-        private Uuid incarnationId = null;
-        private boolean zkMigrationReady = false;
-        private Map<String, Endpoint> listeners = null;
-        private Map<String, VersionRange> supportedFeatures = null;
+        private int id;
+        private Uuid incarnationId;
+        private boolean zkMigrationReady;
+        private Map<String, Endpoint> listeners;
+        private Map<String, VersionRange> supportedFeatures;
 
         public Builder() {
             this.id = 0;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java 
b/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
index fa5ef4be4f1..9a923900e6a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/Replicas.java
@@ -38,9 +38,9 @@ public class Replicas {
      */
     public static List<Integer> toList(int[] array) {
         if (array == null) return null;
-        ArrayList<Integer> list = new ArrayList<>(array.length);
-        for (int i = 0; i < array.length; i++) {
-            list.add(array[i]);
+        List<Integer> list = new ArrayList<>(array.length);
+        for (int i : array) {
+            list.add(i);
         }
         return list;
     }
@@ -111,8 +111,7 @@ public class Replicas {
         int j = 0;
         if (sortedIsr[0] < 0) return false;
         int prevIsr = -1;
-        for (int i = 0; i < sortedIsr.length; i++) {
-            int curIsr = sortedIsr[i];
+        for (int curIsr : sortedIsr) {
             if (prevIsr == curIsr) return false;
             prevIsr = curIsr;
             while (true) {
@@ -133,8 +132,8 @@ public class Replicas {
      * @return              True only if the value is found in the array.
      */
     public static boolean contains(int[] replicas, int value) {
-        for (int i = 0; i < replicas.length; i++) {
-            if (replicas[i] == value) return true;
+        for (int replica : replicas) {
+            if (replica == value) return true;
         }
         return false;
     }
@@ -174,15 +173,14 @@ public class Replicas {
      */
     public static int[] copyWithout(int[] replicas, int value) {
         int size = 0;
-        for (int i = 0; i < replicas.length; i++) {
-            if (replicas[i] != value) {
+        for (int replica : replicas) {
+            if (replica != value) {
                 size++;
             }
         }
         int[] result = new int[size];
         int j = 0;
-        for (int i = 0; i < replicas.length; i++) {
-            int replica = replicas[i];
+        for (int replica : replicas) {
             if (replica != value) {
                 result[j++] = replica;
             }
@@ -200,15 +198,14 @@ public class Replicas {
      */
     public static int[] copyWithout(int[] replicas, int[] values) {
         int size = 0;
-        for (int i = 0; i < replicas.length; i++) {
-            if (!Replicas.contains(values, replicas[i])) {
+        for (int replica : replicas) {
+            if (!Replicas.contains(values, replica)) {
                 size++;
             }
         }
         int[] result = new int[size];
         int j = 0;
-        for (int i = 0; i < replicas.length; i++) {
-            int replica = replicas[i];
+        for (int replica : replicas) {
             if (!Replicas.contains(values, replica)) {
                 result[j++] = replica;
             }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
index 3f587a3d47e..9203c1a7111 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java
@@ -181,7 +181,7 @@ public class StripedReplicaPlacer implements ReplicaPlacer {
          *                  returned in this epoch.
          */
         int next(int epoch) {
-            if (brokers.size() == 0) return -1;
+            if (brokers.isEmpty()) return -1;
             if (this.epoch != epoch) {
                 this.epoch = epoch;
                 this.index = 0;
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java
index 268ba5b5f94..19a2b8b2cb4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java
@@ -43,7 +43,6 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 /**
@@ -165,12 +164,6 @@ public final class MetaPropertiesEnsemble {
      * Utility class for copying a MetaPropertiesEnsemble object, possibly 
with changes.
      */
     public static class Copier {
-        public static final BiConsumer<String, IOException> 
LOGGING_ERROR_HANDLER = new BiConsumer<String, IOException>() {
-            @Override
-            public void accept(String logDir, IOException e) {
-                MetaPropertiesEnsemble.LOG.error("Error while writing 
meta.properties to {}", logDir, e);
-            }
-        };
 
         private final MetaPropertiesEnsemble prev;
         private final Set<String> emptyLogDirs;
@@ -258,7 +251,7 @@ public final class MetaPropertiesEnsemble {
         }
 
         /**
-         * Set the the current metadata log directory.
+         * Set the current metadata log directory.
          *
          * @param metaLogDir    The metadata log directory, or Optional.empty 
if there is none.
          *
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 19875741ec2..a7bfd7b7965 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -125,8 +125,7 @@ public final class SnapshotFileReader implements 
AutoCloseable {
     }
 
     private void handleControlBatch(FileChannelRecordBatch batch) {
-        for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
-            Record record = iter.next();
+        for (Record record : batch) {
             try {
                 short typeId = ControlRecordType.parseTypeId(record.key());
                 ControlRecordType type = ControlRecordType.fromTypeId(typeId);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/BrokerToElrsTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/BrokerToElrsTest.java
index 890a2985312..b385428ca04 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokerToElrsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerToElrsTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.server.common.TopicIdPartition;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -35,11 +36,7 @@ public class BrokerToElrsTest {
     };
 
     private static Set<TopicIdPartition> toSet(TopicIdPartition... partitions) {
-        HashSet<TopicIdPartition> set = new HashSet<>();
-        for (TopicIdPartition partition : partitions) {
-            set.add(partition);
-        }
-        return set;
+        return new HashSet<>(Arrays.asList(partitions));
     }
 
     private static Set<TopicIdPartition> toSet(BrokersToIsrs.PartitionsOnReplicaIterator iterator) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
index a92960fd3c5..f964c7dd387 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokersToIsrsTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -39,11 +40,7 @@ public class BrokersToIsrsTest {
     };
 
     private static Set<TopicIdPartition> toSet(TopicIdPartition... partitions) {
-        HashSet<TopicIdPartition> set = new HashSet<>();
-        for (TopicIdPartition partition : partitions) {
-            set.add(partition);
-        }
-        return set;
+        return new HashSet<>(Arrays.asList(partitions));
     }
 
     private static Set<TopicIdPartition> toSet(PartitionsOnReplicaIterator iterator) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index b956a7f2ef9..db3d1e162b3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -49,8 +49,10 @@ import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsN
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
 import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
 import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -844,8 +846,8 @@ public class PartitionChangeBuilderTest {
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
         if (version >= 2) {
-            assertTrue(Arrays.equals(new int[]{3, 4}, partition.elr), partition.toString());
-            assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{3, 4}, partition.elr, partition.toString());
+            assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
         } else {
             assertEquals(0, partition.elr.length);
             assertEquals(0, partition.lastKnownElr.length);
@@ -942,11 +944,11 @@ public class PartitionChangeBuilderTest {
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
         if (version >= 2) {
-            assertTrue(Arrays.equals(new int[]{3}, partition.elr), partition.toString());
-            assertTrue(Arrays.equals(new int[]{2}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{3}, partition.elr, partition.toString());
+            assertArrayEquals(new int[]{2}, partition.lastKnownElr, partition.toString());
         } else {
-            assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
-            assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+            assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
         }
     }
 
@@ -995,11 +997,11 @@ public class PartitionChangeBuilderTest {
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
         if (version >= 2) {
-            assertTrue(Arrays.equals(new int[]{2}, partition.elr), partition.toString());
-            assertTrue(Arrays.equals(new int[]{3}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
+            assertArrayEquals(new int[]{3}, partition.lastKnownElr, partition.toString());
         } else {
-            assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
-            assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+            assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
         }
     }
 
@@ -1115,9 +1117,9 @@ public class PartitionChangeBuilderTest {
         );
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
-        assertTrue(Arrays.equals(new int[]{1}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(lastKnownLeaderEnabled ? new int[]{} : new int[]{2}, partition.lastKnownElr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{3}, partition.isr), partition.toString());
+        assertArrayEquals(new int[]{1}, partition.elr, partition.toString());
+        assertArrayEquals(lastKnownLeaderEnabled ? new int[]{} : new int[]{2}, partition.lastKnownElr, partition.toString());
+        assertArrayEquals(new int[]{3}, partition.isr, partition.toString());
     }
 
     @ParameterizedTest
@@ -1167,9 +1169,9 @@ public class PartitionChangeBuilderTest {
         ApiMessageAndVersion expectedRecord = new ApiMessageAndVersion(record, version);
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
-        assertTrue(Arrays.equals(new int[]{1, 2, 3, 4}, partition.elr), partition.toString());
+        assertArrayEquals(new int[]{1, 2, 3, 4}, partition.elr, partition.toString());
         if (lastKnownLeaderEnabled) {
-            assertTrue(Arrays.equals(new int[]{1}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{1}, partition.lastKnownElr, partition.toString());
             builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
                     metadataVersionForPartitionChangeRecordVersion(version), 3)
                 .setElection(Election.PREFERRED)
@@ -1178,11 +1180,11 @@ public class PartitionChangeBuilderTest {
                 .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
                 
.setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
             PartitionChangeRecord changeRecord = (PartitionChangeRecord) builder.build().get().message();
-            assertTrue(changeRecord.lastKnownElr() == null, changeRecord.toString());
+            assertNull(changeRecord.lastKnownElr(), changeRecord.toString());
         } else {
-            assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+            assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
         }
-        assertTrue(Arrays.equals(new int[]{}, partition.isr), partition.toString());
+        assertArrayEquals(new int[]{}, partition.isr, partition.toString());
     }
 
     @Test
@@ -1222,9 +1224,9 @@ public class PartitionChangeBuilderTest {
         );
         assertEquals(Optional.of(expectedRecord), builder.build());
         partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
-        assertTrue(Arrays.equals(new int[]{}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{1}, partition.isr), partition.toString());
+        assertArrayEquals(new int[]{}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
+        assertArrayEquals(new int[]{1}, partition.isr, partition.toString());
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index a58f194b1f2..ce2983eca47 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -36,22 +36,19 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class ProducerIdControlManagerTest {
 
-    private SnapshotRegistry snapshotRegistry;
-    private FeatureControlManager featureControl;
-    private ClusterControlManager clusterControl;
     private ProducerIdControlManager producerIdControlManager;
 
     @BeforeEach
     public void setUp() {
         final MockTime time = new MockTime();
-        snapshotRegistry = new SnapshotRegistry(new LogContext());
-        featureControl = new FeatureControlManager.Builder().
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        FeatureControlManager featureControl = new FeatureControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             setQuorumFeatures(new QuorumFeatures(0,
                 QuorumFeatures.defaultFeatureMap(true),
                 Collections.singletonList(0))).
             build();
-        clusterControl = new ClusterControlManager.Builder().
+        ClusterControlManager clusterControl = new ClusterControlManager.Builder().
             setTime(time).
             setSnapshotRegistry(snapshotRegistry).
             setSessionTimeoutNs(1000).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 68919d077dc..a5cfcce07b0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -217,7 +217,7 @@ public class ReplicationControlManagerTest {
             build();
         final ReplicationControlManager replicationControl;
 
-        void replay(List<ApiMessageAndVersion> records) throws Exception {
+        void replay(List<ApiMessageAndVersion> records) {
             RecordTestUtils.replayAll(clusterControl, records);
             RecordTestUtils.replayAll(configurationControl, records);
             RecordTestUtils.replayAll(replicationControl, records);
@@ -267,7 +267,7 @@ public class ReplicationControlManagerTest {
         CreatableTopicResult createTestTopic(String name,
                                              int numPartitions,
                                              short replicationFactor,
-                                             short expectedErrorCode) throws 
Exception {
+                                             short expectedErrorCode) {
             CreateTopicsRequestData request = new CreateTopicsRequestData();
             CreatableTopic topic = new CreatableTopic().setName(name);
             
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
@@ -284,18 +284,18 @@ public class ReplicationControlManagerTest {
             return topicResult;
         }
 
-        CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception {
+        CreatableTopicResult createTestTopic(String name, int[][] replicas) {
             return createTestTopic(name, replicas, Collections.emptyMap(), (short) 0);
         }
 
         CreatableTopicResult createTestTopic(String name, int[][] replicas,
-                                             short expectedErrorCode) throws 
Exception {
+                                             short expectedErrorCode) {
             return createTestTopic(name, replicas, Collections.emptyMap(), 
expectedErrorCode);
         }
 
         CreatableTopicResult createTestTopic(String name, int[][] replicas,
                                              Map<String, String> configs,
-                                             short expectedErrorCode) throws 
Exception {
+                                             short expectedErrorCode) {
             assertNotEquals(0, replicas.length);
             CreateTopicsRequestData request = new CreateTopicsRequestData();
             CreatableTopic topic = new CreatableTopic().setName(name);
@@ -325,7 +325,7 @@ public class ReplicationControlManagerTest {
             return topicResult;
         }
 
-        void deleteTopic(ControllerRequestContext context, Uuid topicId) throws Exception {
+        void deleteTopic(ControllerRequestContext context, Uuid topicId) {
             ControllerResult<Map<Uuid, ApiError>> result = replicationControl.deleteTopics(context, Collections.singleton(topicId));
             assertEquals(Collections.singleton(topicId), result.response().keySet());
             assertEquals(NONE, result.response().get(topicId).error());
@@ -340,15 +340,14 @@ public class ReplicationControlManagerTest {
             replay(result.records());
         }
 
-        void createPartitions(int count, String name,
-                int[][] replicas, short expectedErrorCode) throws Exception {
+        void createPartitions(int count, String name, int[][] replicas, short expectedErrorCode) {
             assertNotEquals(0, replicas.length);
             CreatePartitionsTopic topic = new CreatePartitionsTopic().
                 setName(name).
                 setCount(count);
-            for (int i = 0; i < replicas.length; i++) {
+            for (int[] replica : replicas) {
                 topic.assignments().add(new CreatePartitionsAssignment().
-                    setBrokerIds(Replicas.toList(replicas[i])));
+                    setBrokerIds(Replicas.toList(replica)));
             }
             ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_PARTITIONS);
             ControllerResult<List<CreatePartitionsTopicResult>> result =
@@ -360,7 +359,7 @@ public class ReplicationControlManagerTest {
             replay(result.records());
         }
 
-        void registerBrokers(Integer... brokerIds) throws Exception {
+        void registerBrokers(Integer... brokerIds) {
             Object[] brokersAndDirs = new Object[brokerIds.length * 2];
             for (int i = 0; i < brokerIds.length; i++) {
                 brokersAndDirs[i * 2] = brokerIds[i];
@@ -372,7 +371,7 @@ public class ReplicationControlManagerTest {
         }
 
         @SuppressWarnings("unchecked")
-        void registerBrokersWithDirs(Object... brokerIdsAndDirs) throws 
Exception {
+        void registerBrokersWithDirs(Object... brokerIdsAndDirs) {
             if (brokerIdsAndDirs.length % 2 != 0) {
                 throw new IllegalArgumentException("uneven number of 
arguments");
             }
@@ -391,7 +390,7 @@ public class ReplicationControlManagerTest {
             }
         }
 
-        void handleBrokersUncleanShutdown(Integer... brokerIds) throws 
Exception {
+        void handleBrokersUncleanShutdown(Integer... brokerIds) {
             List<ApiMessageAndVersion> records = new ArrayList<>();
             for (int brokerId : brokerIds) {
                 replicationControl.handleBrokerUncleanShutdown(brokerId, 
records);
@@ -404,7 +403,7 @@ public class ReplicationControlManagerTest {
             int leaderId,
             List<BrokerState> isrWithEpoch,
             LeaderRecoveryState leaderRecoveryState
-        ) throws Exception {
+        ) {
             BrokerRegistration registration = 
clusterControl.brokerRegistrations().get(leaderId);
             assertFalse(registration.fenced());
 
@@ -439,11 +438,11 @@ public class ReplicationControlManagerTest {
             replay(alterPartition.records());
         }
 
-        void unfenceBrokers(Integer... brokerIds) throws Exception {
+        void unfenceBrokers(Integer... brokerIds) {
             unfenceBrokers(Utils.mkSet(brokerIds));
         }
 
-        void unfenceBrokers(Set<Integer> brokerIds) throws Exception {
+        void unfenceBrokers(Set<Integer> brokerIds) {
             for (int brokerId : brokerIds) {
                 ControllerResult<BrokerHeartbeatReply> result = 
replicationControl.
                     processBrokerHeartbeat(new BrokerHeartbeatRequestData().
@@ -456,11 +455,11 @@ public class ReplicationControlManagerTest {
             }
         }
 
-        void inControlledShutdownBrokers(Integer... brokerIds) throws 
Exception {
+        void inControlledShutdownBrokers(Integer... brokerIds) {
             inControlledShutdownBrokers(Utils.mkSet(brokerIds));
         }
 
-        void inControlledShutdownBrokers(Set<Integer> brokerIds) throws 
Exception {
+        void inControlledShutdownBrokers(Set<Integer> brokerIds) {
             for (int brokerId : brokerIds) {
                 BrokerRegistrationChangeRecord record = new 
BrokerRegistrationChangeRecord()
                     .setBrokerId(brokerId)
@@ -474,7 +473,7 @@ public class ReplicationControlManagerTest {
             String topic,
             String configKey,
             String configValue
-        ) throws Exception {
+        ) {
             ConfigRecord configRecord = new ConfigRecord()
                 .setResourceType(ConfigResource.Type.TOPIC.id())
                 .setResourceName(topic)
@@ -483,7 +482,7 @@ public class ReplicationControlManagerTest {
             replay(singletonList(new ApiMessageAndVersion(configRecord, 
(short) 0)));
         }
 
-        void fenceBrokers(Set<Integer> brokerIds) throws Exception {
+        void fenceBrokers(Set<Integer> brokerIds) {
             time.sleep(BROKER_SESSION_TIMEOUT_MS);
 
             Set<Integer> unfencedBrokerIds = 
clusterControl.brokerRegistrations().keySet().stream()
@@ -514,7 +513,7 @@ public class ReplicationControlManagerTest {
             return (partition.leader < 0) ? OptionalInt.empty() : 
OptionalInt.of(partition.leader);
         }
 
-        ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) throws Exception {
+        ControllerResult<AssignReplicasToDirsResponseData> assignReplicasToDirs(int brokerId, Map<TopicIdPartition, Uuid> assignment) {
             ControllerResult<AssignReplicasToDirsResponseData> result = replicationControl.handleAssignReplicasToDirs(
                     AssignmentsHelper.buildRequestData(brokerId, defaultBrokerEpoch(brokerId), assignment));
             assertNotNull(result.response());
@@ -547,7 +546,7 @@ public class ReplicationControlManagerTest {
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
             // nothing to do
         }
 
@@ -558,7 +557,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopics() throws Exception {
+    public void testCreateTopics() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -619,7 +618,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopicsWithMutationQuotaExceeded() throws Exception {
+    public void testCreateTopicsWithMutationQuotaExceeded() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -639,7 +638,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopicsISRInvariants() throws Exception {
+    public void testCreateTopicsISRInvariants() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
 
@@ -683,7 +682,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopicsWithConfigs() throws Exception {
+    public void testCreateTopicsWithConfigs() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -789,7 +788,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest(name = "testCreateTopicsWithValidateOnlyFlag with 
mutationQuotaExceeded: {0}")
     @ValueSource(booleans = {true, false})
-    public void testCreateTopicsWithValidateOnlyFlag(boolean 
mutationQuotaExceeded) throws Exception {
+    public void testCreateTopicsWithValidateOnlyFlag(boolean 
mutationQuotaExceeded) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(0, 1, 2);
         ctx.unfenceBrokers(0, 1, 2);
@@ -811,7 +810,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testInvalidCreateTopicsWithValidateOnlyFlag() throws Exception 
{
+    public void testInvalidCreateTopicsWithValidateOnlyFlag() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(0, 1, 2);
         ctx.unfenceBrokers(0, 1, 2);
@@ -832,7 +831,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopicsWithPolicy() throws Exception {
+    public void testCreateTopicsWithPolicy() {
         MockCreateTopicPolicy createTopicPolicy = new 
MockCreateTopicPolicy(asList(
             new CreateTopicPolicy.RequestMetadata("foo", 2, (short) 2,
                 null, Collections.emptyMap()),
@@ -856,7 +855,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreateTopicWithCollisionChars() throws Exception {
+    public void testCreateTopicWithCollisionChars() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(0, 1, 2);
         ctx.unfenceBrokers(0, 1, 2);
@@ -906,7 +905,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testRemoveLeaderships() throws Exception {
+    public void testRemoveLeaderships() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3);
@@ -931,7 +930,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testShrinkAndExpandIsr() throws Exception {
+    public void testShrinkAndExpandIsr() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -960,7 +959,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() throws 
Exception {
+    public void testEligibleLeaderReplicas_ShrinkAndExpandIsr() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -999,7 +998,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_BrokerFence() throws Exception {
+    public void testEligibleLeaderReplicas_BrokerFence() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3);
@@ -1030,7 +1029,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_DeleteTopic() throws Exception {
+    public void testEligibleLeaderReplicas_DeleteTopic() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -1054,8 +1053,8 @@ public class ReplicationControlManagerTest {
         assertConsistentAlterPartitionResponse(replicationControl, 
topicIdPartition, shrinkIsrResponse);
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(),
             topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{1, 2}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{1, 2}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
         
assertTrue(replicationControl.brokersToElrs().partitionsWithBrokerInElr(1).hasNext());
 
         ControllerRequestContext deleteTopicsRequestContext = 
anonymousContextFor(ApiKeys.DELETE_TOPICS);
@@ -1066,7 +1065,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_EffectiveMinIsr() throws Exception {
+    public void testEligibleLeaderReplicas_EffectiveMinIsr() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().setIsElrEnabled(true).build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -1081,7 +1080,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_CleanElection() throws Exception {
+    public void testEligibleLeaderReplicas_CleanElection() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
             .setIsElrEnabled(true)
             .build();
@@ -1111,7 +1110,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testEligibleLeaderReplicas_UncleanShutdown() throws Exception {
+    public void testEligibleLeaderReplicas_UncleanShutdown() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
             .setIsElrEnabled(true)
             .build();
@@ -1128,25 +1127,25 @@ public class ReplicationControlManagerTest {
         ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
 
         PartitionRegistration partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{2, 3}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{2, 3}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         // An unclean shutdown ELR member should be kicked out of ELR.
         ctx.handleBrokersUncleanShutdown(3);
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{2}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{}, partition.lastKnownElr, partition.toString());
 
         // An unclean shutdown last ISR member should be recognized as the last known leader.
         ctx.handleBrokersUncleanShutdown(0);
         partition = replicationControl.getPartition(topicIdPartition.topicId(), topicIdPartition.partitionId());
-        assertTrue(Arrays.equals(new int[]{2}, partition.elr), partition.toString());
-        assertTrue(Arrays.equals(new int[]{0}, partition.lastKnownElr), partition.toString());
+        assertArrayEquals(new int[]{2}, partition.elr, partition.toString());
+        assertArrayEquals(new int[]{0}, partition.lastKnownElr, partition.toString());
     }
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
-    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) 
throws Exception {
+    public void testAlterPartitionHandleUnknownTopicIdOrName(short version) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -1183,7 +1182,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testInvalidAlterPartitionRequests() throws Exception {
+    public void testInvalidAlterPartitionRequests() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2);
@@ -1356,7 +1355,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testDeleteTopics() throws Exception {
+    public void testDeleteTopics() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -1431,7 +1430,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testDeleteTopicsWithMutationQuotaExceeded() throws Exception {
+    public void testDeleteTopicsWithMutationQuotaExceeded() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -1458,7 +1457,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreatePartitions() throws Exception {
+    public void testCreatePartitions() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -1550,7 +1549,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreatePartitionsWithMutationQuotaExceeded() throws 
Exception {
+    public void testCreatePartitionsWithMutationQuotaExceeded() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -1588,7 +1587,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() throws Exception {
+    public void testCreatePartitionsFailsWhenAllBrokersAreFencedOrInControlledShutdown() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
         CreateTopicsRequestData request = new CreateTopicsRequestData();
@@ -1624,7 +1623,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreatePartitionsISRInvariants() throws Exception {
+    public void testCreatePartitionsISRInvariants() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replicationControl = ctx.replicationControl;
 
@@ -1668,7 +1667,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testValidateGoodManualPartitionAssignments() throws Exception {
+    public void testValidateGoodManualPartitionAssignments() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(1, 2, 3);
         
ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1)),
@@ -1682,7 +1681,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testValidateBadManualPartitionAssignments() throws Exception {
+    public void testValidateBadManualPartitionAssignments() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(1, 2);
         assertEquals("The manual partition assignment includes an empty replica list.",
@@ -1709,7 +1708,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
-    public void testReassignPartitions(short version) throws Exception {
+    public void testReassignPartitions(short version) {
         MetadataVersion metadataVersion = MetadataVersion.latestTesting();
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
                 .setMetadataVersion(metadataVersion)
@@ -1834,7 +1833,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
-    public void testAlterPartitionShouldRejectFencedBrokers(short version) throws Exception {
+    public void testAlterPartitionShouldRejectFencedBrokers(short version) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -1917,7 +1916,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
-    public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) throws Exception {
+    public void testAlterPartitionShouldRejectBrokersWithStaleEpoch(short version) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -1973,8 +1972,8 @@ public class ReplicationControlManagerTest {
             assertEquals(
                 new AlterPartitionResponseData().
                     setTopics(asList(new 
AlterPartitionResponseData.TopicData().
-                        setTopicName(version <= 1 ? "foo" : "").
-                        setTopicId(version > 1 ? fooId : Uuid.ZERO_UUID).
+                        setTopicName("").
+                        setTopicId(fooId).
                         setPartitions(asList(new 
AlterPartitionResponseData.PartitionData().
                             setPartitionIndex(0).
                             setErrorCode(INELIGIBLE_REPLICA.code()))))),
@@ -1986,7 +1985,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
-    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) throws Exception {
+    public void testAlterPartitionShouldRejectShuttingDownBrokers(short version) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -2046,7 +2045,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCancelReassignPartitions() throws Exception {
+    public void testCancelReassignPartitions() {
         MetadataVersion metadataVersion = MetadataVersion.latestTesting();
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder()
                 .setMetadataVersion(metadataVersion)
@@ -2202,7 +2201,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testManualPartitionAssignmentOnAllFencedBrokers() throws Exception {
+    public void testManualPartitionAssignmentOnAllFencedBrokers() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(0, 1, 2, 3);
         ctx.createTestTopic("foo", new int[][] {new int[] {0, 1, 2}},
@@ -2210,7 +2209,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() throws Exception {
+    public void testCreatePartitionsFailsWithManualAssignmentWithAllFenced() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ctx.registerBrokers(0, 1, 2, 3, 4, 5);
         ctx.unfenceBrokers(0, 1, 2);
@@ -2244,7 +2243,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) throws Exception {
+    public void testElectUncleanLeaders_WithoutElr(boolean electAllPartitions) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build(MetadataVersion.IBP_3_6_IV1);
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -2330,7 +2329,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
+    public void testPreferredElectionDoesNotTriggerUncleanElection() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(1, 2, 3, 4);
@@ -2383,7 +2382,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testFenceMultipleBrokers() throws Exception {
+    public void testFenceMultipleBrokers() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -2413,7 +2412,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testElectPreferredLeaders() throws Exception {
+    public void testElectPreferredLeaders() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -2533,7 +2532,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testBalancePartitionLeaders() throws Exception {
+    public void testBalancePartitionLeaders() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -2659,7 +2658,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testKRaftClusterDescriber() throws Exception {
+    public void testKRaftClusterDescriber() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokersWithDirs(
@@ -2690,7 +2689,7 @@ public class ReplicationControlManagerTest {
 
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
-    public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) throws Exception {
+    public void testProcessBrokerHeartbeatInControlledShutdown(MetadataVersion metadataVersion) {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
                 setMetadataVersion(metadataVersion).
                 build();
@@ -2732,7 +2731,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testProcessExpiredBrokerHeartbeat() throws Exception {
+    public void testProcessExpiredBrokerHeartbeat() {
         MockTime mockTime = new MockTime(0, 0, 0);
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
                 setMockTime(mockTime).
@@ -2757,7 +2756,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() throws Exception {
+    public void testReassignPartitionsHandlesNewReassignmentThatRemovesPreviouslyAddingReplicas() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         ReplicationControlManager replication = ctx.replicationControl;
         ctx.registerBrokers(0, 1, 2, 3, 4, 5);
@@ -2940,7 +2939,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    void testHandleAssignReplicasToDirsFailsOnOlderMv() throws Exception {
+    void testHandleAssignReplicasToDirsFailsOnOlderMv() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().
             setMetadataVersion(MetadataVersion.IBP_3_7_IV1).
             build();
@@ -2949,7 +2948,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    void testHandleAssignReplicasToDirs() throws Exception {
+    void testHandleAssignReplicasToDirs() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         Uuid dir1b1 = Uuid.fromString("hO2YI5bgRUmByNPHiHxjNQ");
         Uuid dir2b1 = Uuid.fromString("R3Gb1HLoTzuKMgAkH5Vtpw");
@@ -3035,7 +3034,7 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
-    void testHandleDirectoriesOffline() throws Exception {
+    void testHandleDirectoriesOffline() {
         ReplicationControlTestContext ctx = new 
ReplicationControlTestContext.Builder().build();
         int b1 = 101, b2 = 102;
         Uuid dir1b1 = Uuid.fromString("suitdzfTTdqoWcy8VqmkUg");
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 9de6238eca1..8816f2f141d 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -280,11 +280,11 @@ public class PartitionRegistrationTest {
     }
 
     private static Stream<Arguments> 
metadataVersionsForTestPartitionRegistration() {
-        return Arrays.asList(
+        return Stream.of(
             MetadataVersion.IBP_3_7_IV1,
             MetadataVersion.IBP_3_7_IV2,
             MetadataVersion.IBP_3_8_IV0
-        ).stream().map(mv -> Arguments.of(mv));
+        ).map(mv -> Arguments.of(mv));
     }
 
     @ParameterizedTest
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAclTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAclTest.java
index 1cd42a26957..d8ab89f08a5 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAclTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAclTest.java
@@ -83,9 +83,7 @@ public class StandardAclTest {
     }
 
     private static int signum(int input) {
-        if (input < 0) return -1;
-        else if (input > 0) return 1;
-        else return 0;
+        return Integer.compare(input, 0);
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 8ed33e2e361..a186cc60ff4 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -356,7 +356,7 @@ public class StandardAuthorizerTest {
     }
 
     @Test
-    public void testListAcls() throws Exception {
+    public void testListAcls() {
         StandardAuthorizer authorizer = 
createAndInitializeStandardAuthorizer();
         List<StandardAclWithId> fooAcls = asList(
             withId(newFooAcl(READ, ALLOW)),
@@ -641,7 +641,7 @@ public class StandardAuthorizerTest {
      * listeners.
      */
     @Test
-    public void testStartWithEarlyStartListeners() throws Exception {
+    public void testStartWithEarlyStartListeners() {
         StandardAuthorizer authorizer = new StandardAuthorizer();
         authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
         Map<Endpoint, ? extends CompletionStage<Void>> futures2 = authorizer.
@@ -674,7 +674,7 @@ public class StandardAuthorizerTest {
     }
 
     @Test
-    public void testCompleteInitialLoad() throws Exception {
+    public void testCompleteInitialLoad() {
         StandardAuthorizer authorizer = new StandardAuthorizer();
         authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
         Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
@@ -687,7 +687,7 @@ public class StandardAuthorizerTest {
     }
 
     @Test
-    public void testCompleteInitialLoadWithException() throws Exception {
+    public void testCompleteInitialLoadWithException() {
         StandardAuthorizer authorizer = new StandardAuthorizer();
         authorizer.configure(Collections.singletonMap(SUPER_USERS_CONFIG, 
"User:superman"));
         Map<Endpoint, ? extends CompletionStage<Void>> futures = authorizer.
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsembleTest.java b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsembleTest.java
index 3c99b3b2228..6266df68065 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsembleTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsembleTest.java
@@ -34,6 +34,7 @@ import java.util.OptionalInt;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.test.TestUtils;
@@ -54,7 +55,7 @@ final public class MetaPropertiesEnsembleTest {
         new MetaPropertiesEnsemble(
             new HashSet<>(Arrays.asList("/tmp/empty1", "/tmp/empty2")),
             new HashSet<>(Arrays.asList("/tmp/error3")),
-            Arrays.asList(
+            Stream.of(
                 new SimpleImmutableEntry<>("/tmp/dir4",
                     new MetaProperties.Builder().
                         setVersion(MetaPropertiesVersion.V1).
@@ -66,7 +67,7 @@ final public class MetaPropertiesEnsembleTest {
                         setVersion(MetaPropertiesVersion.V1).
                         setClusterId("fooClusterId").
                         setNodeId(2).
-                        build())).stream().collect(Collectors.
+                        build())).collect(Collectors.
                             toMap(Entry::getKey, Entry::getValue)),
                 Optional.of("/tmp/dir4"));
 
@@ -77,7 +78,7 @@ final public class MetaPropertiesEnsembleTest {
         return logDir.getAbsolutePath();
     }
 
-    private static String createEmptyLogDir() throws IOException {
+    private static String createEmptyLogDir() {
         File logDir = TestUtils.tempDirectory();
         return logDir.getAbsolutePath();
     }
@@ -222,7 +223,7 @@ final public class MetaPropertiesEnsembleTest {
     }
 
     @Test
-    public void testVerificationFailureOnLackOfMetadataLogDir() throws IOException {
+    public void testVerificationFailureOnLackOfMetadataLogDir() {
         MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble(
             Collections.singleton("/tmp/foo1"),
             Collections.emptySet(),
@@ -237,7 +238,7 @@ final public class MetaPropertiesEnsembleTest {
     }
 
     @Test
-    public void testVerificationFailureOnMetadataLogDirWithError() throws IOException {
+    public void testVerificationFailureOnMetadataLogDirWithError() {
         MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble(
             Collections.emptySet(),
             Collections.singleton("/tmp/foo1"),
@@ -322,8 +323,7 @@ final public class MetaPropertiesEnsembleTest {
 
     static class MetaPropertiesMockRandom extends Random {
         private final AtomicInteger index = new AtomicInteger(0);
-
-        private List<Long> results = Arrays.asList(
+        private final List<Long> results = Arrays.asList(
             0L,
             0L,
             2336837413447398698L,
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/properties/PropertiesUtilsTest.java b/metadata/src/test/java/org/apache/kafka/metadata/properties/PropertiesUtilsTest.java
index 926767dcd58..2b9f6d5b13c 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/properties/PropertiesUtilsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/properties/PropertiesUtilsTest.java
@@ -65,14 +65,14 @@ final public class PropertiesUtilsTest {
     }
 
     @Test
-    public void loadRequiredIntProp() throws IOException {
+    public void loadRequiredIntProp() {
         Properties props = new Properties();
         props.setProperty("foo.bar", "123");
         assertEquals(123, PropertiesUtils.loadRequiredIntProp(props, 
"foo.bar"));
     }
 
     @Test
-    public void loadMissingRequiredIntProp() throws IOException {
+    public void loadMissingRequiredIntProp() {
         Properties props = new Properties();
         assertEquals("Failed to find foo.bar",
             assertThrows(RuntimeException.class,
@@ -81,7 +81,7 @@ final public class PropertiesUtilsTest {
     }
 
     @Test
-    public void loadNonIntegerRequiredIntProp() throws IOException {
+    public void loadNonIntegerRequiredIntProp() {
         Properties props = new Properties();
         props.setProperty("foo.bar", "b");
         assertEquals("Unable to read foo.bar as a base-10 number.",

Reply via email to