This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0da9cacffab MINOR: Cleanups in Tools Module (3/n) (#20332) 0da9cacffab is described below commit 0da9cacffaba276e1bc6c1301aed449aaff788b9 Author: Sanskar Jhajharia <sjhajha...@confluent.io> AuthorDate: Tue Aug 19 19:54:13 2025 +0530 MINOR: Cleanups in Tools Module (3/n) (#20332) This PR aims at cleaning up the tools module further by getting rid of some extra code which can be replaced by `record` Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../java/org/apache/kafka/tools/AclCommand.java | 168 ++++++++++----------- .../org/apache/kafka/tools/ConnectPluginPath.java | 35 +---- .../apache/kafka/tools/MetadataQuorumCommand.java | 3 +- .../apache/kafka/tools/OAuthCompatibilityTool.java | 9 +- .../java/org/apache/kafka/tools/OffsetsUtils.java | 11 +- .../kafka/tools/PushHttpMetricsReporter.java | 85 ++--------- .../kafka/tools/ReplicaVerificationTool.java | 13 +- .../apache/kafka/tools/TransactionsCommand.java | 31 ++-- .../tools/consumer/group/ConsumerGroupCommand.java | 96 ++++++------ .../tools/consumer/group/GroupInformation.java | 28 +--- .../consumer/group/MemberAssignmentState.java | 42 +----- .../consumer/group/PartitionAssignmentState.java | 42 +----- .../tools/consumer/group/ShareGroupCommand.java | 22 +-- .../kafka/tools/reassign/ActiveMoveState.java | 37 +---- .../kafka/tools/reassign/CancelledMoveState.java | 32 +--- .../kafka/tools/reassign/CompletedMoveState.java | 27 +--- .../tools/reassign/MissingLogDirMoveState.java | 27 +--- .../tools/reassign/MissingReplicaMoveState.java | 27 +--- .../tools/reassign/ReassignPartitionsCommand.java | 10 +- .../apache/kafka/tools/ConnectPluginPathTest.java | 27 +--- .../consumer/group/ConsumerGroupServiceTest.java | 8 +- .../consumer/group/DeleteConsumerGroupsTest.java | 2 +- .../consumer/group/DescribeConsumerGroupTest.java | 92 +++++------ .../group/ResetConsumerGroupOffsetTest.java | 4 +- .../reassign/ReassignPartitionsCommandTest.java | 11 +- 25 files changed, 242 insertions(+), 647 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java index 791397b2405..fd62e8f3b56 100644 --- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java @@ -73,14 +73,13 @@ public class AclCommand { public static void main(String[] args) { AclCommandOptions opts = new AclCommandOptions(args); - AdminClientService aclCommandService = new AdminClientService(opts); try (Admin admin = Admin.create(adminConfigs(opts))) { if (opts.options.has(opts.addOpt)) { - aclCommandService.addAcls(admin); + addAcls(admin, opts); } else if (opts.options.has(opts.removeOpt)) { - aclCommandService.removeAcls(admin); + removeAcls(admin, opts); } else if (opts.options.has(opts.listOpt)) { - aclCommandService.listAcls(admin); + listAcls(admin, opts); } } catch (Throwable e) { System.out.println("Error while executing ACL command: " + e.getMessage()); @@ -102,106 +101,97 @@ public class AclCommand { return props; } - private static class AdminClientService { - - private final AclCommandOptions opts; - - AdminClientService(AclCommandOptions opts) { - this.opts = opts; - } - - void addAcls(Admin admin) throws ExecutionException, InterruptedException { - Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceToAcls(opts); - for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcl.entrySet()) { - ResourcePattern resource = entry.getKey(); - Set<AccessControlEntry> acls = entry.getValue(); - System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL); - Collection<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList()); - admin.createAcls(aclBindings).all().get(); - } + private static void addAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException { + Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceToAcls(opts); + for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcl.entrySet()) { + ResourcePattern resource = entry.getKey(); + Set<AccessControlEntry> acls = entry.getValue(); + System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL); + Collection<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList()); + admin.createAcls(aclBindings).all().get(); } + } - void removeAcls(Admin admin) throws ExecutionException, InterruptedException { - Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = getResourceFilterToAcls(opts); - for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : filterToAcl.entrySet()) { - ResourcePatternFilter filter = entry.getKey(); - Set<AccessControlEntry> acls = entry.getValue(); - if (acls.isEmpty()) { - if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) { - removeAcls(admin, acls, filter); - } - } else { - String msg = "Are you sure you want to remove ACLs: " + NL + - " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL + - " from resource filter `" + filter + "`? (y/n)"; - if (confirmAction(opts, msg)) { - removeAcls(admin, acls, filter); - } + private static void removeAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException { + Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = getResourceFilterToAcls(opts); + for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : filterToAcl.entrySet()) { + ResourcePatternFilter filter = entry.getKey(); + Set<AccessControlEntry> acls = entry.getValue(); + if (acls.isEmpty()) { + if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) { + removeAcls(admin, acls, filter); + } + } else { + String msg = "Are you sure you want to remove ACLs: " + NL + + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL + + " from resource filter `" + filter + "`? (y/n)"; + if (confirmAction(opts, msg)) { + removeAcls(admin, acls, filter); } } } + } - private void listAcls(Admin admin) throws ExecutionException, InterruptedException { - Set<ResourcePatternFilter> filters = getResourceFilter(opts, false); - Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt); - Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = getAcls(admin, filters); + private static void listAcls(Admin admin, AclCommandOptions opts) throws ExecutionException, InterruptedException { + Set<ResourcePatternFilter> filters = getResourceFilter(opts, false); + Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt); + Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = getAcls(admin, filters); - if (listPrincipals.isEmpty()) { - printResourceAcls(resourceToAcls); - } else { - listPrincipals.forEach(principal -> { - System.out.println("ACLs for principal `" + principal + "`"); - Map<ResourcePattern, Set<AccessControlEntry>> filteredResourceToAcls = resourceToAcls.entrySet().stream() - .map(entry -> { - ResourcePattern resource = entry.getKey(); - Set<AccessControlEntry> acls = entry.getValue().stream() - .filter(acl -> principal.toString().equals(acl.principal())) - .collect(Collectors.toSet()); - return new AbstractMap.SimpleEntry<>(resource, acls); - }) - .filter(entry -> !entry.getValue().isEmpty()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - printResourceAcls(filteredResourceToAcls); - }); - } + if (listPrincipals.isEmpty()) { + printResourceAcls(resourceToAcls); + } else { + listPrincipals.forEach(principal -> { + System.out.println("ACLs for principal `" + principal + "`"); + Map<ResourcePattern, Set<AccessControlEntry>> filteredResourceToAcls = resourceToAcls.entrySet().stream() + .map(entry -> { + ResourcePattern resource = entry.getKey(); + Set<AccessControlEntry> acls = entry.getValue().stream() + .filter(acl -> principal.toString().equals(acl.principal())) + .collect(Collectors.toSet()); + return new AbstractMap.SimpleEntry<>(resource, acls); + }) + .filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + printResourceAcls(filteredResourceToAcls); + }); } + } - private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls) { - resourceToAcls.forEach((resource, acls) -> - System.out.println("Current ACLs for resource `" + resource + "`:" + NL + - acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL) - ); - } + private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls) { + resourceToAcls.forEach((resource, acls) -> + System.out.println("Current ACLs for resource `" + resource + "`:" + NL + + acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL) + ); + } - private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException { - if (acls.isEmpty()) { - adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get(); - } else { - List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList()); - adminClient.deleteAcls(aclBindingFilters).all().get(); - } + private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException { + if (acls.isEmpty()) { + adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get(); + } else { + List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList()); + adminClient.deleteAcls(aclBindingFilters).all().get(); } + } - private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, InterruptedException { - Collection<AclBinding> aclBindings; - if (filters.isEmpty()) { - aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get(); - } else { - aclBindings = new ArrayList<>(); - for (ResourcePatternFilter filter : filters) { - aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get()); - } + private static Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, InterruptedException { + Collection<AclBinding> aclBindings; + if (filters.isEmpty()) { + aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get(); + } else { + aclBindings = new ArrayList<>(); + for (ResourcePatternFilter filter : filters) { + aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get()); } + } - Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new HashMap<>(); - for (AclBinding aclBinding : aclBindings) { - ResourcePattern resource = aclBinding.pattern(); - Set<AccessControlEntry> acls = resourceToAcls.getOrDefault(resource, new HashSet<>()); - acls.add(aclBinding.entry()); - resourceToAcls.put(resource, acls); - } - return resourceToAcls; + Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new HashMap<>(); + for (AclBinding aclBinding : aclBindings) { + ResourcePattern resource = aclBinding.pattern(); + Set<AccessControlEntry> acls = resourceToAcls.getOrDefault(resource, new HashSet<>()); + acls.add(aclBinding.entry()); + resourceToAcls.put(resource, acls); } + return resourceToAcls; } private static Map<ResourcePattern, Set<AccessControlEntry>> getResourceToAcls(AclCommandOptions opts) { diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index c86638e846a..a3da1d3a9c6 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -196,22 +196,8 @@ public class ConnectPluginPath { LIST, SYNC_MANIFESTS } - private static class Config { - private final Command command; - private final Set<Path> locations; - private final boolean dryRun; - private final boolean keepNotFound; - private final PrintStream out; - private final PrintStream err; - - private Config(Command command, Set<Path> locations, boolean dryRun, boolean keepNotFound, PrintStream out, PrintStream err) { - this.command = command; - this.locations = locations; - this.dryRun = dryRun; - this.keepNotFound = keepNotFound; - this.out = out; - this.err = err; - } + private record Config(Command command, Set<Path> locations, boolean dryRun, boolean keepNotFound, PrintStream out, + PrintStream err) { @Override public String toString() { @@ -262,16 +248,9 @@ public class ConnectPluginPath { * <p>This is unique to the (source, class, type) tuple, and contains additional pre-computed information * that pertains to this specific plugin. */ - private static class Row { - private final ManifestWorkspace.SourceWorkspace<?> workspace; - private final String className; - private final PluginType type; - private final String version; - private final List<String> aliases; - private final boolean loadable; - private final boolean hasManifest; - - public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) { + private record Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, + String version, List<String> aliases, boolean loadable, boolean hasManifest) { + private Row(ManifestWorkspace.SourceWorkspace<?> workspace, String className, PluginType type, String version, List<String> aliases, boolean loadable, boolean hasManifest) { this.workspace = Objects.requireNonNull(workspace, "workspace must be non-null"); this.className = Objects.requireNonNull(className, "className must be non-null"); this.version = Objects.requireNonNull(version, "version must be non-null"); @@ -281,10 +260,6 @@ public class ConnectPluginPath { this.hasManifest = hasManifest; } - private boolean loadable() { - return loadable; - } - private boolean compatible() { return loadable && hasManifest; } diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java index b65655a8c89..4abe443b82b 100644 --- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java @@ -65,7 +65,6 @@ import java.util.stream.Stream; import static java.lang.String.format; import static java.lang.String.valueOf; -import static java.util.Arrays.asList; /** * A tool for describing quorum status @@ -206,7 +205,7 @@ public class MetadataQuorumCommand { rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(), "Observer", humanReadable)); ToolsUtils.prettyPrintTable( - asList("NodeId", "DirectoryId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"), + List.of("NodeId", "DirectoryId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"), rows, System.out ); diff --git a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java index 40f3100054b..59dbb47daec 100644 --- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java @@ -292,14 +292,7 @@ public class OAuthCompatibilityTool { } - private static class ConfigHandler { - - private final Namespace namespace; - - - private ConfigHandler(Namespace namespace) { - this.namespace = namespace; - } + private record ConfigHandler(Namespace namespace) { private Map<String, ?> getConfigs() { Map<String, Object> m = new HashMap<>(); diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java index e37bf2804d7..269cd53875d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java @@ -500,16 +500,7 @@ public class OffsetsUtils { public interface LogOffsetResult { } - public static class LogOffset implements LogOffsetResult { - final long value; - - public LogOffset(long value) { - this.value = value; - } - - public long value() { - return value; - } + public record LogOffset(long value) implements LogOffsetResult { } public static class Unknown implements LogOffsetResult { } diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java index 86ea19f0623..9b423b78947 100644 --- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java +++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java @@ -240,86 +240,19 @@ public class PushHttpMetricsReporter implements MetricsReporter { } } - private static class MetricsReport { - private final MetricClientInfo client; - private final Collection<MetricValue> metrics; - - MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) { - this.client = client; - this.metrics = metrics; - } - - @JsonProperty - public MetricClientInfo client() { - return client; - } - - @JsonProperty - public Collection<MetricValue> metrics() { - return metrics; - } + private record MetricsReport(@JsonProperty("client") MetricClientInfo client, + @JsonProperty("metrics") Collection<MetricValue> metrics) { } - private static class MetricClientInfo { - private final String host; - private final String clientId; - private final long time; - - MetricClientInfo(String host, String clientId, long time) { - this.host = host; - this.clientId = clientId; - this.time = time; - } - - @JsonProperty - public String host() { - return host; - } - - @JsonProperty("client_id") - public String clientId() { - return clientId; - } - - @JsonProperty - public long time() { - return time; - } + private record MetricClientInfo(@JsonProperty("host") String host, + @JsonProperty("client_id") String clientId, + @JsonProperty("time") long time) { } - private static class MetricValue { - - private final String name; - private final String group; - private final Map<String, String> tags; - private final Object value; - - MetricValue(String name, String group, Map<String, String> tags, Object value) { - this.name = name; - this.group = group; - this.tags = tags; - this.value = value; - } - - @JsonProperty - public String name() { - return name; - } - - @JsonProperty - public String group() { - return group; - } - - @JsonProperty - public Map<String, String> tags() { - return tags; - } - - @JsonProperty - public Object value() { - return value; - } + private record MetricValue(@JsonProperty("name") String name, + @JsonProperty("group") String group, + @JsonProperty("tags") Map<String, String> tags, + @JsonProperty("value") Object value) { } // The signature for getInt changed from returning int to Integer so to remain compatible with 0.8.2.2 jars diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java index c5f1ddcc2d4..937e1cc5955 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java @@ -348,18 +348,7 @@ public class ReplicaVerificationTool { } } - private static class MessageInfo { - final int replicaId; - final long offset; - final long nextOffset; - final long checksum; - - MessageInfo(int replicaId, long offset, long nextOffset, long checksum) { - this.replicaId = replicaId; - this.offset = offset; - this.nextOffset = nextOffset; - this.checksum = checksum; - } + private record MessageInfo(int replicaId, long offset, long nextOffset, long checksum) { } protected static class ReplicaBuffer { diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 9c5323104fe..5b59fbee4a1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; -import static java.util.Arrays.asList; import static net.sourceforge.argparse4j.impl.Arguments.store; public abstract class TransactionsCommand { @@ -288,7 +287,7 @@ public abstract class TransactionsCommand { } static class DescribeProducersCommand extends TransactionsCommand { - static final List<String> HEADERS = asList( + static final List<String> HEADERS = List.of( "ProducerId", "ProducerEpoch", "LatestCoordinatorEpoch", @@ -360,7 +359,7 @@ public abstract class TransactionsCommand { String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : "None"; - return asList( + return List.of( String.valueOf(producerState.producerId()), String.valueOf(producerState.producerEpoch()), String.valueOf(producerState.coordinatorEpoch().orElse(-1)), @@ -375,7 +374,7 @@ public abstract class TransactionsCommand { } static class DescribeTransactionsCommand extends TransactionsCommand { - static final List<String> HEADERS = asList( + static final List<String> HEADERS = List.of( "CoordinatorId", "TransactionalId", "ProducerId", @@ -436,7 +435,7 @@ public abstract class TransactionsCommand { transactionDurationMsColumnValue = "None"; } - List<String> row = asList( + List<String> row = List.of( String.valueOf(result.coordinatorId()), transactionalId, String.valueOf(result.producerId()), @@ -453,7 +452,7 @@ public abstract class TransactionsCommand { } static class ListTransactionsCommand extends TransactionsCommand { - static final List<String> HEADERS = asList( + static final List<String> HEADERS = List.of( "TransactionalId", "Coordinator", "ProducerId", @@ -510,7 +509,7 @@ public abstract class TransactionsCommand { Collection<TransactionListing> listings = brokerListingsEntry.getValue(); for (TransactionListing listing : listings) { - rows.add(asList( + rows.add(List.of( listing.transactionalId(), coordinatorIdString, String.valueOf(listing.producerId()), @@ -526,7 +525,7 @@ public abstract class TransactionsCommand { static class FindHangingTransactionsCommand extends TransactionsCommand { private static final int MAX_BATCH_SIZE = 500; - static final List<String> HEADERS = asList( + static final List<String> HEADERS = List.of( "Topic", "Partition", "ProducerId", @@ -709,7 +708,7 @@ public abstract class TransactionsCommand { long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes( currentTimeMs - transaction.producerState.lastTimestamp()); - rows.add(asList( + rows.add(List.of( transaction.topicPartition.topic(), String.valueOf(transaction.topicPartition.partition()), String.valueOf(transaction.producerState.producerId()), @@ -848,17 +847,7 @@ public abstract class TransactionsCommand { return candidateTransactions; } - private static class OpenTransaction { - private final TopicPartition topicPartition; - private final ProducerState producerState; - - private OpenTransaction( - TopicPartition topicPartition, - ProducerState producerState - ) { - this.topicPartition = topicPartition; - this.producerState = producerState; - } + private record OpenTransaction(TopicPartition topicPartition, ProducerState producerState) { } private void collectCandidateOpenTransactions( @@ -1024,7 +1013,7 @@ public abstract class TransactionsCommand { PrintStream out, Time time ) throws Exception { - List<TransactionsCommand> commands = asList( + List<TransactionsCommand> commands = List.of( new ListTransactionsCommand(time), new DescribeTransactionsCommand(time), new DescribeProducersCommand(time), diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index 1f29cdd8156..cfdef8f7518 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -347,20 +347,20 @@ public class ConsumerGroupCommand { for (PartitionAssignmentState consumerAssignment : consumerAssignments) { if (verbose) { System.out.printf(format, - consumerAssignment.group, - consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE), - consumerAssignment.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), - consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), - consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE), - consumerAssignment.host.orElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE) + consumerAssignment.group(), + consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE), consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE), + consumerAssignment.leaderEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE), + consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), + consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE), + consumerAssignment.host().orElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE) ); } else { System.out.printf(format, - consumerAssignment.group, - consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE), consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE), - consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE), - consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE), - consumerAssignment.host.orElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE) + consumerAssignment.group(), + consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE), consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE), + consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE), + consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE), consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE), + consumerAssignment.host().orElse(MISSING_COLUMN_VALUE), consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE) ); } } @@ -379,10 +379,10 @@ public class ConsumerGroupCommand { if (assignments.isPresent()) { Collection<PartitionAssignmentState> consumerAssignments = assignments.get(); for (PartitionAssignmentState consumerAssignment : consumerAssignments) { - maxGroupLen = Math.max(maxGroupLen, consumerAssignment.group.length()); - maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE).length()); - maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE).length()); - maxHostLen = Math.max(maxHostLen, consumerAssignment.host.orElse(MISSING_COLUMN_VALUE).length()); + maxGroupLen = Math.max(maxGroupLen, consumerAssignment.group().length()); + maxTopicLen = Math.max(maxTopicLen, consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE).length()); + maxConsumerIdLen = Math.max(maxConsumerIdLen, consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE).length()); + maxHostLen = Math.max(maxHostLen, consumerAssignment.host().orElse(MISSING_COLUMN_VALUE).length()); } } @@ -408,20 +408,20 @@ public class ConsumerGroupCommand { // find proper columns width if (assignments.isPresent()) { for (MemberAssignmentState memberAssignment : assignments.get()) { - maxGroupLen = Math.max(maxGroupLen, memberAssignment.group.length()); - maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId.length()); - maxGroupInstanceIdLen = Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId.length()); - maxHostLen = Math.max(maxHostLen, memberAssignment.host.length()); - maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId.length()); - includeGroupInstanceId = includeGroupInstanceId || !memberAssignment.groupInstanceId.isEmpty(); - String currentAssignment = memberAssignment.assignment.isEmpty() ? - MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); - String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? - MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + maxGroupLen = Math.max(maxGroupLen, memberAssignment.group().length()); + maxConsumerIdLen = Math.max(maxConsumerIdLen, memberAssignment.consumerId().length()); + maxGroupInstanceIdLen = Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId().length()); + maxHostLen = Math.max(maxHostLen, memberAssignment.host().length()); + maxClientIdLen = Math.max(maxClientIdLen, memberAssignment.clientId().length()); + includeGroupInstanceId = includeGroupInstanceId || !memberAssignment.groupInstanceId().isEmpty(); + String currentAssignment = memberAssignment.assignment().isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment()); + String targetAssignment = memberAssignment.targetAssignment().isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment()); maxCurrentAssignment = Math.max(maxCurrentAssignment, currentAssignment.length()); maxTargetAssignment = Math.max(maxTargetAssignment, targetAssignment.length()); - hasClassicMember = hasClassicMember || (memberAssignment.upgraded.isPresent() && !memberAssignment.upgraded.get()); - hasConsumerMember = hasConsumerMember || (memberAssignment.upgraded.isPresent() && memberAssignment.upgraded.get()); + hasClassicMember = hasClassicMember || (memberAssignment.upgraded().isPresent() && !memberAssignment.upgraded().get()); + hasConsumerMember = hasConsumerMember || (memberAssignment.upgraded().isPresent() && memberAssignment.upgraded().get()); } } } @@ -465,23 +465,23 @@ public class ConsumerGroupCommand { ) { for (MemberAssignmentState memberAssignment : memberAssignments) { if (includeGroupInstanceId) { - System.out.printf(formatWithGroupInstanceId, memberAssignment.group, memberAssignment.consumerId, - memberAssignment.groupInstanceId, memberAssignment.host, memberAssignment.clientId, - memberAssignment.numPartitions); + System.out.printf(formatWithGroupInstanceId, memberAssignment.group(), memberAssignment.consumerId(), + memberAssignment.groupInstanceId(), memberAssignment.host(), memberAssignment.clientId(), + memberAssignment.numPartitions()); } else { - System.out.printf(formatWithoutGroupInstanceId, memberAssignment.group, memberAssignment.consumerId, - memberAssignment.host, memberAssignment.clientId, memberAssignment.numPartitions); + System.out.printf(formatWithoutGroupInstanceId, memberAssignment.group(), memberAssignment.consumerId(), + memberAssignment.host(), memberAssignment.clientId(), memberAssignment.numPartitions()); } if (verbose) { - String currentEpoch = memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); - String currentAssignment = memberAssignment.assignment.isEmpty() ? - MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment); - String targetEpoch = memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE); - String targetAssignment = memberAssignment.targetAssignment.isEmpty() ? - MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment); + String currentEpoch = memberAssignment.currentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String currentAssignment = memberAssignment.assignment().isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.assignment()); + String targetEpoch = memberAssignment.targetEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE); + String targetAssignment = memberAssignment.targetAssignment().isEmpty() ? + MISSING_COLUMN_VALUE : getAssignmentString(memberAssignment.targetAssignment()); if (hasMigrationMember) { System.out.printf(formatWithUpgrade, currentEpoch, currentAssignment, targetEpoch, targetAssignment, - memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE)); + memberAssignment.upgraded().map(Object::toString).orElse(MISSING_COLUMN_VALUE)); } else { System.out.printf(formatWithoutUpgrade, currentEpoch, currentAssignment, targetEpoch, targetAssignment); } @@ -511,23 +511,23 @@ public class ConsumerGroupCommand { private void printStates(Map<String, GroupInformation> states, boolean verbose) { states.forEach((groupId, state) -> { - if (shouldPrintMemberState(groupId, Optional.of(state.groupState), Optional.of(1))) { - String coordinator = state.coordinator.host() + ":" + state.coordinator.port() + " (" + state.coordinator.idString() + ")"; + if (shouldPrintMemberState(groupId, Optional.of(state.groupState()), Optional.of(1))) { + String coordinator = state.coordinator().host() + ":" + state.coordinator().port() + " (" + state.coordinator().idString() + ")"; int coordinatorColLen = Math.max(25, coordinator.length()); - int groupColLen = Math.max(15, state.group.length()); + int groupColLen = Math.max(15, state.group().length()); - String assignmentStrategy = state.assignmentStrategy.isEmpty() ? MISSING_COLUMN_VALUE : state.assignmentStrategy; + String assignmentStrategy = state.assignmentStrategy().isEmpty() ? MISSING_COLUMN_VALUE : state.assignmentStrategy(); if (verbose) { String format = "\n%" + -groupColLen + "s %" + -coordinatorColLen + "s %-20s %-20s %-15s %-25s %s"; System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS"); - System.out.printf(format, state.group, coordinator, assignmentStrategy, state.groupState, - state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE), state.numMembers); + System.out.printf(format, state.group(), coordinator, assignmentStrategy, state.groupState(), + state.groupEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE), state.targetAssignmentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE), state.numMembers()); } else { String format = "\n%" + -groupColLen + "s %" + -coordinatorColLen + "s %-20s %-20s %s"; System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"); - System.out.printf(format, state.group, coordinator, assignmentStrategy, state.groupState, state.numMembers); + System.out.printf(format, state.group(), coordinator, assignmentStrategy, state.groupState(), state.numMembers()); } System.out.println(); } @@ -623,8 +623,8 @@ public class ConsumerGroupCommand { // concat the data and then sort them return Stream.concat(existLeaderAssignments.stream(), noneLeaderAssignments.stream()) .sorted(Comparator.<PartitionAssignmentState, String>comparing( - state -> state.topic.orElse(""), String::compareTo) - .thenComparingInt(state -> state.partition.orElse(-1))) + state -> state.topic().orElse(""), String::compareTo) + .thenComparingInt(state -> state.partition().orElse(-1))) .collect(Collectors.toList()); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java index 9fde352cdda..d313390b6a9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java @@ -21,30 +21,6 @@ import org.apache.kafka.common.Node; import java.util.Optional; -class GroupInformation { - final String group; - final Node coordinator; - final String assignmentStrategy; - final GroupState groupState; - final int numMembers; - final Optional<Integer> groupEpoch; - final Optional<Integer> targetAssignmentEpoch; - - GroupInformation( - String group, - Node coordinator, - String assignmentStrategy, - GroupState groupState, - int numMembers, - Optional<Integer> groupEpoch, - Optional<Integer> targetAssignmentEpoch - ) { - this.group = group; - this.coordinator = coordinator; - this.assignmentStrategy = assignmentStrategy; - this.groupState = groupState; - this.numMembers = numMembers; - this.groupEpoch = groupEpoch; - this.targetAssignmentEpoch = targetAssignmentEpoch; - } +record GroupInformation(String group, Node coordinator, String assignmentStrategy, GroupState groupState, + int numMembers, Optional<Integer> groupEpoch, Optional<Integer> targetAssignmentEpoch) { } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java index 420e640419e..56feb8b029f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java @@ -21,42 +21,8 @@ import org.apache.kafka.common.TopicPartition; import java.util.List; import java.util.Optional; -class MemberAssignmentState { - final String group; - final String consumerId; - final String host; - final String clientId; - final String groupInstanceId; - final int numPartitions; - final List<TopicPartition> assignment; - final List<TopicPartition> targetAssignment; - final Optional<Integer> currentEpoch; - final Optional<Integer> targetEpoch; - final Optional<Boolean> upgraded; - - MemberAssignmentState( - String group, - String consumerId, - String host, - String clientId, - String groupInstanceId, - int numPartitions, - List<TopicPartition> assignment, - List<TopicPartition> targetAssignment, - Optional<Integer> currentEpoch, - Optional<Integer> targetEpoch, - Optional<Boolean> upgraded - ) { - this.group = group; - this.consumerId = consumerId; - this.host = host; - this.clientId = clientId; - this.groupInstanceId = groupInstanceId; - this.numPartitions = numPartitions; - this.assignment = assignment; - this.targetAssignment = targetAssignment; - this.currentEpoch = currentEpoch; - this.targetEpoch = targetEpoch; - this.upgraded = upgraded; - } +record MemberAssignmentState(String group, String consumerId, String host, String clientId, String groupInstanceId, + int numPartitions, List<TopicPartition> assignment, List<TopicPartition> targetAssignment, + Optional<Integer> currentEpoch, Optional<Integer> targetEpoch, + Optional<Boolean> upgraded) { } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java index 53f5c391100..d42377ca029 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java @@ -20,42 +20,8 @@ import org.apache.kafka.common.Node; import java.util.Optional; -class PartitionAssignmentState { - final String group; - final Optional<Node> coordinator; - final Optional<String> topic; - final Optional<Integer> partition; - final Optional<Long> offset; - final Optional<Long> lag; - final Optional<String> consumerId; - final Optional<String> host; - final Optional<String> clientId; - final Optional<Long> logEndOffset; - final Optional<Integer> leaderEpoch; - - PartitionAssignmentState( - String group, - Optional<Node> coordinator, - Optional<String> topic, - Optional<Integer> partition, - Optional<Long> offset, - Optional<Long> lag, - Optional<String> consumerId, - Optional<String> host, - Optional<String> clientId, - Optional<Long> logEndOffset, - Optional<Integer> leaderEpoch - ) { - this.group = group; - this.coordinator = coordinator; - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.lag = lag; - this.consumerId = consumerId; - this.host = host; - this.clientId = clientId; - this.logEndOffset = logEndOffset; - this.leaderEpoch = leaderEpoch; - } +record PartitionAssignmentState(String group, Optional<Node> coordinator, Optional<String> topic, + Optional<Integer> partition, Optional<Long> offset, Optional<Long> lag, + Optional<String> consumerId, Optional<String> host, Optional<String> clientId, + Optional<Long> logEndOffset, Optional<Integer> leaderEpoch) { } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 95723665032..df4353c467a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -656,25 +656,7 @@ public class ShareGroupCommand { } } - static class SharePartitionOffsetInformation { - final String group; - final String topic; - final int partition; - final Optional<Long> offset; - final Optional<Integer> leaderEpoch; - - SharePartitionOffsetInformation( - String group, - String topic, - int partition, - Optional<Long> offset, - Optional<Integer> leaderEpoch - ) { - this.group = group; - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.leaderEpoch = leaderEpoch; - } + record SharePartitionOffsetInformation(String group, String topic, int partition, Optional<Long> offset, + Optional<Integer> leaderEpoch) { } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java index 842d46ec587..58e2cf9b80f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java @@ -17,44 +17,15 @@ package org.apache.kafka.tools.reassign; -import java.util.Objects; - /** * A replica log directory move state where the move is in progress. + * @param currentLogDir The current log directory. + * @param futureLogDir The log directory that the replica is moving to. + * @param targetLogDir The log directory that we wanted the replica to move to. */ -final class ActiveMoveState implements LogDirMoveState { - public final String currentLogDir; - - public final String targetLogDir; - - public final String futureLogDir; - - /** - * @param currentLogDir The current log directory. - * @param futureLogDir The log directory that the replica is moving to. - * @param targetLogDir The log directory that we wanted the replica to move to. - */ - public ActiveMoveState(String currentLogDir, String targetLogDir, String futureLogDir) { - this.currentLogDir = currentLogDir; - this.targetLogDir = targetLogDir; - this.futureLogDir = futureLogDir; - } - +record ActiveMoveState(String currentLogDir, String targetLogDir, String futureLogDir) implements LogDirMoveState { @Override public boolean done() { return false; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ActiveMoveState that = (ActiveMoveState) o; - return Objects.equals(currentLogDir, that.currentLogDir) && Objects.equals(targetLogDir, that.targetLogDir) && Objects.equals(futureLogDir, that.futureLogDir); - } - - @Override - public int hashCode() { - return Objects.hash(currentLogDir, targetLogDir, futureLogDir); - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java index f405eedd403..c7fa61af3d9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java @@ -17,41 +17,15 @@ package org.apache.kafka.tools.reassign; -import java.util.Objects; - /** * A replica log directory move state where there is no move in progress, but we did not * reach the target log directory. + * @param currentLogDir The current log directory. + * @param targetLogDir The log directory that we wanted the replica to move to. */ -final class CancelledMoveState implements LogDirMoveState { - public final String currentLogDir; - - public final String targetLogDir; - - /** - * @param currentLogDir The current log directory. - * @param targetLogDir The log directory that we wanted the replica to move to. - */ - public CancelledMoveState(String currentLogDir, String targetLogDir) { - this.currentLogDir = currentLogDir; - this.targetLogDir = targetLogDir; - } - +record CancelledMoveState(String currentLogDir, String targetLogDir) implements LogDirMoveState { @Override public boolean done() { return true; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CancelledMoveState that = (CancelledMoveState) o; - return Objects.equals(currentLogDir, that.currentLogDir) && Objects.equals(targetLogDir, that.targetLogDir); - } - - @Override - public int hashCode() { - return Objects.hash(currentLogDir, targetLogDir); - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java index df5fb890148..28b017b7cdc 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java @@ -17,36 +17,13 @@ package org.apache.kafka.tools.reassign; -import java.util.Objects; - /** * The completed replica log directory move state. + * @param targetLogDir The log directory that we wanted the replica to move to. */ -final class CompletedMoveState implements LogDirMoveState { - public final String targetLogDir; - - /** - * @param targetLogDir The log directory that we wanted the replica to move to. - */ - public CompletedMoveState(String targetLogDir) { - this.targetLogDir = targetLogDir; - } - +record CompletedMoveState(String targetLogDir) implements LogDirMoveState { @Override public boolean done() { return true; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CompletedMoveState that = (CompletedMoveState) o; - return Objects.equals(targetLogDir, that.targetLogDir); - } - - @Override - public int hashCode() { - return Objects.hash(targetLogDir); - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java index eb3f592841c..44982a8b712 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java @@ -17,36 +17,13 @@ package org.apache.kafka.tools.reassign; -import java.util.Objects; - /** * A replica log directory move state where the source replica is missing. + * @param targetLogDir The log directory that we wanted the replica to move to. */ -final class MissingLogDirMoveState implements LogDirMoveState { - public final String targetLogDir; - - /** - * @param targetLogDir The log directory that we wanted the replica to move to. - */ - public MissingLogDirMoveState(String targetLogDir) { - this.targetLogDir = targetLogDir; - } - +record MissingLogDirMoveState(String targetLogDir) implements LogDirMoveState { @Override public boolean done() { return false; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MissingLogDirMoveState that = (MissingLogDirMoveState) o; - return Objects.equals(targetLogDir, that.targetLogDir); - } - - @Override - public int hashCode() { - return Objects.hash(targetLogDir); - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java index eda9c22b829..b802275ceeb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java @@ -17,36 +17,13 @@ package org.apache.kafka.tools.reassign; -import java.util.Objects; - /** * A replica log directory move state where the source log directory is missing. + * @param targetLogDir The log directory that we wanted the replica to move to. */ -final class MissingReplicaMoveState implements LogDirMoveState { - public final String targetLogDir; - - /** - * @param targetLogDir The log directory that we wanted the replica to move to. - */ - public MissingReplicaMoveState(String targetLogDir) { - this.targetLogDir = targetLogDir; - } - +record MissingReplicaMoveState(String targetLogDir) implements LogDirMoveState { @Override public boolean done() { return false; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - MissingReplicaMoveState that = (MissingReplicaMoveState) o; - return Objects.equals(targetLogDir, that.targetLogDir); - } - - @Override - public int hashCode() { - return Objects.hash(targetLogDir); - } } diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 591c127ae2f..4628c34fe18 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -467,8 +467,8 @@ public class ReassignPartitionsCommand { bld.add("Partition " + replica.topic() + "-" + replica.partition() + " cannot be found " + "in any live log directory on broker " + replica.brokerId() + "."); } else if (state instanceof ActiveMoveState) { - String targetLogDir = ((ActiveMoveState) state).targetLogDir; - String futureLogDir = ((ActiveMoveState) state).futureLogDir; + String targetLogDir = ((ActiveMoveState) state).targetLogDir(); + String futureLogDir = ((ActiveMoveState) state).futureLogDir(); if (targetLogDir.equals(futureLogDir)) { bld.add("Reassignment of replica " + replica + " is still in progress."); } else { @@ -477,8 +477,8 @@ public class ReassignPartitionsCommand { "instead of " + targetLogDir + "."); } } else if (state instanceof CancelledMoveState) { - String targetLogDir = ((CancelledMoveState) state).targetLogDir; - String currentLogDir = ((CancelledMoveState) state).currentLogDir; + String targetLogDir = ((CancelledMoveState) state).targetLogDir(); + String currentLogDir = ((CancelledMoveState) state).currentLogDir(); bld.add("Partition " + replica.topic() + "-" + replica.partition() + " on broker " + replica.brokerId() + " is not being moved from log dir " + currentLogDir + " to " + targetLogDir + "."); @@ -1292,7 +1292,7 @@ public class ReassignPartitionsCommand { Map<TopicPartitionReplica, String> curMovingParts = new HashMap<>(); findLogDirMoveStates(adminClient, targetReplicas).forEach((part, moveState) -> { if (moveState instanceof ActiveMoveState) - curMovingParts.put(part, ((ActiveMoveState) moveState).currentLogDir); + curMovingParts.put(part, ((ActiveMoveState) moveState).currentLogDir()); }); if (curMovingParts.isEmpty()) { System.out.print("None of the specified partition moves are active."); diff --git a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java index 2b8666d57bc..d10cf7b2e45 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java @@ -444,13 +444,7 @@ public class ConnectPluginPathTest { MULTI_JAR } - private static class PluginLocation { - private final Path path; - - private PluginLocation(Path path) { - this.path = path; - } - + private record PluginLocation(Path path) { @Override public String toString() { return path.toString(); @@ -504,15 +498,7 @@ public class ConnectPluginPathTest { } } - private static class PluginPathElement { - private final Path root; - private final List<PluginLocation> locations; - - private PluginPathElement(Path root, List<PluginLocation> locations) { - this.root = root; - this.locations = locations; - } - + private record PluginPathElement(Path root, List<PluginLocation> locations) { @Override public String toString() { return root.toString(); @@ -535,14 +521,7 @@ public class ConnectPluginPathTest { return new PluginPathElement(path, locations); } - private static class WorkerConfig { - private final Path configFile; - private final List<PluginPathElement> pluginPathElements; - - private WorkerConfig(Path configFile, List<PluginPathElement> pluginPathElements) { - this.configFile = configFile; - this.pluginPathElements = pluginPathElements; - } + private record WorkerConfig(Path configFile, List<PluginPathElement> pluginPathElements) { @Override public String toString() { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index d5c1b035a44..236d1ce51cc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -183,13 +183,13 @@ public class ConsumerGroupServiceTest { Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results -> results.stream().collect(Collectors.toMap( - assignment -> new TopicPartition(assignment.topic.get(), assignment.partition.get()), - assignment -> assignment.offset)) + assignment -> new TopicPartition(assignment.topic().get(), assignment.partition().get()), + assignment -> assignment.offset())) ).orElse(Map.of()); Map<TopicPartition, Optional<Integer>> returnedLeaderEpoch = assignments.map(results -> results.stream().collect(Collectors.toMap( - assignment -> new TopicPartition(assignment.topic.get(), assignment.partition.get()), - assignment -> assignment.leaderEpoch)) + assignment -> new TopicPartition(assignment.topic().get(), assignment.partition().get()), + assignment -> assignment.leaderEpoch())) ).orElse(Map.of()); Map<TopicPartition, Optional<Long>> expectedOffsets = Map.of( diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index d30c8081440..2866027e232 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -322,7 +322,7 @@ public class DeleteConsumerGroupsTest { } private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, GroupState state) throws Exception { - return Objects.equals(service.collectGroupState(groupId).groupState, state); + return Objects.equals(service.collectGroupState(groupId).groupState(), state); } private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index b15b4fe9d45..9e5072576f4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -462,7 +462,7 @@ public class DescribeConsumerGroupTest { Optional<GroupState> state = groupOffsets.getKey(); Optional<Collection<PartitionAssignmentState>> assignments = groupOffsets.getValue(); - Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group); + Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group(), group); boolean res = state.map(s -> s.equals(GroupState.STABLE)).orElse(false) && assignments.isPresent() && @@ -477,9 +477,9 @@ public class DescribeConsumerGroupTest { PartitionAssignmentState partitionState = maybePartitionState.get(); - return !partitionState.consumerId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - !partitionState.clientId.map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - !partitionState.host.map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + return !partitionState.consumerId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + !partitionState.clientId().map(s0 -> s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + !partitionState.host().map(h -> h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + group + "."); } } @@ -506,7 +506,7 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); assertTrue(res.getValue().isPresent()); - assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1, + assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment().size() == 1, "Expected a topic partition assigned to the single group member for group " + group); } } @@ -526,10 +526,10 @@ public class DescribeConsumerGroupTest { ) { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.STABLE) && - state.numMembers == 1 && - state.coordinator != null && - clusterInstance.brokerIds().contains(state.coordinator.id()); + return Objects.equals(state.groupState(), GroupState.STABLE) && + state.numMembers() == 1 && + state.coordinator() != null && + clusterInstance.brokerIds().contains(state.coordinator().id()); }, "Expected a 'Stable' group status, with one member for group " + group + "."); } } @@ -558,11 +558,11 @@ public class DescribeConsumerGroupTest { try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})) { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.STABLE) && - state.numMembers == 1 && - Objects.equals(state.assignmentStrategy, expectedName) && - state.coordinator != null && - clusterInstance.brokerIds().contains(state.coordinator.id()); + return Objects.equals(state.groupState(), GroupState.STABLE) && + state.numMembers() == 1 && + Objects.equals(state.assignmentStrategy(), expectedName) && + state.coordinator() != null && + clusterInstance.brokerIds().contains(state.coordinator().id()); }, "Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + group + "."); } } finally { @@ -619,7 +619,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && assignment.offset.isPresent())).orElse(false); + && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group(), group) && assignment.offset().isPresent())).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); // stop the consumer so the group has no active member anymore @@ -629,13 +629,13 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(group); Optional<GroupState> state = offsets.getKey(); Optional<Collection<PartitionAssignmentState>> assignments = offsets.getValue(); - List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, group)).toList(); + List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group(), group)).toList(); PartitionAssignmentState assignment = testGroupAssignments.get(0); return state.map(s -> s.equals(GroupState.EMPTY)).orElse(false) && testGroupAssignments.size() == 1 && - assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone - assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignment.host.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + assignment.consumerId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone + assignment.clientId().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignment.host().map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); }, "failed to collect group offsets"); } } @@ -656,7 +656,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, group))).orElse(false); + && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group(), group))).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); // stop the consumer so the group has no active member anymore @@ -684,10 +684,10 @@ public class DescribeConsumerGroupTest { ) { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.STABLE) && - state.numMembers == 1 && - state.coordinator != null && - clusterInstance.brokerIds().contains(state.coordinator.id()); + return Objects.equals(state.groupState(), GroupState.STABLE) && + state.numMembers() == 1 && + state.coordinator() != null && + clusterInstance.brokerIds().contains(state.coordinator().id()); }, "Expected the group to initially become stable, and have a single member."); // stop the consumer so the group has no active member anymore @@ -695,7 +695,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.EMPTY) && state.numMembers == 0; + return Objects.equals(state.groupState(), GroupState.EMPTY) && state.numMembers() == 0; }, "Expected the group to become empty after the only member leaving."); } } @@ -744,8 +744,8 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).isPresent() && res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 1 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1; + res.getValue().get().stream().filter(s -> Objects.equals(s.group(), group)).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 1; }, "Expected rows for consumers with no assigned partitions in describe group results"); } } @@ -767,15 +767,15 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 0).count() == 1 && - res.getValue().get().stream().anyMatch(s -> !s.assignment.isEmpty()); + res.getValue().get().stream().filter(s -> Objects.equals(s.group(), group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group(), group) && x.numPartitions() == 0).count() == 1 && + res.getValue().get().stream().anyMatch(s -> !s.assignment().isEmpty()); }, "Expected rows for consumers with no assigned partitions in describe group results"); Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) - && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), + && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment().isEmpty())).orElse(false), "Expected additional columns in verbose version of describe members"); } } @@ -795,7 +795,7 @@ public class DescribeConsumerGroupTest { ) { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 2; + return Objects.equals(state.groupState(), GroupState.STABLE) && state.numMembers() == 2; }, "Expected two consumers in describe group results"); } } @@ -844,9 +844,9 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 && - res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.partition.isEmpty()); + res.getValue().get().stream().filter(s -> Objects.equals(s.group(), group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group(), group) && x.partition().isEmpty()); }, "Expected two rows (one row per consumer) in describe group results."); } } @@ -868,13 +868,13 @@ public class DescribeConsumerGroupTest { Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && - res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && - res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 && - res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.numPartitions == 0); + res.getValue().get().stream().filter(s -> Objects.equals(s.group(), group)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group(), group) && x.numPartitions() == 0); }, "Expected two rows (one row per consumer) in describe group members results."); Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group); - assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, + assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment().isEmpty()).count()).orElse(0L) == 0, "Expected additional columns in verbose version of describe members"); } } @@ -894,7 +894,7 @@ public class DescribeConsumerGroupTest { ) { TestUtils.waitForCondition(() -> { GroupInformation state = service.collectGroupState(group); - return Objects.equals(state.groupState, GroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2; + return Objects.equals(state.groupState(), GroupState.STABLE) && Objects.equals(state.group(), group) && state.numMembers() == 2; }, "Expected a stable group with two members in describe group state result."); } } @@ -915,7 +915,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group); return res.getKey().map(s -> s.equals(GroupState.EMPTY)).orElse(false) - && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2; + && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group(), group)).count() == 2; }, "Expected a stable group with two members in describe group state result."); } } @@ -1030,7 +1030,7 @@ public class DescribeConsumerGroupTest { TestUtils.waitForCondition(() -> { Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(group); - Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group); + Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group(), group); boolean res = groupOffsets.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && groupOffsets.getValue().isPresent() && @@ -1045,9 +1045,9 @@ public class DescribeConsumerGroupTest { PartitionAssignmentState assignmentState = maybeAssignmentState.get(); - return assignmentState.consumerId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignmentState.clientId.map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && - assignmentState.host.map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); + return assignmentState.consumerId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignmentState.clientId().map(c -> !c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && + assignmentState.host().map(h -> !h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false); }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + group + "."); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index 1c9ab9cf98b..bbcfb6e35c1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -896,10 +896,10 @@ public class ResetConsumerGroupOffsetTest { private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service, String group) throws Exception { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group).groupState; + GroupState state = service.collectGroupState(group).groupState(); return Objects.equals(state, GroupState.EMPTY) || Objects.equals(state, GroupState.DEAD); }, "Expected that consumer group is inactive. Actual state: " + - service.collectGroupState(group).groupState); + service.collectGroupState(group).groupState()); } private void resetAndAssertOffsetsCommitted(ClusterInstance cluster, diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java index f1726721bfd..a8d830b4bd4 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java @@ -646,16 +646,7 @@ public class ReassignPartitionsCommandTest { })); } - static class LogDirReassignment { - final String json; - final String currentDir; - final String targetDir; - - public LogDirReassignment(String json, String currentDir, String targetDir) { - this.json = json; - this.currentDir = currentDir; - this.targetDir = targetDir; - } + record LogDirReassignment(String json, String currentDir, String targetDir) { } private LogDirReassignment buildLogDirReassignment(TopicPartition topicPartition,