This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cf6a9d80d2 MINOR: Cleanup Tools Module (1/n) (#20091)
5cf6a9d80d2 is described below

commit 5cf6a9d80d21671f9eb9b9a000e826b9487d19f2
Author: Sanskar Jhajharia <sjhajha...@confluent.io>
AuthorDate: Tue Jul 22 15:44:25 2025 +0530

    MINOR: Cleanup Tools Module (1/n) (#20091)
    
    Now that Kafka supports Java 17, this PR makes some changes in the
    tools module. The changes are limited to a subset of files; future
    PRs will follow.
    The changes mostly include:
    - Collections.emptyList(), Collections.singletonList() and
    Arrays.asList() are replaced with List.of()
    - Collections.emptyMap() and Collections.singletonMap() are replaced
    with Map.of()
    - Collections.singleton() is replaced with Set.of()
    
    Sub modules targeted: tools/src/main
    
    Reviewers: Ken Huang <s7133...@gmail.com>, Jhen-Yung Hsu
    <jhenyung...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../java/org/apache/kafka/tools/AclCommand.java    | 26 +++---
 .../kafka/tools/ClientCompatibilityTest.java       |  8 +-
 .../apache/kafka/tools/ClientMetricsCommand.java   | 20 ++---
 .../java/org/apache/kafka/tools/ClusterTool.java   |  6 +-
 .../org/apache/kafka/tools/ConnectPluginPath.java  | 26 +++---
 .../apache/kafka/tools/ConsumerPerformance.java    |  7 +-
 .../apache/kafka/tools/DelegationTokenCommand.java |  3 +-
 .../org/apache/kafka/tools/EndToEndLatency.java    |  6 +-
 .../org/apache/kafka/tools/FeatureCommand.java     |  3 +-
 .../org/apache/kafka/tools/GetOffsetShell.java     |  5 +-
 .../java/org/apache/kafka/tools/GroupsCommand.java |  7 +-
 .../main/java/org/apache/kafka/tools/JmxTool.java  | 10 +--
 .../apache/kafka/tools/LeaderElectionCommand.java  |  5 +-
 .../org/apache/kafka/tools/ManifestWorkspace.java  | 27 ++----
 .../apache/kafka/tools/MetadataQuorumCommand.java  | 56 ++++++-------
 .../java/org/apache/kafka/tools/OffsetsUtils.java  |  5 +-
 .../kafka/tools/ReplicaVerificationTool.java       |  4 +-
 .../kafka/tools/ShareConsumerPerformance.java      |  3 +-
 .../org/apache/kafka/tools/StreamsResetter.java    | 17 ++--
 .../java/org/apache/kafka/tools/TopicCommand.java  | 80 ++++++++----------
 .../kafka/tools/TransactionalMessageCopier.java    | 11 ++-
 .../apache/kafka/tools/TransactionsCommand.java    | 47 +++++------
 .../org/apache/kafka/tools/VerifiableConsumer.java |  7 +-
 .../kafka/tools/VerifiableShareConsumer.java       |  3 +-
 .../kafka/tools/consumer/ConsoleConsumer.java      |  9 +-
 .../tools/consumer/ConsoleConsumerOptions.java     |  7 +-
 .../kafka/tools/consumer/ConsoleShareConsumer.java |  3 +-
 .../tools/consumer/group/ConsumerGroupCommand.java | 25 +++---
 .../group/ConsumerGroupCommandOptions.java         | 14 ++--
 .../tools/consumer/group/ShareGroupCommand.java    |  2 +-
 .../consumer/group/ShareGroupCommandOptions.java   | 12 ++-
 .../tools/reassign/ReassignPartitionsCommand.java  | 96 ++++++++++------------
 .../tools/reassign/VerifyAssignmentResult.java     |  3 +-
 .../kafka/tools/streams/StreamsGroupCommand.java   |  7 +-
 .../tools/streams/StreamsGroupCommandOptions.java  | 16 ++--
 35 files changed, 264 insertions(+), 322 deletions(-)
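
As an illustration of the replacements listed in the commit message (a
sketch for reference, not code from this commit), note that the Java 9+
factory methods return immutable, null-hostile collections, which is why
call sites that still need mutation keep an explicit copy:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class FactoryMethodSketch {
        public static void main(String[] args) {
            // Pre-Java 9 idioms replaced by this cleanup:
            List<String> oldList = Arrays.asList("a", "b");      // fixed-size view of an array
            Set<String> oldSet = Collections.singleton("x");     // immutable singleton
            Map<String, Integer> oldMap = Collections.singletonMap("k", 1);

            // Java 9+ factory methods used instead:
            List<String> newList = List.of("a", "b");            // immutable
            Set<String> newSet = Set.of("x");                    // immutable, rejects nulls and duplicates
            Map<String, Integer> newMap = Map.of("k", 1);        // immutable, rejects null keys/values

            // Caveat: the factory collections throw UnsupportedOperationException
            // on mutation, so mutating call sites must copy first:
            List<String> mutable = new ArrayList<>(List.of("a", "b"));
            mutable.add("c");
        }
    }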

diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
index b6c0175331d..791397b2405 100644
--- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -40,9 +40,7 @@ import org.apache.kafka.server.util.CommandLineUtils;
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -177,7 +175,7 @@ public class AclCommand {
 
         private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
             if (acls.isEmpty()) {
-                adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+                adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
             } else {
                 List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
                 adminClient.deleteAcls(aclBindingFilters).all().get();
@@ -249,8 +247,8 @@ public class AclCommand {
         Set<ResourcePatternFilter> transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
         boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
 
-        Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
-        Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+        Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(WRITE, DESCRIBE, CREATE));
+        Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, Set.of(WRITE, DESCRIBE));
 
         //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
         Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
@@ -261,7 +259,7 @@ public class AclCommand {
             result.put(transactionalId, transactionalIdAcls);
         }
         if (enableIdempotence) {
-            result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
+            result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Set.of(IDEMPOTENT_WRITE)));
         }
         return result;
     }
@@ -272,8 +270,8 @@ public class AclCommand {
         Set<ResourcePatternFilter> groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
 
         //Read, Describe on topic, Read on consumerGroup
-        Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
-        Set<AccessControlEntry> groupAcls = getAcl(opts, Collections.singleton(READ));
+        Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(READ, DESCRIBE));
+        Set<AccessControlEntry> groupAcls = getAcl(opts, Set.of(READ));
 
         Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
         for (ResourcePatternFilter topic : topics) {
@@ -333,9 +331,9 @@ public class AclCommand {
         if (opts.options.has(hostOptionSpec)) {
             return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
         } else if (opts.options.has(principalOptionSpec)) {
-            return Collections.singleton(AclEntry.WILDCARD_HOST);
+            return Set.of(AclEntry.WILDCARD_HOST);
         } else {
-            return Collections.emptySet();
+            return Set.of();
         }
     }
 
@@ -345,7 +343,7 @@ public class AclCommand {
                     .map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
                     .collect(Collectors.toSet());
         } else {
-            return Collections.emptySet();
+            return Set.of();
         }
     }
 
@@ -547,7 +545,7 @@ public class AclCommand {
             if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
                 CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
             }
-            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
+            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(addOpt, removeOpt, listOpt);
             long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
                     .filter(abstractOptionSpec -> options.has(abstractOptionSpec))
                     .count();
@@ -592,10 +590,10 @@ public class AclCommand {
 
         @Override
         public String valuePattern() {
-            List<PatternType> values = Arrays.asList(PatternType.values());
+            List<PatternType> values = List.of(PatternType.values());
             List<PatternType> filteredValues = values.stream()
                     .filter(type -> type != PatternType.UNKNOWN)
-                    .collect(Collectors.toList());
+                    .toList();
             return filteredValues.stream()
                     .map(Object::toString)
                     .collect(Collectors.joining("|"));
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b4f189cfa38..73c01b3ea1c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -59,7 +59,6 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -67,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -291,13 +291,13 @@ public class ClientCompatibilityTest {
             tryFeature("createTopics", testConfig.createTopicsSupported,
                 () -> {
                     try {
-                        client.createTopics(Collections.singleton(
+                        client.createTopics(Set.of(
                             new NewTopic("newtopic", 1, (short) 1))).all().get();
                     } catch (ExecutionException e) {
                         throw e.getCause();
                     }
                 },
-                () ->  createTopicsResultTest(client, Collections.singleton("newtopic"))
+                () ->  createTopicsResultTest(client, Set.of("newtopic"))
             );
 
             while (true) {
@@ -337,7 +337,7 @@ public class ClientCompatibilityTest {
                     );
 
                     Map<ConfigResource, Config> brokerConfig =
-                        client.describeConfigs(Collections.singleton(configResource)).all().get();
+                        client.describeConfigs(Set.of(configResource)).all().get();
 
                     if (brokerConfig.get(configResource).entries().isEmpty()) {
                         throw new KafkaException("Expected to see config entries, but got zero entries");
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index 6bd0f29d33d..6aec33b5938 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -91,11 +91,7 @@ public class ClientMetricsCommand {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable t) {
             printException(t);
@@ -130,8 +126,8 @@ public class ClientMetricsCommand {
             Collection<AlterConfigOp> alterEntries = configsToBeSet.entrySet().stream()
                     .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()),
                             entry.getValue().isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET))
-                    .collect(Collectors.toList());
-            adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions).all()
+                    .toList();
+            adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all()
                     .get(30, TimeUnit.SECONDS);
 
             System.out.println("Altered client metrics config for " + entityName + ".");
@@ -144,8 +140,8 @@ public class ClientMetricsCommand {
             ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
             AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
             Collection<AlterConfigOp> alterEntries = oldConfigs.stream()
-                    .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).collect(Collectors.toList());
-            adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions)
+                    .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).toList();
+            adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions)
                     .all().get(30, TimeUnit.SECONDS);
 
             System.out.println("Deleted client metrics config for " + entityName + ".");
@@ -162,7 +158,7 @@ public class ClientMetricsCommand {
                     System.out.println("The client metric resource " + 
entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
                     return;
                 }
-                entities = Collections.singletonList(entityNameOpt.get());
+                entities = List.of(entityNameOpt.get());
             } else {
                 Collection<ConfigResource> resources = adminClient
                     .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
@@ -189,7 +185,7 @@ public class ClientMetricsCommand {
 
         private Collection<ConfigEntry> getClientMetricsConfig(String entityName) throws Exception {
             ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
-            Map<ConfigResource, Config> result = adminClient.describeConfigs(Collections.singleton(configResource))
+            Map<ConfigResource, Config> result = adminClient.describeConfigs(Set.of(configResource))
                     .all().get(30, TimeUnit.SECONDS);
             return result.get(configResource).entries();
         }
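
The switch from .collect(Collectors.toList()) to .toList() above is
behavior-preserving for these read-only call sites, but the two are not
interchangeable in general; a sketch of the difference (illustrative, not
code from this commit):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToListSketch {
        public static void main(String[] args) {
            // Collectors.toList() happens to return a mutable ArrayList today,
            // but its contract makes no mutability guarantee.
            List<Integer> collected = Stream.of(1, 2, 3).collect(Collectors.toList());
            collected.add(4); // works with the current implementation

            // Stream.toList() (Java 16+) is specified to return an unmodifiable list.
            List<Integer> unmodifiable = Stream.of(1, 2, 3).toList();
            // unmodifiable.add(4); // would throw UnsupportedOperationException
        }
    }
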
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index ccffaeae0f1..3048179a4bc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -32,8 +32,8 @@ import net.sourceforge.argparse4j.inf.Subparser;
 import net.sourceforge.argparse4j.inf.Subparsers;
 
 import java.io.PrintStream;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -74,7 +74,7 @@ public class ClusterTool {
                 .help("Unregister a broker.");
         Subparser listEndpoints = subparsers.addParser("list-endpoints")
                 .help("List endpoints");
-        for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
+        for (Subparser subpparser : List.of(clusterIdParser, unregisterParser, listEndpoints)) {
             MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
             connectionOptions.addArgument("--bootstrap-server", "-b")
                     .action(store())
@@ -162,7 +162,7 @@ public class ClusterTool {
             Collection<Node> nodes = adminClient.describeCluster(option).nodes().get();
 
             String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100));
-            String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
+            String maxRackLength = String.valueOf(nodes.stream().filter(Node::hasRack).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
 
             if (listControllerEndpoints) {
                 String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
index 428bdf54a9d..a3543df76c7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
@@ -42,7 +42,6 @@ import java.io.UncheckedIOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -149,14 +148,12 @@ public class ConnectPluginPath {
         if (subcommand == null) {
             throw new ArgumentParserException("No subcommand specified", parser);
         }
-        switch (subcommand) {
-            case "list":
-                return new Config(Command.LIST, locations, false, false, out, err);
-            case "sync-manifests":
-                return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
-            default:
-                throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
-        }
+        return switch (subcommand) {
+            case "list" -> new Config(Command.LIST, locations, false, false, out, err);
+            case "sync-manifests" ->
+                new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
+            default -> throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
+        };
     }
 
     private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException {
@@ -197,7 +194,7 @@ public class ConnectPluginPath {
     }
 
     enum Command {
-        LIST, SYNC_MANIFESTS;
+        LIST, SYNC_MANIFESTS
     }
 
     private static class Config {
@@ -326,11 +323,12 @@ public class ConnectPluginPath {
             rowAliases.add(PluginUtils.prunedName(pluginDesc));
             rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true));
             // If a corresponding manifest exists, mark it as loadable by removing it from the map.
+            // TODO: The use of Collections here shall be fixed with KAFKA-19524.
             nonLoadableManifests.getOrDefault(pluginDesc.className(), Collections.emptySet()).remove(pluginDesc.type());
         });
         nonLoadableManifests.forEach((className, types) -> types.forEach(type -> {
             // All manifests which remain in the map are not loadable
-            rows.add(newRow(workspace, className, Collections.emptyList(), type, PluginDesc.UNDEFINED_VERSION, false));
+            rows.add(newRow(workspace, className, List.of(), type, PluginDesc.UNDEFINED_VERSION, false));
         }));
         return rows;
     }
@@ -436,8 +434,8 @@ public class ConnectPluginPath {
     }
 
     private static PluginScanResult discoverPlugins(PluginSource source, ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner) {
-        PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Collections.singleton(source));
-        PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source));
-        return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult));
+        PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Set.of(source));
+        PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Set.of(source));
+        return new PluginScanResult(List.of(serviceLoadResult, reflectiveResult));
     }
 }
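
Beyond the collection factories, the ConnectPluginPath hunk above also
rewrites a switch statement as a Java 14 switch expression; a minimal
standalone sketch of the pattern (the names here are illustrative, not
taken from the tool):

    public class SwitchExpressionSketch {
        record Config(String mode) { }

        static Config parse(String subcommand) {
            // Arrow labels never fall through, and the switch yields a value,
            // so every path must either produce a Config or throw.
            return switch (subcommand) {
                case "list" -> new Config("list");
                case "sync-manifests" -> new Config("sync");
                default -> throw new IllegalArgumentException(
                        "Unrecognized subcommand: '" + subcommand + "'");
            };
        }

        public static void main(String[] args) {
            System.out.println(parse("list").mode()); // prints "list"
        }
    }
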
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 0a5106d0e9c..0bbc953293f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -221,8 +220,8 @@ public class ConsumerPerformance {
     }
 
     public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
-        private AtomicLong joinTimeMs;
-        private AtomicLong joinTimeMsInSingleRound;
+        private final AtomicLong joinTimeMs;
+        private final AtomicLong joinTimeMsInSingleRound;
         private long joinStartMs;
 
         public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
@@ -355,7 +354,7 @@ public class ConsumerPerformance {
         }
 
         public Set<String> topic() {
-            return Collections.singleton(options.valueOf(topicOpt));
+            return Set.of(options.valueOf(topicOpt));
         }
 
         public long numMessages() {
diff --git a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
index 04162041f8e..dad388fee6b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
@@ -38,7 +38,6 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -106,7 +105,7 @@ public class DelegationTokenCommand {
         CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
         DelegationToken token = createResult.delegationToken().get();
         System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
-        printToken(Collections.singletonList(token));
+        printToken(List.of(token));
 
         return token;
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
index 96aed36f37e..c6914b4667b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
+++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@@ -34,11 +34,11 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
@@ -90,7 +90,7 @@ public class EndToEndLatency {
         int messageSizeBytes = Integer.parseInt(args[4]);
         Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
 
-        if (!Arrays.asList("1", "all").contains(acks)) {
+        if (!List.of("1", "all").contains(acks)) {
             throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
         }
 
@@ -186,7 +186,7 @@ public class EndToEndLatency {
         Admin adminClient = Admin.create(adminProps);
         NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
         try {
-            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            adminClient.createTopics(Set.of(newTopic)).all().get();
         } catch (ExecutionException | InterruptedException e) {
             System.out.printf("Creation of topic %s failed%n", topic);
             throw new RuntimeException(e);
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index ca0ce8c4ac0..103821cf21d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -39,7 +39,6 @@ import net.sourceforge.argparse4j.inf.Subparser;
 import net.sourceforge.argparse4j.inf.Subparsers;
 import net.sourceforge.argparse4j.internal.HelpScreenException;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -240,7 +239,7 @@ public class FeatureCommand {
     }
 
     static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) {
-        List<MetadataVersion> versions = Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
+        List<MetadataVersion> versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
         return versions.stream()
                 .map(String::valueOf)
                 .collect(Collectors.joining(", "));
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index a2dafcdef7a..4b46f336087 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -42,7 +42,6 @@ import org.apache.kafka.tools.filter.TopicPartitionFilter.TopicFilterAndPartitio
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -312,7 +311,7 @@ public class GetOffsetShell {
      * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
      */
     public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
-        List<String> ruleSpecs = Arrays.asList(topicPartitions.split(","));
+        List<String> ruleSpecs = List.of(topicPartitions.split(","));
         List<TopicPartitionFilter> rules = ruleSpecs.stream().map(ruleSpec -> {
             try {
                 return parseRuleSpec(ruleSpec);
@@ -338,7 +337,7 @@ public class GetOffsetShell {
         Set<Integer> partitions;
 
         if (partitionsString == null || partitionsString.isEmpty()) {
-            partitions = Collections.emptySet();
+            partitions = Set.of();
         } else {
             try {
                 partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
index c79826e7092..d4b933c9f6c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -75,11 +76,7 @@ public class GroupsCommand {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable t) {
             printException(t);
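
The Objects.requireNonNullElse(cause, e) pattern introduced above (and in
ClientMetricsCommand and TopicCommand) collapses the explicit null check on
ExecutionException's cause; a sketch of the equivalence, with illustrative
values:

    import java.util.Objects;
    import java.util.concurrent.ExecutionException;

    public class RequireNonNullElseSketch {
        public static void main(String[] args) {
            ExecutionException e = new ExecutionException("wrapper", null);
            Throwable cause = e.getCause(); // null here

            // Before: explicit null check.
            Throwable toPrint = (cause != null) ? cause : e;

            // After: Objects.requireNonNullElse (Java 9+) returns the first
            // argument if non-null, otherwise the (non-null) second argument.
            Throwable toPrint2 = Objects.requireNonNullElse(cause, e);

            System.out.println(toPrint == toPrint2); // true
        }
    }
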
diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
index 56cf8d85ab8..c87b15aadf8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
@@ -96,7 +96,7 @@ public class JmxTool {
             while (keepGoing) {
                 long start = System.currentTimeMillis();
                 Map<String, Object> attributes = queryAttributes(conn, found, attributesInclude);
-                attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+                attributes.put("time", dateFormat.map(format -> format.format(new Date())).orElseGet(() -> String.valueOf(System.currentTimeMillis())));
                 maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
                 if (options.isOneTime()) {
                     keepGoing = false;
@@ -225,7 +225,7 @@ public class JmxTool {
                         AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
                         List<ObjectName> expectedAttributes = new ArrayList<>();
                         attributes.asList().forEach(attribute -> {
-                            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                            if (List.of(attributesInclude.get()).contains(attribute.getName())) {
                                 expectedAttributes.add(objectName);
                             }
                         });
@@ -254,10 +254,10 @@ public class JmxTool {
         for (ObjectName objectName : objectNames) {
             MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
             AttributeList attributes = conn.getAttributes(objectName,
-                    Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+                    Arrays.stream(beanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new));
             for (Attribute attribute : attributes.asList()) {
                 if (attributesInclude.isPresent()) {
-                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                    if (List.of(attributesInclude.get()).contains(attribute.getName())) {
                         result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
                                 attribute.getValue());
                     }
@@ -395,7 +395,7 @@ public class JmxTool {
 
         private String parseFormat() {
             String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT);
-            return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
+            return List.of("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
         }
 
         public boolean hasJmxAuthPropOpt() {
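
The JmxTool change from isPresent()/get() to Optional.map(...).orElseGet(...)
above is a common Optional idiom; a small sketch (illustrative values, not
code from this commit):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Optional;

    public class OptionalMapSketch {
        static String timestamp(Optional<SimpleDateFormat> dateFormat) {
            // map() transforms the value only if present; orElseGet() computes
            // the fallback lazily, only when the Optional is empty.
            return dateFormat.map(format -> format.format(new Date()))
                    .orElseGet(() -> String.valueOf(System.currentTimeMillis()));
        }

        public static void main(String[] args) {
            System.out.println(timestamp(Optional.of(new SimpleDateFormat("yyyy-MM-dd"))));
            System.out.println(timestamp(Optional.empty()));
        }
    }
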
diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 2814034e167..2b0f9c4d7b9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -93,7 +92,7 @@ public class LeaderElectionCommand {
         Optional<Integer> partitionOption = Optional.ofNullable(commandOptions.getPartition());
         final Optional<Set<TopicPartition>> singleTopicPartition =
             (topicOption.isPresent() && partitionOption.isPresent()) ?
-                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.of(Set.of(new TopicPartition(topicOption.get(), partitionOption.get()))) :
                 Optional.empty();
 
         /* Note: No need to look at --all-topic-partitions as we want this to be null if it is use.
@@ -346,7 +345,7 @@ public class LeaderElectionCommand {
             }
 
             // One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
-            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(
+            List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(
                 topic,
                 allTopicPartitions,
                 pathToJsonFile
diff --git a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
index d600c5df536..305a91cfdc4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
@@ -45,7 +45,6 @@ import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,23 +82,13 @@ public class ManifestWorkspace {
     }
 
     public SourceWorkspace<?> forSource(PluginSource source) throws IOException {
-        SourceWorkspace<?> sourceWorkspace;
-        switch (source.type()) {
-            case CLASSPATH:
-                sourceWorkspace = new ClasspathWorkspace(source);
-                break;
-            case MULTI_JAR:
-                sourceWorkspace = new MultiJarWorkspace(source);
-                break;
-            case SINGLE_JAR:
-                sourceWorkspace = new SingleJarWorkspace(source);
-                break;
-            case CLASS_HIERARCHY:
-                sourceWorkspace = new ClassHierarchyWorkspace(source);
-                break;
-            default:
-                throw new IllegalStateException("Unknown source type " + source.type());
-        }
+        SourceWorkspace<?> sourceWorkspace = switch (source.type()) {
+            case CLASSPATH -> new ClasspathWorkspace(source);
+            case MULTI_JAR -> new MultiJarWorkspace(source);
+            case SINGLE_JAR -> new SingleJarWorkspace(source);
+            case CLASS_HIERARCHY -> new ClassHierarchyWorkspace(source);
+            default -> throw new IllegalStateException("Unknown source type " + source.type());
+        };
         workspaces.add(sourceWorkspace);
         return sourceWorkspace;
     }
@@ -390,7 +379,7 @@ public class ManifestWorkspace {
         }
         try (FileSystem jar = FileSystems.newFileSystem(
                 new URI("jar", writableJar.toUri().toString(), ""),
-                Collections.emptyMap()
+                Map.of()
         )) {
             Path zipRoot = jar.getRootDirectories().iterator().next();
             // Set dryRun to false because this jar file is already a writable copy.
diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index 322e0a7bac3..b65655a8c89 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -50,7 +50,6 @@ import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -127,35 +126,36 @@ public class MetadataQuorumCommand {
                 Optional.ofNullable(namespace.getString("bootstrap_controller")));
             admin = Admin.create(props);
 
-            if (command.equals("describe")) {
-                if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
-                    throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
-                } else if (namespace.getBoolean("replication")) {
-                    boolean humanReadable = Optional.of(namespace.getBoolean("human_readable")).orElse(false);
-                    handleDescribeReplication(admin, humanReadable);
-                } else if (namespace.getBoolean("status")) {
-                    if (namespace.getBoolean("human_readable")) {
-                        throw new TerseException("The option --human-readable is only supported along with --replication");
+            switch (command) {
+                case "describe" -> {
+                    if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
+                        throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
+                    } else if (namespace.getBoolean("replication")) {
+                        boolean humanReadable = Optional.of(namespace.getBoolean("human_readable")).orElse(false);
+                        handleDescribeReplication(admin, humanReadable);
+                    } else if (namespace.getBoolean("status")) {
+                        if (namespace.getBoolean("human_readable")) {
+                            throw new TerseException("The option --human-readable is only supported along with --replication");
+                        }
+                        handleDescribeStatus(admin);
+                    } else {
+                        throw new TerseException("One of --status or --replication must be specified with describe sub-command");
                     }
-                    handleDescribeStatus(admin);
-                } else {
-                    throw new TerseException("One of --status or --replication must be specified with describe sub-command");
                 }
-            } else if (command.equals("add-controller")) {
-                if (optionalCommandConfig == null) {
-                    throw new TerseException("You must supply the configuration file of the controller you are " +
-                        "adding when using add-controller.");
+                case "add-controller" -> {
+                    if (optionalCommandConfig == null) {
+                        throw new TerseException("You must supply the configuration file of the controller you are " +
+                            "adding when using add-controller.");
+                    }
+                    handleAddController(admin,
+                        namespace.getBoolean("dry_run"),
+                        props);
                 }
-                handleAddController(admin,
-                    namespace.getBoolean("dry_run"),
-                    props);
-            } else if (command.equals("remove-controller")) {
-                handleRemoveController(admin,
+                case "remove-controller" -> handleRemoveController(admin,
                     namespace.getInt("controller_id"),
                     namespace.getString("controller_directory_id"),
                     namespace.getBoolean("dry_run"));
-            } else {
-                throw new IllegalStateException(format("Unknown command: %s", command));
+                default -> throw new IllegalStateException(format("Unknown command: %s", command));
             }
         } finally {
             if (admin != null)
@@ -231,7 +231,7 @@ public class MetadataQuorumCommand {
                 lastFetchTimestamp,
                 lastCaughtUpTimestamp,
                 status
-            ).map(r -> r.toString()).collect(Collectors.toList());
+            ).map(Object::toString).collect(Collectors.toList());
         }).collect(Collectors.toList());
     }
 
@@ -253,7 +253,7 @@ public class MetadataQuorumCommand {
         QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
         int leaderId = quorumInfo.leaderId();
         QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get();
-        QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(qi -> qi.logEndOffset())).get();
+        QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(QuorumInfo.ReplicaState::logEndOffset)).get();
         long maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset();
 
         long maxFollowerLagTimeMs;
@@ -292,7 +292,7 @@ public class MetadataQuorumCommand {
         List<Node> currentVoterList = replicas.stream().map(voter -> new Node(
             voter.replicaId(),
             voter.replicaDirectoryId(),
-            getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).collect(Collectors.toList());
+            getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).toList();
         return currentVoterList.stream().map(Objects::toString).collect(Collectors.joining(", ", "[", "]"));
     }
 
@@ -378,7 +378,7 @@ public class MetadataQuorumCommand {
 
     static Uuid getMetadataDirectoryId(String metadataDirectory) throws Exception {
         MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
-            addLogDirs(Collections.singletonList(metadataDirectory)).
+            addLogDirs(List.of(metadataDirectory)).
             addMetadataLogDir(metadataDirectory).
             load();
         MetaProperties metaProperties = ensemble.logDirProps().get(metadataDirectory);
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index 4cbb3329aa1..c529e4b6820 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -42,7 +42,6 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -367,7 +366,7 @@ public class OffsetsUtils {
 
             if (resetPlanForGroup == null) {
                 printError("No reset plan for group " + groupId + " found", 
Optional.empty());
-                return Collections.<TopicPartition, 
OffsetAndMetadata>emptyMap();
+                return Map.<TopicPartition, OffsetAndMetadata>of();
             }
 
             Map<TopicPartition, Long> requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
@@ -376,7 +375,7 @@ public class OffsetsUtils {
 
             return checkOffsetsRange(requestedOffsets).entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-        }).orElseGet(Collections::emptyMap);
+        }).orElseGet(Map::of);
     }
 
     public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index 67ec1f6c50a..c5f1ddcc2d4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -127,7 +127,7 @@ public class ReplicaVerificationTool {
 
                 List<TopicDescription> filteredTopicMetadata = topicsMetadata.stream().filter(
                     topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
-                ).collect(Collectors.toList());
+                ).toList();
 
                 if (filteredTopicMetadata.isEmpty()) {
                     LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", options.topicsIncludeOpt);
@@ -196,7 +196,7 @@ public class ReplicaVerificationTool {
                             counter.incrementAndGet()
                         );
                     })
-                    .collect(Collectors.toList());
+                    .toList();
 
                 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                     LOG.info("Stopping all fetchers");
diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index b04bf922d04..9e3f0ff0152 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -396,7 +395,7 @@ public class ShareConsumerPerformance {
         }
 
         public Set<String> topic() {
-            return Collections.singleton(options.valueOf(topicOpt));
+            return Set.of(options.valueOf(topicOpt));
         }
 
         public long numMessages() {
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index d15d7dcde13..7047b77c180 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -44,7 +44,6 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -178,7 +177,7 @@ public class StreamsResetter {
                                             final StreamsResetterOptions options)
         throws ExecutionException, InterruptedException {
         final DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(
-            Collections.singleton(groupId),
+            Set.of(groupId),
             new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
         try {
             final List<MemberDescription> members =
@@ -212,15 +211,15 @@ public class StreamsResetter {
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
 
-        if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
+        if (inputTopics.isEmpty() && intermediateTopics.isEmpty()) {
             System.out.println("No input or intermediate topics specified. 
Skipping seek.");
             return EXIT_CODE_SUCCESS;
         }
 
-        if (inputTopics.size() != 0) {
+        if (!inputTopics.isEmpty()) {
             System.out.println("Reset-offsets for input topics " + 
inputTopics);
         }
-        if (intermediateTopics.size() != 0) {
+        if (!intermediateTopics.isEmpty()) {
             System.out.println("Seek-to-end for intermediate topics " + 
intermediateTopics);
         }
 
@@ -313,7 +312,7 @@ public class StreamsResetter {
     public void maybeSeekToEnd(final String groupId,
                                final Consumer<byte[], byte[]> client,
                                final Set<TopicPartition> intermediateTopicPartitions) {
-        if (intermediateTopicPartitions.size() > 0) {
+        if (!intermediateTopicPartitions.isEmpty()) {
             System.out.println("Following intermediate topics offsets will be 
reset to end (for consumer group " + groupId + ")");
             for (final TopicPartition topicPartition : 
intermediateTopicPartitions) {
                 if (allTopics.contains(topicPartition.topic())) {
@@ -328,7 +327,7 @@ public class StreamsResetter {
                             final Set<TopicPartition> inputTopicPartitions,
                             final StreamsResetterOptions options)
         throws IOException, ParseException {
-        if (inputTopicPartitions.size() > 0) {
+        if (!inputTopicPartitions.isEmpty()) {
             System.out.println("Following input topics offsets will be reset 
to (for consumer group " + options.applicationId() + ")");
             if (options.hasToOffset()) {
                 resetOffsetsTo(client, inputTopicPartitions, options.toOffset());
@@ -405,7 +404,7 @@ public class StreamsResetter {
             if (partitionOffset.isPresent()) {
                 client.seek(topicPartition, partitionOffset.get());
             } else {
-                client.seekToEnd(Collections.singletonList(topicPartition));
+                client.seekToEnd(List.of(topicPartition));
                 System.out.println("Partition " + topicPartition.partition() + 
" from topic " + topicPartition.topic() +
                         " is empty, without a committed record. Falling back 
to latest known offset.");
             }
@@ -508,7 +507,7 @@ public class StreamsResetter {
         final List<String> topicsToDelete;
 
         if (!specifiedInternalTopics.isEmpty()) {
-            if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) {
+            if (!new HashSet<>(inferredInternalTopics).containsAll(specifiedInternalTopics)) {
                 throw new IllegalArgumentException("Invalid topic specified in the "
                         + "--internal-topics option. "
                         + "Ensure that the topics specified are all internal topics. "
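
The new HashSet<>(...) wrap before containsAll in the last StreamsResetter
hunk above trades a copy for faster membership tests; a sketch of the idea
(illustrative topic names):

    import java.util.HashSet;
    import java.util.List;

    public class ContainsAllSketch {
        public static void main(String[] args) {
            List<String> inferred = List.of("app-repartition", "app-changelog");
            List<String> specified = List.of("app-changelog");

            // List.containsAll scans the list once per element checked: O(n * m).
            boolean viaList = inferred.containsAll(specified);

            // Copying into a HashSet makes each membership test O(1) on average.
            boolean viaSet = new HashSet<>(inferred).containsAll(specified);

            System.out.println(viaList == viaSet); // same result, different cost
        }
    }
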
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 4da7f47e564..b4444333500 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -59,18 +59,19 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import joptsimple.ArgumentAcceptingOptionSpec;
 import joptsimple.OptionSpec;
@@ -112,11 +113,7 @@ public abstract class TopicCommand {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable e) {
             printException(e);
@@ -159,10 +156,10 @@ public abstract class TopicCommand {
 
     @SuppressWarnings("deprecation")
     private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
-        List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList())
+        List<List<String>> configsToBeAdded = opts.topicConfig().orElse(List.of())
             .stream()
-            .map(s -> Arrays.asList(s.split("\\s*=\\s*")))
-            .collect(Collectors.toList());
+            .map(s -> List.of(s.split("\\s*=\\s*")))
+            .toList();
 
         if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) {
             throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\".");
@@ -256,7 +253,7 @@ public abstract class TopicCommand {
             name = options.topic().get();
             partitions = options.partitions();
             replicationFactor = options.replicationFactor();
-            replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap());
+            replicaAssignment = options.replicaAssignment().orElse(Map.of());
             configsToAdd = parseTopicConfigsToBeAdded(options);
         }
 
@@ -357,10 +354,10 @@ public abstract class TopicCommand {
                 .collect(Collectors.joining(",")));
             if (reassignment != null) {
                 System.out.print("\tAdding Replicas: " + 
reassignment.addingReplicas().stream()
-                    .map(node -> node.toString())
+                    .map(Object::toString)
                     .collect(Collectors.joining(",")));
                 System.out.print("\tRemoving Replicas: " + 
reassignment.removingReplicas().stream()
-                    .map(node -> node.toString())
+                    .map(Object::toString)
                     .collect(Collectors.joining(",")));
             }
 
@@ -443,9 +440,7 @@ public abstract class TopicCommand {
         }
 
         private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {
-            if (bootstrapServer.isPresent()) {
-                commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
-            }
+            bootstrapServer.ifPresent(s -> commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
             return Admin.create(commandConfig);
         }
 
@@ -475,10 +470,10 @@ public abstract class TopicCommand {
                 }
 
                 Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream()
-                    .collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));
+                    .collect(Collectors.toMap(name -> name, topic.configsToAdd::getProperty));
 
                 newTopic.configs(configsMap);
-                CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
+                CreateTopicsResult createResult = adminClient.createTopics(Set.of(newTopic),
                     new CreateTopicsOptions().retryOnQuotaViolation(false));
                 createResult.all().get();
                 System.out.println("Created topic " + topic.name + ".");
@@ -493,9 +488,7 @@ public abstract class TopicCommand {
         }
 
         public void listTopics(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
-            String results = getTopics(opts.topic(), opts.excludeInternalTopics())
-                .stream()
-                .collect(Collectors.joining("\n"));
+            String results = String.join("\n", getTopics(opts.topic(), opts.excludeInternalTopics()));
             System.out.println(results);
         }
 
@@ -539,7 +532,7 @@ public abstract class TopicCommand {
                 Throwable cause = e.getCause();
                 if (cause instanceof UnsupportedVersionException || cause instanceof ClusterAuthorizationException) {
                     LOG.debug("Couldn't query reassignments through the AdminClient API: " + cause.getMessage(), cause);
-                    return Collections.emptyMap();
+                    return Map.of();
                 } else {
                     throw new RuntimeException(e);
                 }
@@ -558,9 +551,9 @@ public abstract class TopicCommand {
             List<String> topics;
             if (useTopicId) {
                 topicIds = getTopicIds(inputTopicId.get(), opts.excludeInternalTopics());
-                topics = Collections.emptyList();
+                topics = List.of();
             } else {
-                topicIds = Collections.emptyList();
+                topicIds = List.of();
                 topics = getTopics(opts.topic(), opts.excludeInternalTopics());
             }
 
@@ -588,7 +581,7 @@ public abstract class TopicCommand {
 
             List<String> topicNames = topicDescriptions.stream()
                 .map(org.apache.kafka.clients.admin.TopicDescription::name)
-                .collect(Collectors.toList());
+                .toList();
             Map<ConfigResource, KafkaFuture<Config>> allConfigs = 
adminClient.describeConfigs(
                 topicNames.stream()
                     .map(name -> new ConfigResource(ConfigResource.Type.TOPIC, 
name))
@@ -596,7 +589,7 @@ public abstract class TopicCommand {
             ).values();
             List<Integer> liveBrokers = 
adminClient.describeCluster().nodes().get().stream()
                 .map(Node::id)
-                .collect(Collectors.toList());
+                .toList();
             DescribeOptions describeOptions = new DescribeOptions(opts, new 
HashSet<>(liveBrokers));
             Set<TopicPartition> topicPartitions = topicDescriptions
                 .stream()
@@ -647,7 +640,7 @@ public abstract class TopicCommand {
         public void deleteTopic(TopicCommandOptions opts) throws 
ExecutionException, InterruptedException {
             List<String> topics = getTopics(opts.topic(), 
opts.excludeInternalTopics());
             ensureTopicExists(topics, opts.topic(), !opts.ifExists());
-            adminClient.deleteTopics(Collections.unmodifiableList(topics),
+            adminClient.deleteTopics(List.copyOf(topics),
                 new DeleteTopicsOptions().retryOnQuotaViolation(false)
             ).all().get();
         }
@@ -668,10 +661,10 @@ public abstract class TopicCommand {
             List<Uuid> allTopicIds = allTopics.listings().get().stream()
                 .map(TopicListing::topicId)
                 .sorted()
-                .collect(Collectors.toList());
+                .toList();
             return allTopicIds.contains(topicIdIncludeList) ?
-                Collections.singletonList(topicIdIncludeList) :
-                Collections.emptyList();
+                List.of(topicIdIncludeList) :
+                List.of();
         }
 
         @Override
@@ -835,7 +828,7 @@ public abstract class TopicCommand {
         }
 
         public <A> Optional<List<A>> valuesAsOption(OptionSpec<A> option) {
-            return valuesAsOption(option, Collections.emptyList());
+            return valuesAsOption(option, List.of());
         }
 
         public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A> 
defaultValue) {
@@ -953,8 +946,7 @@ public abstract class TopicCommand {
 
             // should have exactly one action
             long actions =
-                Arrays.asList(createOpt, listOpt, alterOpt, describeOpt, 
deleteOpt)
-                    .stream().filter(options::has)
+                Stream.of(createOpt, listOpt, alterOpt, describeOpt, 
deleteOpt).filter(options::has)
                     .count();
             if (actions != 1)
                CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
@@ -989,29 +981,29 @@ public abstract class TopicCommand {
 
         private void checkInvalidArgs() {
             // check invalid args
-            CommandLineUtils.checkInvalidArgs(parser, options, configOpt, 
invalidOptions(Arrays.asList(alterOpt, createOpt)));
-            CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, 
invalidOptions(Arrays.asList(alterOpt, createOpt)));
-            CommandLineUtils.checkInvalidArgs(parser, options, 
replicationFactorOpt, invalidOptions(Arrays.asList(createOpt)));
-            CommandLineUtils.checkInvalidArgs(parser, options, 
replicaAssignmentOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, configOpt, 
invalidOptions(List.of(alterOpt, createOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, 
invalidOptions(List.of(alterOpt, createOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, 
replicationFactorOpt, invalidOptions(List.of(createOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, 
replicaAssignmentOpt, invalidOptions(List.of(alterOpt, createOpt)));
             if (options.has(createOpt)) {
                 CommandLineUtils.checkInvalidArgs(parser, options, 
replicaAssignmentOpt, partitionsOpt, replicationFactorOpt);
             }
 
 
             CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnderReplicatedPartitionsOpt,
-                invalidOptions(Collections.singleton(topicsWithOverridesOpt), 
Arrays.asList(describeOpt, reportUnderReplicatedPartitionsOpt)));
+                invalidOptions(Set.of(topicsWithOverridesOpt), 
List.of(describeOpt, reportUnderReplicatedPartitionsOpt)));
             CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnderMinIsrPartitionsOpt,
-                invalidOptions(Collections.singleton(topicsWithOverridesOpt), 
Arrays.asList(describeOpt, reportUnderMinIsrPartitionsOpt)));
+                invalidOptions(Set.of(topicsWithOverridesOpt), 
List.of(describeOpt, reportUnderMinIsrPartitionsOpt)));
             CommandLineUtils.checkInvalidArgs(parser, options, 
reportAtMinIsrPartitionsOpt,
-                invalidOptions(Collections.singleton(topicsWithOverridesOpt), 
Arrays.asList(describeOpt, reportAtMinIsrPartitionsOpt)));
+                invalidOptions(Set.of(topicsWithOverridesOpt), 
List.of(describeOpt, reportAtMinIsrPartitionsOpt)));
             CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnavailablePartitionsOpt,
-                invalidOptions(Collections.singleton(topicsWithOverridesOpt), 
Arrays.asList(describeOpt, reportUnavailablePartitionsOpt)));
+                invalidOptions(Set.of(topicsWithOverridesOpt), 
List.of(describeOpt, reportUnavailablePartitionsOpt)));
             CommandLineUtils.checkInvalidArgs(parser, options, 
topicsWithOverridesOpt,
-                invalidOptions(new HashSet<>(allReplicationReportOpts), 
Arrays.asList(describeOpt)));
+                invalidOptions(new HashSet<>(allReplicationReportOpts), 
List.of(describeOpt)));
             CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
-                invalidOptions(Arrays.asList(alterOpt, deleteOpt, 
describeOpt)));
-            CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, 
invalidOptions(Arrays.asList(createOpt)));
-            CommandLineUtils.checkInvalidArgs(parser, options, 
excludeInternalTopicOpt, invalidOptions(Arrays.asList(listOpt, describeOpt)));
+                invalidOptions(List.of(alterOpt, deleteOpt, describeOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, 
invalidOptions(List.of(createOpt)));
+            CommandLineUtils.checkInvalidArgs(parser, options, 
excludeInternalTopicOpt, invalidOptions(List.of(listOpt, describeOpt)));
         }
 
         private Set<OptionSpec<?>> invalidOptions(List<OptionSpec<?>> 
removeOptions) {
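
A note on the TopicCommand changes above: in deleteTopic, Collections.unmodifiableList(topics) became List.copyOf(topics). These are not synonyms: unmodifiableList returns a live view over the backing list, while copyOf takes an independent immutable snapshot and rejects null elements. A minimal sketch of the difference (illustrative class, not part of the patch):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class CopyOfVsUnmodifiable {
        public static void main(String[] args) {
            List<String> topics = new ArrayList<>(List.of("a", "b"));

            List<String> view = Collections.unmodifiableList(topics); // live view over the backing list
            List<String> snapshot = List.copyOf(topics);              // independent immutable copy

            topics.add("c");
            System.out.println(view.size());     // 3 -- mutations show through the view
            System.out.println(snapshot.size()); // 2 -- the snapshot is unaffected

            // List.copyOf also throws NullPointerException on null elements.
        }
    }

The call site above never mutates the list afterwards and topic names are never null, so the swap is behavior-neutral; copyOf even skips the copy when its argument is already an immutable List.of-style list.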
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 7a79f51046c..2930d490f4f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -47,17 +47,16 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static java.util.Collections.singleton;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -240,13 +239,13 @@ public class TransactionalMessageCopier {
             if (offsetAndMetadata != null)
                 consumer.seek(tp, offsetAndMetadata.offset());
             else
-                consumer.seekToBeginning(singleton(tp));
+                consumer.seekToBeginning(Set.of(tp));
         });
     }
 
     private static long messagesRemaining(KafkaConsumer<String, String> 
consumer, TopicPartition partition) {
         long currentPosition = consumer.position(partition);
-        Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(singleton(partition));
+        Map<TopicPartition, Long> endOffsets = 
consumer.endOffsets(Set.of(partition));
         if (endOffsets.containsKey(partition)) {
             return endOffsets.get(partition) - currentPosition;
         }
@@ -319,7 +318,7 @@ public class TransactionalMessageCopier {
         final AtomicLong numMessagesProcessedSinceLastRebalance = new 
AtomicLong(0);
         final AtomicLong totalMessageProcessed = new AtomicLong(0);
         if (groupMode) {
-            consumer.subscribe(Collections.singleton(topicName), new 
ConsumerRebalanceListener() {
+            consumer.subscribe(Set.of(topicName), new 
ConsumerRebalanceListener() {
                 @Override
                 public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
                 }
@@ -341,7 +340,7 @@ public class TransactionalMessageCopier {
             });
         } else {
             TopicPartition inputPartition = new TopicPartition(topicName, 
parsedArgs.getInt("inputPartition"));
-            consumer.assign(singleton(inputPartition));
+            consumer.assign(Set.of(inputPartition));
             remainingMessages.set(Math.min(messagesRemaining(consumer, 
inputPartition), remainingMessages.get()));
         }
 
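The Collections.singleton(...) to Set.of(...) swaps above are drop-in replacements for the single-element case. The one edge to keep in mind when extending this pattern is the varargs form: Set.of throws IllegalArgumentException on duplicate elements, where a HashSet would silently de-duplicate. A small sketch (illustrative, not from the patch):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class SetOfDuplicates {
        public static void main(String[] args) {
            Set<String> single = Set.of("input-0");                             // same as Collections.singleton("input-0")
            Set<String> deduped = new HashSet<>(List.of("input-0", "input-0")); // silently de-duplicates

            try {
                Set.of("input-0", "input-0");                                   // fails at construction time
            } catch (IllegalArgumentException e) {
                System.out.println("Set.of rejects duplicates: " + e.getMessage());
            }
            System.out.println(single.size() + " " + deduped.size());           // prints: 1 1
        }
    }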
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index c1dbefcef60..9c5323104fe 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -50,7 +50,6 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -65,8 +64,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 public abstract class TransactionsCommand {
@@ -159,7 +156,7 @@ public abstract class TransactionsCommand {
         ) throws Exception {
             final DescribeProducersResult.PartitionProducerState result;
             try {
-                result = admin.describeProducers(singleton(topicPartition))
+                result = admin.describeProducers(Set.of(topicPartition))
                     .partitionResult(topicPartition)
                     .get();
             } catch (ExecutionException e) {
@@ -345,7 +342,7 @@ public abstract class TransactionsCommand {
             final DescribeProducersResult.PartitionProducerState result;
 
             try {
-                result = admin.describeProducers(singleton(topicPartition), 
options)
+                result = admin.describeProducers(Set.of(topicPartition), 
options)
                     .partitionResult(topicPartition)
                     .get();
             } catch (ExecutionException e) {
@@ -418,7 +415,7 @@ public abstract class TransactionsCommand {
 
             final TransactionDescription result;
             try {
-                result = admin.describeTransactions(singleton(transactionalId))
+                result = admin.describeTransactions(Set.of(transactionalId))
                     .description(transactionalId)
                     .get();
             } catch (ExecutionException e) {
@@ -451,7 +448,7 @@ public abstract class TransactionsCommand {
                 
result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(","))
             );
 
-            ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
+            ToolsUtils.prettyPrintTable(HEADERS, List.of(row), out);
         }
     }
 
@@ -615,7 +612,7 @@ public abstract class TransactionsCommand {
             );
 
             if (candidates.isEmpty()) {
-                printHangingTransactions(Collections.emptyList(), out);
+                printHangingTransactions(List.of(), out);
             } else {
                 Map<Long, List<OpenTransaction>> openTransactionsByProducerId 
= groupByProducerId(candidates);
 
@@ -649,9 +646,9 @@ public abstract class TransactionsCommand {
 
             if (topic.isPresent()) {
                 if (partition.isPresent()) {
-                    return Collections.singletonList(new 
TopicPartition(topic.get(), partition.get()));
+                    return List.of(new TopicPartition(topic.get(), 
partition.get()));
                 } else {
-                    topics = Collections.singletonList(topic.get());
+                    topics = List.of(topic.get());
                 }
             } else {
                 topics = listTopics(admin);
@@ -752,7 +749,7 @@ public abstract class TransactionsCommand {
             } catch (ExecutionException e) {
                 printErrorAndExit("Failed to describe " + 
transactionalIds.size()
                     + " transactions", e.getCause());
-                return Collections.emptyMap();
+                return Map.of();
             }
         }
 
@@ -778,7 +775,7 @@ public abstract class TransactionsCommand {
                 return new 
ArrayList<>(admin.listTopics(listOptions).names().get());
             } catch (ExecutionException e) {
                 printErrorAndExit("Failed to list topics", e.getCause());
-                return Collections.emptyList();
+                return List.of();
             }
         }
 
@@ -788,14 +785,14 @@ public abstract class TransactionsCommand {
             List<String> topics
         ) throws Exception {
             List<TopicPartition> topicPartitions = new ArrayList<>();
-            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch ->
                 findTopicPartitions(
                     admin,
                     brokerId,
                     batch,
                     topicPartitions
-                );
-            });
+                )
+            );
             return topicPartitions;
         }
 
@@ -807,13 +804,13 @@ public abstract class TransactionsCommand {
         ) throws Exception {
             try {
                 Map<String, TopicDescription> topicDescriptions = 
admin.describeTopics(topics).allTopicNames().get();
-                topicDescriptions.forEach((topic, description) -> {
+                topicDescriptions.forEach((topic, description) ->
                     description.partitions().forEach(partitionInfo -> {
                         if (brokerId.isEmpty() || hasReplica(brokerId.get(), 
partitionInfo)) {
                             topicPartitions.add(new TopicPartition(topic, 
partitionInfo.partition()));
                         }
-                    });
-                });
+                    })
+                );
             } catch (ExecutionException e) {
                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
             }
@@ -838,15 +835,15 @@ public abstract class TransactionsCommand {
 
             List<OpenTransaction> candidateTransactions = new ArrayList<>();
 
-            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch ->
                 collectCandidateOpenTransactions(
                     admin,
                     brokerId,
                     maxTransactionTimeoutMs,
                     batch,
                     candidateTransactions
-                );
-            });
+                )
+            );
 
             return candidateTransactions;
         }
@@ -880,7 +877,7 @@ public abstract class TransactionsCommand {
 
                 long currentTimeMs = time.milliseconds();
 
-                producersByPartition.forEach((topicPartition, producersStates) 
-> {
+                producersByPartition.forEach((topicPartition, producersStates) 
->
                     producersStates.activeProducers().forEach(activeProducer 
-> {
                         if 
(activeProducer.currentTransactionStartOffset().isPresent()) {
                             long transactionDurationMs = currentTimeMs - 
activeProducer.lastTimestamp();
@@ -891,8 +888,8 @@ public abstract class TransactionsCommand {
                                 ));
                             }
                         }
-                    });
-                });
+                    })
+                );
             } catch (ExecutionException e) {
                 printErrorAndExit("Failed to describe producers for " + 
topicPartitions.size() +
                     " partitions on broker " + brokerId, e.getCause());
@@ -928,7 +925,7 @@ public abstract class TransactionsCommand {
             } catch (ExecutionException e) {
                 printErrorAndExit("Failed to list transactions for " + 
producerIds.size() +
                     " producers", e.getCause());
-                return Collections.emptyMap();
+                return Map.of();
             }
         }
 
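Besides the factory swaps, this file collapses several statement lambdas (batch -> { f(batch); }) into expression lambdas (batch -> f(batch)). The two forms are interchangeable whenever the target functional interface method returns void; java.util.function.Consumer stands in below for the file's own callback type:

    import java.util.List;
    import java.util.function.Consumer;

    public class LambdaForms {
        static void process(List<String> batch) {
            System.out.println("processed " + batch.size() + " items");
        }

        public static void main(String[] args) {
            Consumer<List<String>> block = batch -> { process(batch); }; // statement lambda
            Consumer<List<String>> expr = batch -> process(batch);       // expression lambda, same semantics
            Consumer<List<String>> ref = LambdaForms::process;           // method reference, also equivalent

            block.accept(List.of("a"));
            expr.accept(List.of("a", "b"));
            ref.accept(List.of("a", "b", "c"));
        }
    }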
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 0da56f1340b..1fa1aed8c4f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -58,7 +58,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -231,7 +230,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
     public void run() {
         try {
             printJson(new StartupComplete());
-            consumer.subscribe(Collections.singletonList(topic), this);
+            consumer.subscribe(List.of(topic), this);
 
             while (!isFinished()) {
                 ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
@@ -623,7 +622,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
 
         boolean useAutoCommit = res.getBoolean("useAutoCommit");
         String configFile = res.getString("consumer.config");
-        String brokerHostandPort = res.getString("bootstrapServer");
+        String brokerHostAndPort = res.getString("bootstrapServer");
 
         Properties consumerProps = new Properties();
         if (configFile != null) {
@@ -664,7 +663,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
             consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
groupInstanceId);
         }
 
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerHostandPort);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerHostAndPort);
 
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
res.getString("resetPolicy"));
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
index 496c41412cc..d83c1e44244 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -61,7 +61,6 @@ import java.io.PrintStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -421,7 +420,7 @@ public class VerifiableShareConsumer implements Closeable, 
AcknowledgementCommit
                 printJson(new 
OffsetResetStrategySet(offsetResetStrategy.type().toString()));
             }
 
-            consumer.subscribe(Collections.singleton(this.topic));
+            consumer.subscribe(Set.of(this.topic));
             consumer.setAcknowledgementCommitCallback(this);
             while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) {
                 ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(5000));
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index f7c1402f25e..74a928b9727 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -37,6 +37,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
@@ -160,7 +161,7 @@ public class ConsoleConsumer {
                 if (opts.partitionArg().isPresent()) {
                     seek(topic.get(), opts.partitionArg().getAsInt(), 
opts.offsetArg());
                 } else {
-                    consumer.subscribe(Collections.singletonList(topic.get()));
+                    consumer.subscribe(List.of(topic.get()));
                 }
             } else {
                 opts.includedTopicsArg().ifPresent(topics -> 
consumer.subscribe(Pattern.compile(topics)));
@@ -169,11 +170,11 @@ public class ConsoleConsumer {
 
         private void seek(String topic, int partitionId, long offset) {
             TopicPartition topicPartition = new TopicPartition(topic, 
partitionId);
-            consumer.assign(Collections.singletonList(topicPartition));
+            consumer.assign(List.of(topicPartition));
             if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
-                
consumer.seekToBeginning(Collections.singletonList(topicPartition));
+                consumer.seekToBeginning(List.of(topicPartition));
             } else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) {
-                consumer.seekToEnd(Collections.singletonList(topicPartition));
+                consumer.seekToEnd(List.of(topicPartition));
             } else {
                 consumer.seek(topicPartition, offset);
             }
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index 5c6d26f433e..cf4daa0c636 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -26,7 +26,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -186,9 +185,9 @@ public final class ConsoleConsumerOptions extends 
CommandDefaultOptions {
     }
 
     private void checkRequiredArgs() {
-        List<Optional<String>> topicOrFilterArgs = new 
ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg()));
-        topicOrFilterArgs.removeIf(arg -> arg.isEmpty());
-        // user need to specify value for either --topic or --include options)
+        List<Optional<String>> topicOrFilterArgs = new 
ArrayList<>(List.of(topicArg(), includedTopicsArg()));
+        topicOrFilterArgs.removeIf(Optional::isEmpty);
+        // users need to specify a value for either --topic or --include options
         if (topicOrFilterArgs.size() != 1) {
            CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
         }
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 904f2333f2f..2505266abf7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -36,6 +36,7 @@ import java.io.PrintStream;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -166,7 +167,7 @@ public class ConsoleShareConsumer {
             this.consumer = consumer;
             this.timeoutMs = timeoutMs;
 
-            consumer.subscribe(Collections.singletonList(topic));
+            consumer.subscribe(List.of(topic));
         }
 
         ConsumerRecord<byte[], byte[]> receive() {
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 6a0d1ffd224..2cfffb9fe78 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -58,7 +58,6 @@ import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -121,7 +120,7 @@ public class ConsumerGroupCommand {
             return;
         }
 
-        try (ConsumerGroupService consumerGroupService = new 
ConsumerGroupService(opts, Collections.emptyMap())) {
+        try (ConsumerGroupService consumerGroupService = new 
ConsumerGroupService(opts, Map.of())) {
             if (opts.options.has(opts.listOpt))
                 consumerGroupService.listGroups();
             else if (opts.options.has(opts.describeOpt))
@@ -242,14 +241,14 @@ public class ConsumerGroupCommand {
         private Set<GroupState> stateValues() {
             String stateValue = opts.options.valueOf(opts.stateOpt);
             return (stateValue == null || stateValue.isEmpty())
-                ? Collections.emptySet()
+                ? Set.of()
                 : groupStatesFromString(stateValue);
         }
 
         private Set<GroupType> typeValues() {
             String typeValue = opts.options.valueOf(opts.typeOpt);
             return (typeValue == null || typeValue.isEmpty())
-                ? Collections.emptySet()
+                ? Set.of()
                 : consumerGroupTypesFromString(typeValue);
         }
 
@@ -585,7 +584,7 @@ public class ConsumerGroupCommand {
             Optional<String> clientIdOpt
         ) {
             if (topicPartitions.isEmpty()) {
-                return Collections.singleton(
+                return Set.of(
                     new PartitionAssignmentState(group, coordinator, 
Optional.empty(), Optional.empty(), Optional.empty(),
                         getLag(Optional.empty(), Optional.empty()), 
consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
                 );
@@ -684,7 +683,7 @@ public class ConsumerGroupCommand {
                             break;
                         default:
                            printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
-                            result.put(groupId, Collections.emptyMap());
+                            result.put(groupId, Map.of());
                     }
                 } catch (InterruptedException ie) {
                     throw new RuntimeException(ie);
@@ -855,7 +854,7 @@ public class ConsumerGroupCommand {
          * Returns the state of the specified consumer group and partition 
assignment states
          */
         Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String 
groupId) throws Exception {
-            return 
collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, 
new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
+            return 
collectGroupsOffsets(List.of(groupId)).getOrDefault(groupId, new 
SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
         }
 
         /**
@@ -899,7 +898,7 @@ public class ConsumerGroupCommand {
                         Optional.of(MISSING_COLUMN_VALUE),
                         Optional.of(MISSING_COLUMN_VALUE),
                         Optional.of(MISSING_COLUMN_VALUE))
-                    : Collections.emptyList();
+                    : List.of();
 
                 rowsWithConsumer.addAll(rowsWithoutConsumer);
 
@@ -910,7 +909,7 @@ public class ConsumerGroupCommand {
         }
 
         Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String 
groupId) throws Exception {
-            return 
collectGroupsMembers(Collections.singleton(groupId)).get(groupId);
+            return collectGroupsMembers(Set.of(groupId)).get(groupId);
         }
 
         TreeMap<String, Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>>> 
collectGroupsMembers(Collection<String> groupIds) throws Exception {
@@ -928,7 +927,7 @@ public class ConsumerGroupCommand {
                         consumer.groupInstanceId().orElse(""),
                         consumer.assignment().topicPartitions().size(),
                         
consumer.assignment().topicPartitions().stream().toList(),
-                        consumer.targetAssignment().map(a -> 
a.topicPartitions().stream().toList()).orElse(Collections.emptyList()),
+                        consumer.targetAssignment().map(a -> 
a.topicPartitions().stream().toList()).orElse(List.of()),
                         consumer.memberEpoch(),
                         consumerGroup.targetAssignmentEpoch(),
                         consumer.upgraded()
@@ -939,7 +938,7 @@ public class ConsumerGroupCommand {
         }
 
         GroupInformation collectGroupState(String groupId) throws Exception {
-            return 
collectGroupsState(Collections.singleton(groupId)).get(groupId);
+            return collectGroupsState(Set.of(groupId)).get(groupId);
         }
 
         TreeMap<String, GroupInformation> 
collectGroupsState(Collection<String> groupIds) throws Exception {
@@ -986,14 +985,14 @@ public class ConsumerGroupCommand {
                 if (!opts.options.has(opts.resetFromFileOpt))
                    CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
 
-                return Collections.emptyList();
+                return List.of();
             }
         }
 
         private Map<TopicPartition, OffsetAndMetadata> 
getCommittedOffsets(String groupId) {
             try {
                 return adminClient.listConsumerGroupOffsets(
-                    Collections.singletonMap(groupId, new 
ListConsumerGroupOffsetsSpec()),
+                    Map.of(groupId, new ListConsumerGroupOffsetsSpec()),
                     withTimeoutMs(new ListConsumerGroupOffsetsOptions())
                 ).partitionsToOffsetAndMetadata(groupId).get();
             } catch (InterruptedException | ExecutionException e) {
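
The singletonMap to Map.of change in getCommittedOffsets above is safe because neither the group id nor the ListConsumerGroupOffsetsSpec is ever null. In general, though, Map.of rejects the null keys and values that Collections.singletonMap accepts, which is worth remembering before applying this substitution elsewhere. A hedged sketch:

    import java.util.Collections;
    import java.util.Map;

    public class MapOfNulls {
        public static void main(String[] args) {
            Map<String, String> ok = Map.of("group", "spec");                     // fine
            Map<String, String> legacy = Collections.singletonMap("group", null); // null value allowed

            try {
                Map.of("group", (String) null);                                   // NullPointerException
            } catch (NullPointerException e) {
                System.out.println("Map.of rejects null keys and values");
            }
            System.out.println(ok.size() + " " + legacy.size());                  // prints: 1 1
        }
    }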
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 553ea3acb8a..30ba137cd8d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -203,11 +201,11 @@ public class ConsumerGroupCommandOptions extends 
CommandDefaultOptions {
             .describedAs("regex")
             .ofType(String.class);
 
-        allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
-        allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt, resetOffsetsOpt));
-        allResetOffsetScenarioOpts = new 
HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
-            resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, 
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
-        allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, 
topicOpt));
+        allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+        allConsumerGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, 
resetOffsetsOpt);
+        allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
+            resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, 
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
+        allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
 
         options = parser.parse(args);
     }
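
The HashSet-to-Set.of conversions in the hunk above are equivalent for membership checks. One property to be aware of: the iteration order of Set.of(...) is unspecified and deliberately randomized between JVM runs, so strings built by joining these sets (as in the usage messages in the next hunk) may list the options in a different order on each invocation, much like the HashSet they replace. A small illustration:

    import java.util.Set;

    public class SetOfIterationOrder {
        public static void main(String[] args) {
            Set<String> opts = Set.of("--list", "--describe", "--delete", "--reset-offsets");
            // May print in a different order from one JVM run to the next.
            System.out.println(String.join(", ", opts));
        }
    }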
@@ -224,7 +222,7 @@ public class ConsumerGroupCommandOptions extends 
CommandDefaultOptions {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                CommandLineUtils.printUsageAndExit(parser,
            "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
-            List<OptionSpec<?>> mutuallyExclusiveOpts = 
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+            List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, 
offsetsOpt, stateOpt);
             if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) {
                CommandLineUtils.printUsageAndExit(parser,
                    "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 0e5fc6167ee..8996f92b84e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -387,7 +387,7 @@ public class ShareGroupCommand {
         TreeMap<String, ShareGroupDescription> 
collectGroupsDescription(Collection<String> groupIds) throws 
ExecutionException, InterruptedException {
             Map<String, ShareGroupDescription> shareGroups = 
describeShareGroups(groupIds);
             TreeMap<String, ShareGroupDescription> res = new TreeMap<>();
-            shareGroups.forEach(res::put);
+            res.putAll(shareGroups);
             return res;
         }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index be99d2946a7..9a96cad00ed 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -144,10 +142,10 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
         verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
             .availableIf(describeOpt);
 
-        allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
-        allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt, resetOffsetsOpt));
-        allResetOffsetScenarioOpts = new 
HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, 
resetToLatestOpt));
-        allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, 
topicOpt));
+        allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+        allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, 
resetOffsetsOpt);
+        allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt, 
resetToEarliestOpt, resetToLatestOpt);
+        allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
 
         options = parser.parse(args);
     }
@@ -162,7 +160,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
             if (!options.has(groupOpt) && !options.has(allGroupsOpt))
                CommandLineUtils.printUsageAndExit(parser,
                    "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
-            List<OptionSpec<?>> mutuallyExclusiveOpts = 
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+            List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, 
offsetsOpt, stateOpt);
             if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) {
                CommandLineUtils.printUsageAndExit(parser,
                    "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 329b202b4a8..334f0738ca3 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -226,7 +226,7 @@ public class ReassignPartitionsCommand {
         if (!partsOngoing && !movesOngoing && !preserveThrottles) {
             // If the partition assignments and replica assignments are done, 
clear any throttles
             // that were set.  We have to clear all throttles, because we 
don't have enough
-            // information to know all of the source brokers that might have 
been involved in the
+            // information to know all the source brokers that might have been 
involved in the
             // previous reassignments.
             clearAllThrottles(adminClient, targetParts);
         }
@@ -849,7 +849,7 @@ public class ReassignPartitionsCommand {
                             .map(Object::toString)
                             .collect(Collectors.joining(","))));
             } else {
-                // If a replica has been moved to a new host and we also 
specified a particular
+                // If a replica has been moved to a new host, and we also 
specified a particular
                 // log directory, we will have to keep retrying the 
alterReplicaLogDirs
                 // call.  It can't take effect until the replica is moved to 
that host.
                 time.sleep(100);
@@ -1342,21 +1342,18 @@ public class ReassignPartitionsCommand {
     }
 
     private static List<String> parseTopicsData(int version, JsonValue js) 
throws JsonMappingException {
-        switch (version) {
-            case 1:
-                List<String> results = new ArrayList<>();
-                Optional<JsonValue> partitionsSeq = 
js.asJsonObject().get("topics");
-                if (partitionsSeq.isPresent()) {
-                    Iterator<JsonValue> iter = 
partitionsSeq.get().asJsonArray().iterator();
-                    while (iter.hasNext()) {
-                        
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
-                    }
+        if (version == 1) {
+            List<String> results = new ArrayList<>();
+            Optional<JsonValue> partitionsSeq = 
js.asJsonObject().get("topics");
+            if (partitionsSeq.isPresent()) {
+                Iterator<JsonValue> iter = 
partitionsSeq.get().asJsonArray().iterator();
+                while (iter.hasNext()) {
+                    
results.add(iter.next().asJsonObject().apply("topic").to(STRING));
                 }
-                return results;
-
-            default:
-                throw new AdminOperationException("Not supported version field value " + version);
+            }
+            return results;
         }
+        throw new AdminOperationException("Not supported version field value " + version);
     }
 
     private static Entry<List<Entry<TopicPartition, List<Integer>>>, 
Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
@@ -1376,46 +1373,43 @@ public class ReassignPartitionsCommand {
     private static Entry<List<Entry<TopicPartition, List<Integer>>>, 
Map<TopicPartitionReplica, String>> parsePartitionReassignmentData(
         int version, JsonValue jsonData
     ) throws JsonMappingException {
-        switch (version) {
-            case 1:
-                List<Entry<TopicPartition, List<Integer>>> partitionAssignment 
= new ArrayList<>();
-                Map<TopicPartitionReplica, String> replicaAssignment = new 
HashMap<>();
-
-                Optional<JsonValue> partitionsSeq = 
jsonData.asJsonObject().get("partitions");
-                if (partitionsSeq.isPresent()) {
-                    Iterator<JsonValue> iter = 
partitionsSeq.get().asJsonArray().iterator();
-                    while (iter.hasNext()) {
-                        JsonObject partitionFields = 
iter.next().asJsonObject();
-                        String topic = 
partitionFields.apply("topic").to(STRING);
-                        int partition = 
partitionFields.apply("partition").to(INT);
-                        List<Integer> newReplicas = 
partitionFields.apply("replicas").to(INT_LIST);
-                        Optional<JsonValue> logDirsOpts = 
partitionFields.get("log_dirs");
-                        List<String> newLogDirs;
-                        if (logDirsOpts.isPresent())
-                            newLogDirs = logDirsOpts.get().to(STRING_LIST);
-                        else
-                            newLogDirs = newReplicas.stream().map(r -> 
ANY_LOG_DIR).collect(Collectors.toList());
-                        if (newReplicas.size() != newLogDirs.size())
-                            throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " +
-                                "size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition));
-                        partitionAssignment.add(Map.entry(new 
TopicPartition(topic, partition), newReplicas));
-                        for (int i = 0; i < newLogDirs.size(); i++) {
-                            Integer replica = newReplicas.get(i);
-                            String logDir = newLogDirs.get(i);
-
-                            if (logDir.equals(ANY_LOG_DIR))
-                                continue;
-
-                            replicaAssignment.put(new 
TopicPartitionReplica(topic, partition, replica), logDir);
-                        }
+        if (version == 1) {
+            List<Entry<TopicPartition, List<Integer>>> partitionAssignment = 
new ArrayList<>();
+            Map<TopicPartitionReplica, String> replicaAssignment = new 
HashMap<>();
+
+            Optional<JsonValue> partitionsSeq = 
jsonData.asJsonObject().get("partitions");
+            if (partitionsSeq.isPresent()) {
+                Iterator<JsonValue> iter = 
partitionsSeq.get().asJsonArray().iterator();
+                while (iter.hasNext()) {
+                    JsonObject partitionFields = iter.next().asJsonObject();
+                    String topic = partitionFields.apply("topic").to(STRING);
+                    int partition = partitionFields.apply("partition").to(INT);
+                    List<Integer> newReplicas = 
partitionFields.apply("replicas").to(INT_LIST);
+                    Optional<JsonValue> logDirsOpts = 
partitionFields.get("log_dirs");
+                    List<String> newLogDirs;
+                    if (logDirsOpts.isPresent())
+                        newLogDirs = logDirsOpts.get().to(STRING_LIST);
+                    else
+                        newLogDirs = newReplicas.stream().map(r -> 
ANY_LOG_DIR).collect(Collectors.toList());
+                    if (newReplicas.size() != newLogDirs.size())
+                        throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " +
+                            "size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition));
+                    partitionAssignment.add(Map.entry(new 
TopicPartition(topic, partition), newReplicas));
+                    for (int i = 0; i < newLogDirs.size(); i++) {
+                        Integer replica = newReplicas.get(i);
+                        String logDir = newLogDirs.get(i);
+
+                        if (logDir.equals(ANY_LOG_DIR))
+                            continue;
+
+                        replicaAssignment.put(new TopicPartitionReplica(topic, 
partition, replica), logDir);
                     }
                 }
+            }
 
-                return Map.entry(partitionAssignment, replicaAssignment);
-
-            default:
-                throw new AdminOperationException("Not supported version field value " + version);
+            return Map.entry(partitionAssignment, replicaAssignment);
         }
+        throw new AdminOperationException("Not supported version field value " + version);
     }
 
     static ReassignPartitionsCommandOptions validateAndParseArgs(String[] 
args) {
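
The switch-to-if rewrite above leaves parsePartitionReassignmentData's contract unchanged: only version-1 documents are accepted. For reference, a document of the shape this parser reads, written as a Java 17 text block (topic name and broker ids are illustrative; "any" is the ANY_LOG_DIR placeholder, and log_dirs, when present, must match replicas in length or an AdminCommandFailedException is thrown):

    public class ReassignmentJsonExample {
        static final String EXAMPLE = """
            {
              "version": 1,
              "partitions": [
                {"topic": "foo", "partition": 0, "replicas": [1, 2], "log_dirs": ["any", "any"]}
              ]
            }
            """;

        public static void main(String[] args) {
            System.out.println(EXAMPLE);
        }
    }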
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
index 4812f1f19e9..9dbde0fd1af 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
@@ -20,7 +20,6 @@ package org.apache.kafka.tools.reassign;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionReplica;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
@@ -34,7 +33,7 @@ public final class VerifyAssignmentResult {
     public final boolean movesOngoing;
 
     public VerifyAssignmentResult(Map<TopicPartition, 
PartitionReassignmentState> partStates) {
-        this(partStates, false, Collections.emptyMap(), false);
+        this(partStates, false, Map.of(), false);
     }
 
     /**
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 7ac61e9df9e..0f68bf82900 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -61,7 +61,6 @@ import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -457,7 +456,7 @@ public class StreamsGroupCommand {
                 boolean allPresent = 
topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
                 if (!allPresent) {
                    printError("One or more topics are not part of the group '" + groupId + "'.", Optional.empty());
-                    return Collections.emptyList();
+                    return List.of();
                 }
                 return topicPartitions;
             } catch (InterruptedException | ExecutionException e) {
@@ -508,7 +507,7 @@ public class StreamsGroupCommand {
                                 break;
                             default:
                                printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
-                                result.put(groupId, Collections.emptyMap());
+                                result.put(groupId, Map.of());
                         }
                     } catch (InterruptedException ie) {
                         throw new RuntimeException(ie);
@@ -889,7 +888,7 @@ public class StreamsGroupCommand {
                 if (!opts.options.has(opts.resetFromFileOpt))
                    CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.");
 
-                return Collections.emptyList();
+                return List.of();
             }
         }
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index e74104d27ff..6ce387d3dbd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -22,8 +22,6 @@ import org.apache.kafka.server.util.CommandLineUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -200,12 +198,12 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
         exportOpt = parser.accepts("export", EXPORT_DOC);
         options = parser.parse(args);
 
-        allResetOffsetScenarioOpts = new 
HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
-            resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, 
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
-        allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
-        allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt));
-        allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt, 
allInputTopicsOpt));
-        allDeleteInternalGroupsOpts = new 
HashSet<>(Arrays.asList(resetOffsetsOpt, deleteOpt));
+        allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt,
+            resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, 
resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt);
+        allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+        allStreamsGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt);
+        allDeleteOffsetsOpts = Set.of(inputTopicOpt, allInputTopicsOpt);
+        allDeleteInternalGroupsOpts = Set.of(resetOffsetsOpt, deleteOpt);
     }
 
     @SuppressWarnings("NPathComplexity")
@@ -256,7 +254,7 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
            CommandLineUtils.printUsageAndExit(parser,
                "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
-        List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, 
offsetsOpt, stateOpt);
+        List<OptionSpec<?>> mutuallyExclusiveOpts = List.of(membersOpt, 
offsetsOpt, stateOpt);
         if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 
0).sum() > 1) {
            CommandLineUtils.printUsageAndExit(parser,
                "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
