This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0da9cacffab MINOR: Cleanups in Tools Module (3/n) (#20332)
0da9cacffab is described below

commit 0da9cacffaba276e1bc6c1301aed449aaff788b9
Author: Sanskar Jhajharia <sjhajha...@confluent.io>
AuthorDate: Tue Aug 19 19:54:13 2025 +0530

    MINOR: Cleanups in Tools Module (3/n) (#20332)
    
    This PR further cleans up the tools module by removing boilerplate
    code that can be replaced by Java `record` classes
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../java/org/apache/kafka/tools/AclCommand.java    | 168 ++++++++++-----------
 .../org/apache/kafka/tools/ConnectPluginPath.java  |  35 +----
 .../apache/kafka/tools/MetadataQuorumCommand.java  |   3 +-
 .../apache/kafka/tools/OAuthCompatibilityTool.java |   9 +-
 .../java/org/apache/kafka/tools/OffsetsUtils.java  |  11 +-
 .../kafka/tools/PushHttpMetricsReporter.java       |  85 ++---------
 .../kafka/tools/ReplicaVerificationTool.java       |  13 +-
 .../apache/kafka/tools/TransactionsCommand.java    |  31 ++--
 .../tools/consumer/group/ConsumerGroupCommand.java |  96 ++++++------
 .../tools/consumer/group/GroupInformation.java     |  28 +---
 .../consumer/group/MemberAssignmentState.java      |  42 +-----
 .../consumer/group/PartitionAssignmentState.java   |  42 +-----
 .../tools/consumer/group/ShareGroupCommand.java    |  22 +--
 .../kafka/tools/reassign/ActiveMoveState.java      |  37 +----
 .../kafka/tools/reassign/CancelledMoveState.java   |  32 +---
 .../kafka/tools/reassign/CompletedMoveState.java   |  27 +---
 .../tools/reassign/MissingLogDirMoveState.java     |  27 +---
 .../tools/reassign/MissingReplicaMoveState.java    |  27 +---
 .../tools/reassign/ReassignPartitionsCommand.java  |  10 +-
 .../apache/kafka/tools/ConnectPluginPathTest.java  |  27 +---
 .../consumer/group/ConsumerGroupServiceTest.java   |   8 +-
 .../consumer/group/DeleteConsumerGroupsTest.java   |   2 +-
 .../consumer/group/DescribeConsumerGroupTest.java  |  92 +++++------
 .../group/ResetConsumerGroupOffsetTest.java        |   4 +-
 .../reassign/ReassignPartitionsCommandTest.java    |  11 +-
 25 files changed, 242 insertions(+), 647 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
index 791397b2405..fd62e8f3b56 100644
--- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -73,14 +73,13 @@ public class AclCommand {
 
     public static void main(String[] args) {
         AclCommandOptions opts = new AclCommandOptions(args);
-        AdminClientService aclCommandService = new AdminClientService(opts);
         try (Admin admin = Admin.create(adminConfigs(opts))) {
             if (opts.options.has(opts.addOpt)) {
-                aclCommandService.addAcls(admin);
+                addAcls(admin, opts);
             } else if (opts.options.has(opts.removeOpt)) {
-                aclCommandService.removeAcls(admin);
+                removeAcls(admin, opts);
             } else if (opts.options.has(opts.listOpt)) {
-                aclCommandService.listAcls(admin);
+                listAcls(admin, opts);
             }
         } catch (Throwable e) {
             System.out.println("Error while executing ACL command: " + 
e.getMessage());
@@ -102,106 +101,97 @@ public class AclCommand {
         return props;
     }
 
-    private static class AdminClientService {
-
-        private final AclCommandOptions opts;
-
-        AdminClientService(AclCommandOptions opts) {
-            this.opts = opts;
-        }
-
-        void addAcls(Admin admin) throws ExecutionException, 
InterruptedException {
-            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = 
getResourceToAcls(opts);
-            for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : 
resourceToAcl.entrySet()) {
-                ResourcePattern resource = entry.getKey();
-                Set<AccessControlEntry> acls = entry.getValue();
-                System.out.println("Adding ACLs for resource `" + resource + 
"`: " + NL + " " + acls.stream().map(a -> "\t" + 
a).collect(Collectors.joining(NL)) + NL);
-                Collection<AclBinding> aclBindings = acls.stream().map(acl -> 
new AclBinding(resource, acl)).collect(Collectors.toList());
-                admin.createAcls(aclBindings).all().get();
-            }
+    private static void addAcls(Admin admin, AclCommandOptions opts) throws 
ExecutionException, InterruptedException {
+        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = 
getResourceToAcls(opts);
+        for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : 
resourceToAcl.entrySet()) {
+            ResourcePattern resource = entry.getKey();
+            Set<AccessControlEntry> acls = entry.getValue();
+            System.out.println("Adding ACLs for resource `" + resource + "`: " 
+ NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + 
NL);
+            Collection<AclBinding> aclBindings = acls.stream().map(acl -> new 
AclBinding(resource, acl)).collect(Collectors.toList());
+            admin.createAcls(aclBindings).all().get();
         }
+    }
 
-        void removeAcls(Admin admin) throws ExecutionException, 
InterruptedException {
-            Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = 
getResourceFilterToAcls(opts);
-            for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> 
entry : filterToAcl.entrySet()) {
-                ResourcePatternFilter filter = entry.getKey();
-                Set<AccessControlEntry> acls = entry.getValue();
-                if (acls.isEmpty()) {
-                    if (confirmAction(opts, "Are you sure you want to delete 
all ACLs for resource filter `" + filter + "`? (y/n)")) {
-                        removeAcls(admin, acls, filter);
-                    }
-                } else {
-                    String msg = "Are you sure you want to remove ACLs: " + NL 
+
-                            " " + acls.stream().map(a -> "\t" + 
a).collect(Collectors.joining(NL)) + NL +
-                            " from resource filter `" + filter + "`? (y/n)";
-                    if (confirmAction(opts, msg)) {
-                        removeAcls(admin, acls, filter);
-                    }
+    private static void removeAcls(Admin admin, AclCommandOptions opts) throws 
ExecutionException, InterruptedException {
+        Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = 
getResourceFilterToAcls(opts);
+        for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : 
filterToAcl.entrySet()) {
+            ResourcePatternFilter filter = entry.getKey();
+            Set<AccessControlEntry> acls = entry.getValue();
+            if (acls.isEmpty()) {
+                if (confirmAction(opts, "Are you sure you want to delete all 
ACLs for resource filter `" + filter + "`? (y/n)")) {
+                    removeAcls(admin, acls, filter);
+                }
+            } else {
+                String msg = "Are you sure you want to remove ACLs: " + NL +
+                        " " + acls.stream().map(a -> "\t" + 
a).collect(Collectors.joining(NL)) + NL +
+                        " from resource filter `" + filter + "`? (y/n)";
+                if (confirmAction(opts, msg)) {
+                    removeAcls(admin, acls, filter);
                 }
             }
         }
+    }
 
-        private void listAcls(Admin admin) throws ExecutionException, 
InterruptedException {
-            Set<ResourcePatternFilter> filters = getResourceFilter(opts, 
false);
-            Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, 
opts.listPrincipalsOpt);
-            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = 
getAcls(admin, filters);
+    private static void listAcls(Admin admin, AclCommandOptions opts) throws 
ExecutionException, InterruptedException {
+        Set<ResourcePatternFilter> filters = getResourceFilter(opts, false);
+        Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, 
opts.listPrincipalsOpt);
+        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = 
getAcls(admin, filters);
 
-            if (listPrincipals.isEmpty()) {
-                printResourceAcls(resourceToAcls);
-            } else {
-                listPrincipals.forEach(principal -> {
-                    System.out.println("ACLs for principal `" + principal + 
"`");
-                    Map<ResourcePattern, Set<AccessControlEntry>> 
filteredResourceToAcls = resourceToAcls.entrySet().stream()
-                            .map(entry -> {
-                                ResourcePattern resource = entry.getKey();
-                                Set<AccessControlEntry> acls = 
entry.getValue().stream()
-                                        .filter(acl -> 
principal.toString().equals(acl.principal()))
-                                        .collect(Collectors.toSet());
-                                return new AbstractMap.SimpleEntry<>(resource, 
acls);
-                            })
-                            .filter(entry -> !entry.getValue().isEmpty())
-                            .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-                    printResourceAcls(filteredResourceToAcls);
-                });
-            }
+        if (listPrincipals.isEmpty()) {
+            printResourceAcls(resourceToAcls);
+        } else {
+            listPrincipals.forEach(principal -> {
+                System.out.println("ACLs for principal `" + principal + "`");
+                Map<ResourcePattern, Set<AccessControlEntry>> 
filteredResourceToAcls = resourceToAcls.entrySet().stream()
+                        .map(entry -> {
+                            ResourcePattern resource = entry.getKey();
+                            Set<AccessControlEntry> acls = 
entry.getValue().stream()
+                                    .filter(acl -> 
principal.toString().equals(acl.principal()))
+                                    .collect(Collectors.toSet());
+                            return new AbstractMap.SimpleEntry<>(resource, 
acls);
+                        })
+                        .filter(entry -> !entry.getValue().isEmpty())
+                        .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+                printResourceAcls(filteredResourceToAcls);
+            });
         }
+    }
 
-        private static void printResourceAcls(Map<ResourcePattern, 
Set<AccessControlEntry>> resourceToAcls) {
-            resourceToAcls.forEach((resource, acls) ->
-                System.out.println("Current ACLs for resource `" + resource + 
"`:" + NL +
-                        acls.stream().map(acl -> "\t" + 
acl).collect(Collectors.joining(NL)) + NL)
-            );
-        }
+    private static void printResourceAcls(Map<ResourcePattern, 
Set<AccessControlEntry>> resourceToAcls) {
+        resourceToAcls.forEach((resource, acls) ->
+            System.out.println("Current ACLs for resource `" + resource + "`:" 
+ NL +
+                    acls.stream().map(acl -> "\t" + 
acl).collect(Collectors.joining(NL)) + NL)
+        );
+    }
 
-        private static void removeAcls(Admin adminClient, 
Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws 
ExecutionException, InterruptedException {
-            if (acls.isEmpty()) {
-                adminClient.deleteAcls(List.of(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY))).all().get();
-            } else {
-                List<AclBindingFilter> aclBindingFilters = 
acls.stream().map(acl -> new AclBindingFilter(filter, 
acl.toFilter())).collect(Collectors.toList());
-                adminClient.deleteAcls(aclBindingFilters).all().get();
-            }
+    private static void removeAcls(Admin adminClient, Set<AccessControlEntry> 
acls, ResourcePatternFilter filter) throws ExecutionException, 
InterruptedException {
+        if (acls.isEmpty()) {
+            adminClient.deleteAcls(List.of(new AclBindingFilter(filter, 
AccessControlEntryFilter.ANY))).all().get();
+        } else {
+            List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl 
-> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
+            adminClient.deleteAcls(aclBindingFilters).all().get();
         }
+    }
 
-        private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin 
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, 
InterruptedException {
-            Collection<AclBinding> aclBindings;
-            if (filters.isEmpty()) {
-                aclBindings = 
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
-            } else {
-                aclBindings = new ArrayList<>();
-                for (ResourcePatternFilter filter : filters) {
-                    aclBindings.addAll(adminClient.describeAcls(new 
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
-                }
+    private static Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin 
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, 
InterruptedException {
+        Collection<AclBinding> aclBindings;
+        if (filters.isEmpty()) {
+            aclBindings = 
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
+        } else {
+            aclBindings = new ArrayList<>();
+            for (ResourcePatternFilter filter : filters) {
+                aclBindings.addAll(adminClient.describeAcls(new 
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
             }
+        }
 
-            Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new 
HashMap<>();
-            for (AclBinding aclBinding : aclBindings) {
-                ResourcePattern resource = aclBinding.pattern();
-                Set<AccessControlEntry> acls = 
resourceToAcls.getOrDefault(resource, new HashSet<>());
-                acls.add(aclBinding.entry());
-                resourceToAcls.put(resource, acls);
-            }
-            return resourceToAcls;
+        Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new 
HashMap<>();
+        for (AclBinding aclBinding : aclBindings) {
+            ResourcePattern resource = aclBinding.pattern();
+            Set<AccessControlEntry> acls = 
resourceToAcls.getOrDefault(resource, new HashSet<>());
+            acls.add(aclBinding.entry());
+            resourceToAcls.put(resource, acls);
         }
+        return resourceToAcls;
     }
 
     private static Map<ResourcePattern, Set<AccessControlEntry>> 
getResourceToAcls(AclCommandOptions opts) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java 
b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
index c86638e846a..a3da1d3a9c6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
@@ -196,22 +196,8 @@ public class ConnectPluginPath {
         LIST, SYNC_MANIFESTS
     }
 
-    private static class Config {
-        private final Command command;
-        private final Set<Path> locations;
-        private final boolean dryRun;
-        private final boolean keepNotFound;
-        private final PrintStream out;
-        private final PrintStream err;
-
-        private Config(Command command, Set<Path> locations, boolean dryRun, 
boolean keepNotFound, PrintStream out, PrintStream err) {
-            this.command = command;
-            this.locations = locations;
-            this.dryRun = dryRun;
-            this.keepNotFound = keepNotFound;
-            this.out = out;
-            this.err = err;
-        }
+    private record Config(Command command, Set<Path> locations, boolean 
dryRun, boolean keepNotFound, PrintStream out,
+                          PrintStream err) {
 
         @Override
         public String toString() {
@@ -262,16 +248,9 @@ public class ConnectPluginPath {
      * <p>This is unique to the (source, class, type) tuple, and contains 
additional pre-computed information
      * that pertains to this specific plugin.
      */
-    private static class Row {
-        private final ManifestWorkspace.SourceWorkspace<?> workspace;
-        private final String className;
-        private final PluginType type;
-        private final String version;
-        private final List<String> aliases;
-        private final boolean loadable;
-        private final boolean hasManifest;
-
-        public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String 
className, PluginType type, String version, List<String> aliases, boolean 
loadable, boolean hasManifest) {
+    private record Row(ManifestWorkspace.SourceWorkspace<?> workspace, String 
className, PluginType type,
+                       String version, List<String> aliases, boolean loadable, 
boolean hasManifest) {
+        private Row(ManifestWorkspace.SourceWorkspace<?> workspace, String 
className, PluginType type, String version, List<String> aliases, boolean 
loadable, boolean hasManifest) {
             this.workspace = Objects.requireNonNull(workspace, "workspace must 
be non-null");
             this.className = Objects.requireNonNull(className, "className must 
be non-null");
             this.version = Objects.requireNonNull(version, "version must be 
non-null");
@@ -281,10 +260,6 @@ public class ConnectPluginPath {
             this.hasManifest = hasManifest;
         }
 
-        private boolean loadable() {
-            return loadable;
-        }
-
         private boolean compatible() {
             return loadable && hasManifest;
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index b65655a8c89..4abe443b82b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -65,7 +65,6 @@ import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static java.lang.String.valueOf;
-import static java.util.Arrays.asList;
 
 /**
  * A tool for describing quorum status
@@ -206,7 +205,7 @@ public class MetadataQuorumCommand {
         rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), 
"Observer", humanReadable));
 
         ToolsUtils.prettyPrintTable(
-            asList("NodeId", "DirectoryId", "LogEndOffset", "Lag", 
"LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
+            List.of("NodeId", "DirectoryId", "LogEndOffset", "Lag", 
"LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
             rows,
             System.out
         );
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java 
b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
index 40f3100054b..59dbb47daec 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
@@ -292,14 +292,7 @@ public class OAuthCompatibilityTool {
 
     }
 
-    private static class ConfigHandler {
-
-        private final Namespace namespace;
-
-
-        private ConfigHandler(Namespace namespace) {
-            this.namespace = namespace;
-        }
+    private record ConfigHandler(Namespace namespace) {
 
         private Map<String, ?> getConfigs() {
             Map<String, Object> m = new HashMap<>();
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index e37bf2804d7..269cd53875d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -500,16 +500,7 @@ public class OffsetsUtils {
 
     public interface LogOffsetResult { }
 
-    public static class LogOffset implements LogOffsetResult {
-        final long value;
-
-        public LogOffset(long value) {
-            this.value = value;
-        }
-
-        public long value() {
-            return value;
-        }
+    public record LogOffset(long value) implements LogOffsetResult {
     }
 
     public static class Unknown implements LogOffsetResult { }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java 
b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index 86ea19f0623..9b423b78947 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -240,86 +240,19 @@ public class PushHttpMetricsReporter implements 
MetricsReporter {
         }
     }
 
-    private static class MetricsReport {
-        private final MetricClientInfo client;
-        private final Collection<MetricValue> metrics;
-
-        MetricsReport(MetricClientInfo client, Collection<MetricValue> 
metrics) {
-            this.client = client;
-            this.metrics = metrics;
-        }
-
-        @JsonProperty
-        public MetricClientInfo client() {
-            return client;
-        }
-
-        @JsonProperty
-        public Collection<MetricValue> metrics() {
-            return metrics;
-        }
+    private record MetricsReport(@JsonProperty("client") MetricClientInfo 
client, 
+                                @JsonProperty("metrics") 
Collection<MetricValue> metrics) {
     }
 
-    private static class MetricClientInfo {
-        private final String host;
-        private final String clientId;
-        private final long time;
-
-        MetricClientInfo(String host, String clientId, long time) {
-            this.host = host;
-            this.clientId = clientId;
-            this.time = time;
-        }
-
-        @JsonProperty
-        public String host() {
-            return host;
-        }
-
-        @JsonProperty("client_id")
-        public String clientId() {
-            return clientId;
-        }
-
-        @JsonProperty
-        public long time() {
-            return time;
-        }
+    private record MetricClientInfo(@JsonProperty("host") String host, 
+                                   @JsonProperty("client_id") String clientId, 
+                                   @JsonProperty("time") long time) {
     }
 
-    private static class MetricValue {
-
-        private final String name;
-        private final String group;
-        private final Map<String, String> tags;
-        private final Object value;
-
-        MetricValue(String name, String group, Map<String, String> tags, 
Object value) {
-            this.name = name;
-            this.group = group;
-            this.tags = tags;
-            this.value = value;
-        }
-
-        @JsonProperty
-        public String name() {
-            return name;
-        }
-
-        @JsonProperty
-        public String group() {
-            return group;
-        }
-
-        @JsonProperty
-        public Map<String, String> tags() {
-            return tags;
-        }
-
-        @JsonProperty
-        public Object value() {
-            return value;
-        }
+    private record MetricValue(@JsonProperty("name") String name, 
+                              @JsonProperty("group") String group, 
+                              @JsonProperty("tags") Map<String, String> tags, 
+                              @JsonProperty("value") Object value) {
     }
 
     // The signature for getInt changed from returning int to Integer so to 
remain compatible with 0.8.2.2 jars
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java 
b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index c5f1ddcc2d4..937e1cc5955 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -348,18 +348,7 @@ public class ReplicaVerificationTool {
         }
     }
 
-    private static class MessageInfo {
-        final int replicaId;
-        final long offset;
-        final long nextOffset;
-        final long checksum;
-
-        MessageInfo(int replicaId, long offset, long nextOffset, long 
checksum) {
-            this.replicaId = replicaId;
-            this.offset = offset;
-            this.nextOffset = nextOffset;
-            this.checksum = checksum;
-        }
+    private record MessageInfo(int replicaId, long offset, long nextOffset, 
long checksum) {
     }
 
     protected static class ReplicaBuffer {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 9c5323104fe..5b59fbee4a1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static java.util.Arrays.asList;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 
 public abstract class TransactionsCommand {
@@ -288,7 +287,7 @@ public abstract class TransactionsCommand {
     }
 
     static class DescribeProducersCommand extends TransactionsCommand {
-        static final List<String> HEADERS = asList(
+        static final List<String> HEADERS = List.of(
             "ProducerId",
             "ProducerEpoch",
             "LatestCoordinatorEpoch",
@@ -360,7 +359,7 @@ public abstract class TransactionsCommand {
                         
String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) :
                         "None";
 
-                return asList(
+                return List.of(
                     String.valueOf(producerState.producerId()),
                     String.valueOf(producerState.producerEpoch()),
                     
String.valueOf(producerState.coordinatorEpoch().orElse(-1)),
@@ -375,7 +374,7 @@ public abstract class TransactionsCommand {
     }
 
     static class DescribeTransactionsCommand extends TransactionsCommand {
-        static final List<String> HEADERS = asList(
+        static final List<String> HEADERS = List.of(
             "CoordinatorId",
             "TransactionalId",
             "ProducerId",
@@ -436,7 +435,7 @@ public abstract class TransactionsCommand {
                 transactionDurationMsColumnValue = "None";
             }
 
-            List<String> row = asList(
+            List<String> row = List.of(
                 String.valueOf(result.coordinatorId()),
                 transactionalId,
                 String.valueOf(result.producerId()),
@@ -453,7 +452,7 @@ public abstract class TransactionsCommand {
     }
 
     static class ListTransactionsCommand extends TransactionsCommand {
-        static final List<String> HEADERS = asList(
+        static final List<String> HEADERS = List.of(
             "TransactionalId",
             "Coordinator",
             "ProducerId",
@@ -510,7 +509,7 @@ public abstract class TransactionsCommand {
                 Collection<TransactionListing> listings = 
brokerListingsEntry.getValue();
 
                 for (TransactionListing listing : listings) {
-                    rows.add(asList(
+                    rows.add(List.of(
                         listing.transactionalId(),
                         coordinatorIdString,
                         String.valueOf(listing.producerId()),
@@ -526,7 +525,7 @@ public abstract class TransactionsCommand {
     static class FindHangingTransactionsCommand extends TransactionsCommand {
         private static final int MAX_BATCH_SIZE = 500;
 
-        static final List<String> HEADERS = asList(
+        static final List<String> HEADERS = List.of(
             "Topic",
             "Partition",
             "ProducerId",
@@ -709,7 +708,7 @@ public abstract class TransactionsCommand {
                 long transactionDurationMinutes = 
TimeUnit.MILLISECONDS.toMinutes(
                     currentTimeMs - transaction.producerState.lastTimestamp());
 
-                rows.add(asList(
+                rows.add(List.of(
                     transaction.topicPartition.topic(),
                     String.valueOf(transaction.topicPartition.partition()),
                     String.valueOf(transaction.producerState.producerId()),
@@ -848,17 +847,7 @@ public abstract class TransactionsCommand {
             return candidateTransactions;
         }
 
-        private static class OpenTransaction {
-            private final TopicPartition topicPartition;
-            private final ProducerState producerState;
-
-            private OpenTransaction(
-                TopicPartition topicPartition,
-                ProducerState producerState
-            ) {
-                this.topicPartition = topicPartition;
-                this.producerState = producerState;
-            }
+        private record OpenTransaction(TopicPartition topicPartition, 
ProducerState producerState) {
         }
 
         private void collectCandidateOpenTransactions(
@@ -1024,7 +1013,7 @@ public abstract class TransactionsCommand {
         PrintStream out,
         Time time
     ) throws Exception {
-        List<TransactionsCommand> commands = asList(
+        List<TransactionsCommand> commands = List.of(
             new ListTransactionsCommand(time),
             new DescribeTransactionsCommand(time),
             new DescribeProducersCommand(time),
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 1f29cdd8156..cfdef8f7518 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -347,20 +347,20 @@ public class ConsumerGroupCommand {
                         for (PartitionAssignmentState consumerAssignment : 
consumerAssignments) {
                             if (verbose) {
                                 System.out.printf(format,
-                                    consumerAssignment.group,
-                                    
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE)
+                                    consumerAssignment.group(),
+                                    
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.leaderEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE)
                                 );
                             } else {
                                 System.out.printf(format,
-                                    consumerAssignment.group,
-                                    
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE),
-                                    
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE)
+                                    consumerAssignment.group(),
+                                    
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE),
+                                    
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE), 
consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE)
                                 );
                             }
                         }
@@ -379,10 +379,10 @@ public class ConsumerGroupCommand {
             if (assignments.isPresent()) {
                 Collection<PartitionAssignmentState> consumerAssignments = 
assignments.get();
                 for (PartitionAssignmentState consumerAssignment : 
consumerAssignments) {
-                    maxGroupLen = Math.max(maxGroupLen, 
consumerAssignment.group.length());
-                    maxTopicLen = Math.max(maxTopicLen, 
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE).length());
-                    maxConsumerIdLen = Math.max(maxConsumerIdLen, 
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE).length());
-                    maxHostLen = Math.max(maxHostLen, 
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE).length());
+                    maxGroupLen = Math.max(maxGroupLen, 
consumerAssignment.group().length());
+                    maxTopicLen = Math.max(maxTopicLen, 
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE).length());
+                    maxConsumerIdLen = Math.max(maxConsumerIdLen, 
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE).length());
+                    maxHostLen = Math.max(maxHostLen, 
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE).length());
 
                 }
             }
@@ -408,20 +408,20 @@ public class ConsumerGroupCommand {
                     // find proper columns width
                     if (assignments.isPresent()) {
                         for (MemberAssignmentState memberAssignment : 
assignments.get()) {
-                            maxGroupLen = Math.max(maxGroupLen, 
memberAssignment.group.length());
-                            maxConsumerIdLen = Math.max(maxConsumerIdLen, 
memberAssignment.consumerId.length());
-                            maxGroupInstanceIdLen =  
Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId.length());
-                            maxHostLen = Math.max(maxHostLen, 
memberAssignment.host.length());
-                            maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId.length());
-                            includeGroupInstanceId = includeGroupInstanceId || 
!memberAssignment.groupInstanceId.isEmpty();
-                            String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
-                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
-                            String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
-                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                            maxGroupLen = Math.max(maxGroupLen, 
memberAssignment.group().length());
+                            maxConsumerIdLen = Math.max(maxConsumerIdLen, 
memberAssignment.consumerId().length());
+                            maxGroupInstanceIdLen =  
Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId().length());
+                            maxHostLen = Math.max(maxHostLen, 
memberAssignment.host().length());
+                            maxClientIdLen = Math.max(maxClientIdLen, 
memberAssignment.clientId().length());
+                            includeGroupInstanceId = includeGroupInstanceId || 
!memberAssignment.groupInstanceId().isEmpty();
+                            String currentAssignment = 
memberAssignment.assignment().isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment());
+                            String targetAssignment = 
memberAssignment.targetAssignment().isEmpty() ?
+                                MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment());
                             maxCurrentAssignment = 
Math.max(maxCurrentAssignment, currentAssignment.length());
                             maxTargetAssignment = 
Math.max(maxTargetAssignment, targetAssignment.length());
-                            hasClassicMember = hasClassicMember || 
(memberAssignment.upgraded.isPresent() && !memberAssignment.upgraded.get());
-                            hasConsumerMember = hasConsumerMember || 
(memberAssignment.upgraded.isPresent() && memberAssignment.upgraded.get());
+                            hasClassicMember = hasClassicMember || 
(memberAssignment.upgraded().isPresent() && !memberAssignment.upgraded().get());
+                            hasConsumerMember = hasConsumerMember || 
(memberAssignment.upgraded().isPresent() && memberAssignment.upgraded().get());
                         }
                     }
                 }
@@ -465,23 +465,23 @@ public class ConsumerGroupCommand {
         ) {
             for (MemberAssignmentState memberAssignment : memberAssignments) {
                 if (includeGroupInstanceId) {
-                    System.out.printf(formatWithGroupInstanceId, 
memberAssignment.group, memberAssignment.consumerId,
-                        memberAssignment.groupInstanceId, 
memberAssignment.host, memberAssignment.clientId,
-                        memberAssignment.numPartitions);
+                    System.out.printf(formatWithGroupInstanceId, 
memberAssignment.group(), memberAssignment.consumerId(),
+                        memberAssignment.groupInstanceId(), 
memberAssignment.host(), memberAssignment.clientId(),
+                        memberAssignment.numPartitions());
                 } else {
-                    System.out.printf(formatWithoutGroupInstanceId, 
memberAssignment.group, memberAssignment.consumerId,
-                        memberAssignment.host, memberAssignment.clientId, 
memberAssignment.numPartitions);
+                    System.out.printf(formatWithoutGroupInstanceId, 
memberAssignment.group(), memberAssignment.consumerId(),
+                        memberAssignment.host(), memberAssignment.clientId(), 
memberAssignment.numPartitions());
                 }
                 if (verbose) {
-                    String currentEpoch = 
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
-                    String currentAssignment = 
memberAssignment.assignment.isEmpty() ?
-                        MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment);
-                    String targetEpoch = 
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
-                    String targetAssignment = 
memberAssignment.targetAssignment.isEmpty() ?
-                        MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment);
+                    String currentEpoch = 
memberAssignment.currentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                    String currentAssignment = 
memberAssignment.assignment().isEmpty() ?
+                        MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.assignment());
+                    String targetEpoch = 
memberAssignment.targetEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+                    String targetAssignment = 
memberAssignment.targetAssignment().isEmpty() ?
+                        MISSING_COLUMN_VALUE : 
getAssignmentString(memberAssignment.targetAssignment());
                     if (hasMigrationMember) {
                         System.out.printf(formatWithUpgrade, currentEpoch, 
currentAssignment, targetEpoch, targetAssignment,
-                            
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE));
+                            
memberAssignment.upgraded().map(Object::toString).orElse(MISSING_COLUMN_VALUE));
                     } else {
                         System.out.printf(formatWithoutUpgrade, currentEpoch, 
currentAssignment, targetEpoch, targetAssignment);
                     }
@@ -511,23 +511,23 @@ public class ConsumerGroupCommand {
 
         private void printStates(Map<String, GroupInformation> states, boolean 
verbose) {
             states.forEach((groupId, state) -> {
-                if (shouldPrintMemberState(groupId, 
Optional.of(state.groupState), Optional.of(1))) {
-                    String coordinator = state.coordinator.host() + ":" + 
state.coordinator.port() + "  (" + state.coordinator.idString() + ")";
+                if (shouldPrintMemberState(groupId, 
Optional.of(state.groupState()), Optional.of(1))) {
+                    String coordinator = state.coordinator().host() + ":" + 
state.coordinator().port() + "  (" + state.coordinator().idString() + ")";
                     int coordinatorColLen = Math.max(25, coordinator.length());
-                    int groupColLen = Math.max(15, state.group.length());
+                    int groupColLen = Math.max(15, state.group().length());
 
-                    String assignmentStrategy = 
state.assignmentStrategy.isEmpty() ? MISSING_COLUMN_VALUE : 
state.assignmentStrategy;
+                    String assignmentStrategy = 
state.assignmentStrategy().isEmpty() ? MISSING_COLUMN_VALUE : 
state.assignmentStrategy();
 
                     if (verbose) {
                         String format = "\n%" + -groupColLen + "s %" + 
-coordinatorColLen + "s %-20s %-20s %-15s %-25s %s";
                         System.out.printf(format, "GROUP", "COORDINATOR (ID)", 
"ASSIGNMENT-STRATEGY", "STATE",
                             "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", 
"#MEMBERS");
-                        System.out.printf(format, state.group, coordinator, 
assignmentStrategy, state.groupState,
-                            
state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
state.numMembers);
+                        System.out.printf(format, state.group(), coordinator, 
assignmentStrategy, state.groupState(),
+                            
state.groupEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE), 
state.targetAssignmentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
 state.numMembers());
                     } else {
                         String format = "\n%" + -groupColLen + "s %" + 
-coordinatorColLen + "s %-20s %-20s %s";
                         System.out.printf(format, "GROUP", "COORDINATOR (ID)", 
"ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
-                        System.out.printf(format, state.group, coordinator, 
assignmentStrategy, state.groupState, state.numMembers);
+                        System.out.printf(format, state.group(), coordinator, 
assignmentStrategy, state.groupState(), state.numMembers());
                     }
                     System.out.println();
                 }
@@ -623,8 +623,8 @@ public class ConsumerGroupCommand {
             // concat the data and then sort them
             return Stream.concat(existLeaderAssignments.stream(), 
noneLeaderAssignments.stream())
                     .sorted(Comparator.<PartitionAssignmentState, 
String>comparing(
-                            state -> state.topic.orElse(""), String::compareTo)
-                            .thenComparingInt(state -> 
state.partition.orElse(-1)))
+                            state -> state.topic().orElse(""), 
String::compareTo)
+                            .thenComparingInt(state -> 
state.partition().orElse(-1)))
                     .collect(Collectors.toList());
         }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
index 9fde352cdda..d313390b6a9 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
@@ -21,30 +21,6 @@ import org.apache.kafka.common.Node;
 
 import java.util.Optional;
 
-class GroupInformation {
-    final String group;
-    final Node coordinator;
-    final String assignmentStrategy;
-    final GroupState groupState;
-    final int numMembers;
-    final Optional<Integer> groupEpoch;
-    final Optional<Integer> targetAssignmentEpoch;
-
-    GroupInformation(
-        String group,
-        Node coordinator,
-        String assignmentStrategy,
-        GroupState groupState,
-        int numMembers,
-        Optional<Integer> groupEpoch,
-        Optional<Integer> targetAssignmentEpoch
-    ) {
-        this.group = group;
-        this.coordinator = coordinator;
-        this.assignmentStrategy = assignmentStrategy;
-        this.groupState = groupState;
-        this.numMembers = numMembers;
-        this.groupEpoch = groupEpoch;
-        this.targetAssignmentEpoch = targetAssignmentEpoch;
-    }
+record GroupInformation(String group, Node coordinator, String 
assignmentStrategy, GroupState groupState,
+                        int numMembers, Optional<Integer> groupEpoch, 
Optional<Integer> targetAssignmentEpoch) {
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
index 420e640419e..56feb8b029f 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
@@ -21,42 +21,8 @@ import org.apache.kafka.common.TopicPartition;
 import java.util.List;
 import java.util.Optional;
 
-class MemberAssignmentState {
-    final String group;
-    final String consumerId;
-    final String host;
-    final String clientId;
-    final String groupInstanceId;
-    final int numPartitions;
-    final List<TopicPartition> assignment;
-    final List<TopicPartition> targetAssignment;
-    final Optional<Integer> currentEpoch;
-    final Optional<Integer> targetEpoch;
-    final Optional<Boolean> upgraded;
-
-    MemberAssignmentState(
-        String group,
-        String consumerId,
-        String host,
-        String clientId,
-        String groupInstanceId,
-        int numPartitions,
-        List<TopicPartition> assignment,
-        List<TopicPartition> targetAssignment,
-        Optional<Integer> currentEpoch,
-        Optional<Integer> targetEpoch,
-        Optional<Boolean> upgraded
-    ) {
-        this.group = group;
-        this.consumerId = consumerId;
-        this.host = host;
-        this.clientId = clientId;
-        this.groupInstanceId = groupInstanceId;
-        this.numPartitions = numPartitions;
-        this.assignment = assignment;
-        this.targetAssignment = targetAssignment;
-        this.currentEpoch = currentEpoch;
-        this.targetEpoch = targetEpoch;
-        this.upgraded = upgraded;
-    }
+record MemberAssignmentState(String group, String consumerId, String host, 
String clientId, String groupInstanceId,
+                             int numPartitions, List<TopicPartition> 
assignment, List<TopicPartition> targetAssignment,
+                             Optional<Integer> currentEpoch, Optional<Integer> 
targetEpoch,
+                             Optional<Boolean> upgraded) {
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
index 53f5c391100..d42377ca029 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
@@ -20,42 +20,8 @@ import org.apache.kafka.common.Node;
 
 import java.util.Optional;
 
-class PartitionAssignmentState {
-    final String group;
-    final Optional<Node> coordinator;
-    final Optional<String> topic;
-    final Optional<Integer> partition;
-    final Optional<Long> offset;
-    final Optional<Long> lag;
-    final Optional<String> consumerId;
-    final Optional<String> host;
-    final Optional<String> clientId;
-    final Optional<Long> logEndOffset;
-    final Optional<Integer> leaderEpoch;
-
-    PartitionAssignmentState(
-        String group,
-        Optional<Node> coordinator,
-        Optional<String> topic,
-        Optional<Integer> partition,
-        Optional<Long> offset,
-        Optional<Long> lag,
-        Optional<String> consumerId,
-        Optional<String> host,
-        Optional<String> clientId,
-        Optional<Long> logEndOffset,
-        Optional<Integer> leaderEpoch
-    ) {
-        this.group = group;
-        this.coordinator = coordinator;
-        this.topic = topic;
-        this.partition = partition;
-        this.offset = offset;
-        this.lag = lag;
-        this.consumerId = consumerId;
-        this.host = host;
-        this.clientId = clientId;
-        this.logEndOffset = logEndOffset;
-        this.leaderEpoch = leaderEpoch;
-    }
+record PartitionAssignmentState(String group, Optional<Node> coordinator, 
Optional<String> topic,
+                                Optional<Integer> partition, Optional<Long> 
offset, Optional<Long> lag,
+                                Optional<String> consumerId, Optional<String> 
host, Optional<String> clientId,
+                                Optional<Long> logEndOffset, Optional<Integer> 
leaderEpoch) {
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 95723665032..df4353c467a 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -656,25 +656,7 @@ public class ShareGroupCommand {
         }
     }
 
-    static class SharePartitionOffsetInformation {
-        final String group;
-        final String topic;
-        final int partition;
-        final Optional<Long> offset;
-        final Optional<Integer> leaderEpoch;
-
-        SharePartitionOffsetInformation(
-            String group,
-            String topic,
-            int partition,
-            Optional<Long> offset,
-            Optional<Integer> leaderEpoch
-        ) {
-            this.group = group;
-            this.topic = topic;
-            this.partition = partition;
-            this.offset = offset;
-            this.leaderEpoch = leaderEpoch;
-        }
+    record SharePartitionOffsetInformation(String group, String topic, int 
partition, Optional<Long> offset,
+                                           Optional<Integer> leaderEpoch) {
     }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
index 842d46ec587..58e2cf9b80f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
@@ -17,44 +17,15 @@
 
 package org.apache.kafka.tools.reassign;
 
-import java.util.Objects;
-
 /**
  * A replica log directory move state where the move is in progress.
+ * @param currentLogDir The current log directory.
+ * @param futureLogDir  The log directory that the replica is moving to.
+ * @param targetLogDir  The log directory that we wanted the replica to move 
to.
  */
-final class ActiveMoveState implements LogDirMoveState {
-    public final String currentLogDir;
-
-    public final String targetLogDir;
-
-    public final String futureLogDir;
-
-    /**
-     * @param currentLogDir       The current log directory.
-     * @param futureLogDir        The log directory that the replica is moving 
to.
-     * @param targetLogDir        The log directory that we wanted the replica 
to move to.
-     */
-    public ActiveMoveState(String currentLogDir, String targetLogDir, String 
futureLogDir) {
-        this.currentLogDir = currentLogDir;
-        this.targetLogDir = targetLogDir;
-        this.futureLogDir = futureLogDir;
-    }
-
+record ActiveMoveState(String currentLogDir, String targetLogDir, String 
futureLogDir) implements LogDirMoveState {
     @Override
     public boolean done() {
         return false;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ActiveMoveState that = (ActiveMoveState) o;
-        return Objects.equals(currentLogDir, that.currentLogDir) && 
Objects.equals(targetLogDir, that.targetLogDir) && Objects.equals(futureLogDir, 
that.futureLogDir);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(currentLogDir, targetLogDir, futureLogDir);
-    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java 
b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
index f405eedd403..c7fa61af3d9 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
@@ -17,41 +17,15 @@
 
 package org.apache.kafka.tools.reassign;
 
-import java.util.Objects;
-
 /**
  * A replica log directory move state where there is no move in progress, but 
we did not
  * reach the target log directory.
+ * @param currentLogDir The current log directory.
+ * @param targetLogDir  The log directory that we wanted the replica to move 
to.
  */
-final class CancelledMoveState implements LogDirMoveState {
-    public final String currentLogDir;
-
-    public final String targetLogDir;
-
-    /**
-     * @param currentLogDir       The current log directory.
-     * @param targetLogDir        The log directory that we wanted the replica 
to move to.
-     */
-    public CancelledMoveState(String currentLogDir, String targetLogDir) {
-        this.currentLogDir = currentLogDir;
-        this.targetLogDir = targetLogDir;
-    }
-
+record CancelledMoveState(String currentLogDir, String targetLogDir) 
implements LogDirMoveState {
     @Override
     public boolean done() {
         return true;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CancelledMoveState that = (CancelledMoveState) o;
-        return Objects.equals(currentLogDir, that.currentLogDir) && 
Objects.equals(targetLogDir, that.targetLogDir);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(currentLogDir, targetLogDir);
-    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java 
b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
index df5fb890148..28b017b7cdc 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
@@ -17,36 +17,13 @@
 
 package org.apache.kafka.tools.reassign;
 
-import java.util.Objects;
-
 /**
  * The completed replica log directory move state.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
  */
-final class CompletedMoveState implements LogDirMoveState {
-    public final String targetLogDir;
-
-    /**
-     * @param targetLogDir        The log directory that we wanted the replica 
to move to.
-     */
-    public CompletedMoveState(String targetLogDir) {
-        this.targetLogDir = targetLogDir;
-    }
-
+record CompletedMoveState(String targetLogDir) implements LogDirMoveState {
     @Override
     public boolean done() {
         return true;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CompletedMoveState that = (CompletedMoveState) o;
-        return Objects.equals(targetLogDir, that.targetLogDir);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(targetLogDir);
-    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
index eb3f592841c..44982a8b712 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
@@ -17,36 +17,13 @@
 
 package org.apache.kafka.tools.reassign;
 
-import java.util.Objects;
-
 /**
  * A replica log directory move state where the source replica is missing.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
  */
-final class MissingLogDirMoveState implements LogDirMoveState {
-    public final String targetLogDir;
-
-    /**
-     * @param targetLogDir        The log directory that we wanted the replica 
to move to.
-     */
-    public MissingLogDirMoveState(String targetLogDir) {
-        this.targetLogDir = targetLogDir;
-    }
-
+record MissingLogDirMoveState(String targetLogDir) implements LogDirMoveState {
     @Override
     public boolean done() {
         return false;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        MissingLogDirMoveState that = (MissingLogDirMoveState) o;
-        return Objects.equals(targetLogDir, that.targetLogDir);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(targetLogDir);
-    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
index eda9c22b829..b802275ceeb 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
@@ -17,36 +17,13 @@
 
 package org.apache.kafka.tools.reassign;
 
-import java.util.Objects;
-
 /**
  * A replica log directory move state where the source log directory is 
missing.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
  */
-final class MissingReplicaMoveState implements LogDirMoveState {
-    public final String targetLogDir;
-
-    /**
-     * @param targetLogDir        The log directory that we wanted the replica 
to move to.
-     */
-    public MissingReplicaMoveState(String targetLogDir) {
-        this.targetLogDir = targetLogDir;
-    }
-
+record MissingReplicaMoveState(String targetLogDir) implements LogDirMoveState 
{
     @Override
     public boolean done() {
         return false;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        MissingReplicaMoveState that = (MissingReplicaMoveState) o;
-        return Objects.equals(targetLogDir, that.targetLogDir);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(targetLogDir);
-    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 591c127ae2f..4628c34fe18 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -467,8 +467,8 @@ public class ReassignPartitionsCommand {
                 bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " cannot be found " +
                     "in any live log directory on broker " + 
replica.brokerId() + ".");
             } else if (state instanceof ActiveMoveState) {
-                String targetLogDir = ((ActiveMoveState) state).targetLogDir;
-                String futureLogDir = ((ActiveMoveState) state).futureLogDir;
+                String targetLogDir = ((ActiveMoveState) state).targetLogDir();
+                String futureLogDir = ((ActiveMoveState) state).futureLogDir();
                 if (targetLogDir.equals(futureLogDir)) {
                     bld.add("Reassignment of replica " + replica + " is still 
in progress.");
                 } else {
@@ -477,8 +477,8 @@ public class ReassignPartitionsCommand {
                         "instead of " + targetLogDir + ".");
                 }
             } else if (state instanceof CancelledMoveState) {
-                String targetLogDir = ((CancelledMoveState) 
state).targetLogDir;
-                String currentLogDir = ((CancelledMoveState) 
state).currentLogDir;
+                String targetLogDir = ((CancelledMoveState) 
state).targetLogDir();
+                String currentLogDir = ((CancelledMoveState) 
state).currentLogDir();
                 bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " on broker " +
                     replica.brokerId() + " is not being moved from log dir " + 
currentLogDir + " to " +
                     targetLogDir + ".");
@@ -1292,7 +1292,7 @@ public class ReassignPartitionsCommand {
         Map<TopicPartitionReplica, String> curMovingParts = new HashMap<>();
         findLogDirMoveStates(adminClient, targetReplicas).forEach((part, 
moveState) -> {
             if (moveState instanceof ActiveMoveState)
-                curMovingParts.put(part, ((ActiveMoveState) 
moveState).currentLogDir);
+                curMovingParts.put(part, ((ActiveMoveState) 
moveState).currentLogDir());
         });
         if (curMovingParts.isEmpty()) {
             System.out.print("None of the specified partition moves are 
active.");
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
index 2b8666d57bc..d10cf7b2e45 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
@@ -444,13 +444,7 @@ public class ConnectPluginPathTest {
         MULTI_JAR
     }
 
-    private static class PluginLocation {
-        private final Path path;
-
-        private PluginLocation(Path path) {
-            this.path = path;
-        }
-
+    private record PluginLocation(Path path) {
         @Override
         public String toString() {
             return path.toString();
@@ -504,15 +498,7 @@ public class ConnectPluginPathTest {
         }
     }
 
-    private static class PluginPathElement {
-        private final Path root;
-        private final List<PluginLocation> locations;
-
-        private PluginPathElement(Path root, List<PluginLocation> locations) {
-            this.root = root;
-            this.locations = locations;
-        }
-
+    private record PluginPathElement(Path root, List<PluginLocation> 
locations) {
         @Override
         public String toString() {
             return root.toString();
@@ -535,14 +521,7 @@ public class ConnectPluginPathTest {
         return new PluginPathElement(path, locations);
     }
 
-    private static class WorkerConfig {
-        private final Path configFile;
-        private final List<PluginPathElement> pluginPathElements;
-
-        private WorkerConfig(Path configFile, List<PluginPathElement> 
pluginPathElements) {
-            this.configFile = configFile;
-            this.pluginPathElements = pluginPathElements;
-        }
+    private record WorkerConfig(Path configFile, List<PluginPathElement> 
pluginPathElements) {
 
         @Override
         public String toString() {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index d5c1b035a44..236d1ce51cc 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -183,13 +183,13 @@ public class ConsumerGroupServiceTest {
 
         Map<TopicPartition, Optional<Long>> returnedOffsets = 
assignments.map(results ->
             results.stream().collect(Collectors.toMap(
-                assignment -> new TopicPartition(assignment.topic.get(), 
assignment.partition.get()),
-                assignment -> assignment.offset))
+                assignment -> new TopicPartition(assignment.topic().get(), 
assignment.partition().get()),
+                assignment -> assignment.offset()))
         ).orElse(Map.of());
         Map<TopicPartition, Optional<Integer>> returnedLeaderEpoch = 
assignments.map(results ->
             results.stream().collect(Collectors.toMap(
-                assignment -> new TopicPartition(assignment.topic.get(), 
assignment.partition.get()),
-                assignment -> assignment.leaderEpoch))
+                assignment -> new TopicPartition(assignment.topic().get(), 
assignment.partition().get()),
+                assignment -> assignment.leaderEpoch()))
         ).orElse(Map.of());
 
         Map<TopicPartition, Optional<Long>> expectedOffsets = Map.of(
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index d30c8081440..2866027e232 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -322,7 +322,7 @@ public class DeleteConsumerGroupsTest {
     }
 
     private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService 
service, String groupId, GroupState state) throws Exception {
-        return Objects.equals(service.collectGroupState(groupId).groupState, 
state);
+        return Objects.equals(service.collectGroupState(groupId).groupState(), 
state);
     }
 
     private ConsumerGroupCommand.ConsumerGroupService 
getConsumerGroupService(String[] args) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index b15b4fe9d45..9e5072576f4 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -462,7 +462,7 @@ public class DescribeConsumerGroupTest {
                     Optional<GroupState> state = groupOffsets.getKey();
                     Optional<Collection<PartitionAssignmentState>> assignments 
= groupOffsets.getValue();
 
-                    Predicate<PartitionAssignmentState> isGrp = s -> 
Objects.equals(s.group, group);
+                    Predicate<PartitionAssignmentState> isGrp = s -> 
Objects.equals(s.group(), group);
 
                     boolean res = state.map(s -> 
s.equals(GroupState.STABLE)).orElse(false) &&
                             assignments.isPresent() &&
@@ -477,9 +477,9 @@ public class DescribeConsumerGroupTest {
 
                     PartitionAssignmentState partitionState = 
maybePartitionState.get();
 
-                    return !partitionState.consumerId.map(s0 -> 
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
-                            !partitionState.clientId.map(s0 -> 
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
-                            !partitionState.host.map(h -> 
h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+                    return !partitionState.consumerId().map(s0 -> 
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+                            !partitionState.clientId().map(s0 -> 
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+                            !partitionState.host().map(h -> 
h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
                 }, "Expected a 'Stable' group status, rows and valid values 
for consumer id / client id / host columns in describe results for group " + 
group + ".");
             }
         }
@@ -506,7 +506,7 @@ public class DescribeConsumerGroupTest {
                 Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
 
                 assertTrue(res.getValue().isPresent());
-                assertTrue(res.getValue().get().size() == 1 && 
res.getValue().get().iterator().next().assignment.size() == 1,
+                assertTrue(res.getValue().get().size() == 1 && 
res.getValue().get().iterator().next().assignment().size() == 1,
                         "Expected a topic partition assigned to the single 
group member for group " + group);
             }
         }
@@ -526,10 +526,10 @@ public class DescribeConsumerGroupTest {
             ) {
                 TestUtils.waitForCondition(() -> {
                     GroupInformation state = service.collectGroupState(group);
-                    return Objects.equals(state.groupState, GroupState.STABLE) 
&&
-                            state.numMembers == 1 &&
-                            state.coordinator != null &&
-                            
clusterInstance.brokerIds().contains(state.coordinator.id());
+                    return Objects.equals(state.groupState(), 
GroupState.STABLE) &&
+                            state.numMembers() == 1 &&
+                            state.coordinator() != null &&
+                            
clusterInstance.brokerIds().contains(state.coordinator().id());
                 }, "Expected a 'Stable' group status, with one member for 
group " + group + ".");
             }
         }
@@ -558,11 +558,11 @@ public class DescribeConsumerGroupTest {
                 try (ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(new String[]{"--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group})) {
                     TestUtils.waitForCondition(() -> {
                         GroupInformation state = 
service.collectGroupState(group);
-                        return Objects.equals(state.groupState, 
GroupState.STABLE) &&
-                                state.numMembers == 1 &&
-                                Objects.equals(state.assignmentStrategy, 
expectedName) &&
-                                state.coordinator != null &&
-                                
clusterInstance.brokerIds().contains(state.coordinator.id());
+                        return Objects.equals(state.groupState(), 
GroupState.STABLE) &&
+                                state.numMembers() == 1 &&
+                                Objects.equals(state.assignmentStrategy(), 
expectedName) &&
+                                state.coordinator() != null &&
+                                
clusterInstance.brokerIds().contains(state.coordinator().id());
                     }, "Expected a 'Stable' group status, with one member and 
" + expectedName + " assignment strategy for group " + group + ".");
                 }
             } finally {
@@ -619,7 +619,7 @@ public class DescribeConsumerGroupTest {
                 TestUtils.waitForCondition(() -> {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false)
-                            && res.getValue().map(c -> 
c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && 
assignment.offset.isPresent())).orElse(false);
+                            && res.getValue().map(c -> 
c.stream().anyMatch(assignment -> Objects.equals(assignment.group(), group) && 
assignment.offset().isPresent())).orElse(false);
                 }, "Expected the group to initially become stable, and to find 
group in assignments after initial offset commit.");
 
                 // stop the consumer so the group has no active member anymore
@@ -629,13 +629,13 @@ public class DescribeConsumerGroupTest {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> offsets = 
service.collectGroupOffsets(group);
                     Optional<GroupState> state = offsets.getKey();
                     Optional<Collection<PartitionAssignmentState>> assignments 
= offsets.getValue();
-                    List<PartitionAssignmentState> testGroupAssignments = 
assignments.get().stream().filter(a -> Objects.equals(a.group, group)).toList();
+                    List<PartitionAssignmentState> testGroupAssignments = 
assignments.get().stream().filter(a -> Objects.equals(a.group(), 
group)).toList();
                     PartitionAssignmentState assignment = 
testGroupAssignments.get(0);
                     return state.map(s -> 
s.equals(GroupState.EMPTY)).orElse(false) &&
                             testGroupAssignments.size() == 1 &&
-                            assignment.consumerId.map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // 
the member should be gone
-                            assignment.clientId.map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
-                            assignment.host.map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+                            assignment.consumerId().map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // 
the member should be gone
+                            assignment.clientId().map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+                            assignment.host().map(c -> 
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
                 }, "failed to collect group offsets");
             }
         }
@@ -656,7 +656,7 @@ public class DescribeConsumerGroupTest {
                 TestUtils.waitForCondition(() -> {
                     Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false)
-                            && res.getValue().map(c -> c.stream().anyMatch(m 
-> Objects.equals(m.group, group))).orElse(false);
+                            && res.getValue().map(c -> c.stream().anyMatch(m 
-> Objects.equals(m.group(), group))).orElse(false);
                 }, "Expected the group to initially become stable, and to find 
group in assignments after initial offset commit.");
 
                 // stop the consumer so the group has no active member anymore
@@ -684,10 +684,10 @@ public class DescribeConsumerGroupTest {
             ) {
                 TestUtils.waitForCondition(() -> {
                     GroupInformation state = service.collectGroupState(group);
-                    return Objects.equals(state.groupState, GroupState.STABLE) 
&&
-                            state.numMembers == 1 &&
-                            state.coordinator != null &&
-                            
clusterInstance.brokerIds().contains(state.coordinator.id());
+                    return Objects.equals(state.groupState(), 
GroupState.STABLE) &&
+                            state.numMembers() == 1 &&
+                            state.coordinator() != null &&
+                            
clusterInstance.brokerIds().contains(state.coordinator().id());
                 }, "Expected the group to initially become stable, and have a 
single member.");
 
                 // stop the consumer so the group has no active member anymore
@@ -695,7 +695,7 @@ public class DescribeConsumerGroupTest {
 
                 TestUtils.waitForCondition(() -> {
                     GroupInformation state = service.collectGroupState(group);
-                    return Objects.equals(state.groupState, GroupState.EMPTY) 
&& state.numMembers == 0;
+                    return Objects.equals(state.groupState(), 
GroupState.EMPTY) && state.numMembers() == 0;
                 }, "Expected the group to become empty after the only member 
leaving.");
             }
         }
@@ -744,8 +744,8 @@ public class DescribeConsumerGroupTest {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).isPresent() &&
                             res.getValue().isPresent() &&
-                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group, group)).count() == 1 &&
-                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1;
+                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group(), group)).count() == 1 &&
+                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 1;
                 }, "Expected rows for consumers with no assigned partitions in 
describe group results");
             }
         }
@@ -767,15 +767,15 @@ public class DescribeConsumerGroupTest {
                     Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) &&
                             res.getValue().isPresent() &&
-                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group, group)).count() == 2 &&
-                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 &&
-                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group, group) && x.numPartitions == 0).count() == 1 &&
-                            res.getValue().get().stream().anyMatch(s -> 
!s.assignment.isEmpty());
+                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group(), group)).count() == 2 &&
+                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 1 &&
+                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group(), group) && x.numPartitions() == 0).count() == 1 &&
+                            res.getValue().get().stream().anyMatch(s -> 
!s.assignment().isEmpty());
                 }, "Expected rows for consumers with no assigned partitions in 
describe group results");
 
                 Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
                 assertTrue(res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false)
-                                && res.getValue().map(c -> 
c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
+                                && res.getValue().map(c -> 
c.stream().anyMatch(s -> !s.assignment().isEmpty())).orElse(false),
                         "Expected additional columns in verbose version of 
describe members");
             }
         }
@@ -795,7 +795,7 @@ public class DescribeConsumerGroupTest {
             ) {
                 TestUtils.waitForCondition(() -> {
                     GroupInformation state = service.collectGroupState(group);
-                    return Objects.equals(state.groupState, GroupState.STABLE) 
&& state.numMembers == 2;
+                    return Objects.equals(state.groupState(), 
GroupState.STABLE) && state.numMembers() == 2;
                 }, "Expected two consumers in describe group results");
             }
         }
@@ -844,9 +844,9 @@ public class DescribeConsumerGroupTest {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) &&
                             res.getValue().isPresent() &&
-                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group, group)).count() == 2 &&
-                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 &&
-                            res.getValue().get().stream().noneMatch(x -> 
Objects.equals(x.group, group) && x.partition.isEmpty());
+                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group(), group)).count() == 2 &&
+                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 2 &&
+                            res.getValue().get().stream().noneMatch(x -> 
Objects.equals(x.group(), group) && x.partition().isEmpty());
                 }, "Expected two rows (one row per consumer) in describe group 
results.");
             }
         }
@@ -868,13 +868,13 @@ public class DescribeConsumerGroupTest {
                     Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) &&
                             res.getValue().isPresent() &&
-                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group, group)).count() == 2 &&
-                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 &&
-                            res.getValue().get().stream().noneMatch(x -> 
Objects.equals(x.group, group) && x.numPartitions == 0);
+                            res.getValue().get().stream().filter(s -> 
Objects.equals(s.group(), group)).count() == 2 &&
+                            res.getValue().get().stream().filter(x -> 
Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 2 &&
+                            res.getValue().get().stream().noneMatch(x -> 
Objects.equals(x.group(), group) && x.numPartitions() == 0);
                 }, "Expected two rows (one row per consumer) in describe group 
members results.");
 
                 Entry<Optional<GroupState>, 
Optional<Collection<MemberAssignmentState>>> res = 
service.collectGroupMembers(group);
-                assertTrue(res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> 
s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
+                assertTrue(res.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> 
s.stream().filter(x -> x.assignment().isEmpty()).count()).orElse(0L) == 0,
                         "Expected additional columns in verbose version of 
describe members");
             }
         }
@@ -894,7 +894,7 @@ public class DescribeConsumerGroupTest {
             ) {
                 TestUtils.waitForCondition(() -> {
                     GroupInformation state = service.collectGroupState(group);
-                    return Objects.equals(state.groupState, GroupState.STABLE) 
&& Objects.equals(state.group, group) && state.numMembers == 2;
+                    return Objects.equals(state.groupState(), 
GroupState.STABLE) && Objects.equals(state.group(), group) && 
state.numMembers() == 2;
                 }, "Expected a stable group with two members in describe group 
state result.");
             }
         }
@@ -915,7 +915,7 @@ public class DescribeConsumerGroupTest {
                 TestUtils.waitForCondition(() -> {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> res = 
service.collectGroupOffsets(group);
                     return res.getKey().map(s -> 
s.equals(GroupState.EMPTY)).orElse(false)
-                            && res.getValue().isPresent() && 
res.getValue().get().stream().filter(s -> Objects.equals(s.group, 
group)).count() == 2;
+                            && res.getValue().isPresent() && 
res.getValue().get().stream().filter(s -> Objects.equals(s.group(), 
group)).count() == 2;
                 }, "Expected a stable group with two members in describe group 
state result.");
             }
         }
@@ -1030,7 +1030,7 @@ public class DescribeConsumerGroupTest {
                 TestUtils.waitForCondition(() -> {
                     Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> groupOffsets = 
service.collectGroupOffsets(group);
 
-                    Predicate<PartitionAssignmentState> isGrp = s -> 
Objects.equals(s.group, group);
+                    Predicate<PartitionAssignmentState> isGrp = s -> 
Objects.equals(s.group(), group);
 
                     boolean res = groupOffsets.getKey().map(s -> 
s.equals(GroupState.STABLE)).orElse(false) &&
                             groupOffsets.getValue().isPresent() &&
@@ -1045,9 +1045,9 @@ public class DescribeConsumerGroupTest {
 
                     PartitionAssignmentState assignmentState = 
maybeAssignmentState.get();
 
-                    return assignmentState.consumerId.map(c -> 
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
-                            assignmentState.clientId.map(c -> 
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
-                            assignmentState.host.map(h -> 
!h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+                    return assignmentState.consumerId().map(c -> 
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+                            assignmentState.clientId().map(c -> 
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+                            assignmentState.host().map(h -> 
!h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
                 }, "Expected a 'Stable' group status, rows and valid values 
for consumer id / client id / host columns in describe results for 
non-offset-committing group " + group + ".");
             }
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 1c9ab9cf98b..bbcfb6e35c1 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -896,10 +896,10 @@ public class ResetConsumerGroupOffsetTest {
     private void 
awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service,
                                             String group) throws Exception {
         TestUtils.waitForCondition(() -> {
-            GroupState state = service.collectGroupState(group).groupState;
+            GroupState state = service.collectGroupState(group).groupState();
             return Objects.equals(state, GroupState.EMPTY) || 
Objects.equals(state, GroupState.DEAD);
         }, "Expected that consumer group is inactive. Actual state: " +
-                service.collectGroupState(group).groupState);
+            service.collectGroupState(group).groupState());
     }
 
     private void resetAndAssertOffsetsCommitted(ClusterInstance cluster,
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index f1726721bfd..a8d830b4bd4 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -646,16 +646,7 @@ public class ReassignPartitionsCommandTest {
         }));
     }
 
-    static class LogDirReassignment {
-        final String json;
-        final String currentDir;
-        final String targetDir;
-
-        public LogDirReassignment(String json, String currentDir, String 
targetDir) {
-            this.json = json;
-            this.currentDir = currentDir;
-            this.targetDir = targetDir;
-        }
+    record LogDirReassignment(String json, String currentDir, String 
targetDir) {
     }
 
     private LogDirReassignment buildLogDirReassignment(TopicPartition 
topicPartition,


Reply via email to