This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5f59eac11be MINOR: Clean up TopicCommand (#21121)
5f59eac11be is described below
commit 5f59eac11be1d8201795695f41c8e18983be1952
Author: Dmitry Werner <[email protected]>
AuthorDate: Wed Jan 21 13:10:09 2026 +0500
MINOR: Clean up TopicCommand (#21121)
Changes in this PR:
- Cleared out obsolete boolean fields (always false) from classes.
- Revised method access modifiers for better encapsulation.
- Simplified code by inlining one‑time‑use methods.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/tools/TopicCommand.java | 194 +++++++++------------
.../org/apache/kafka/tools/TopicCommandTest.java | 2 +-
2 files changed, 79 insertions(+), 117 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 3de4a8dc518..1d1d5c2f72c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -154,7 +154,6 @@ public abstract class TopicCommand {
return ret;
}
- @SuppressWarnings("deprecation")
private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions
opts) {
List<List<String>> configsToBeAdded =
opts.topicConfig().orElse(List.of())
.stream()
@@ -248,7 +247,7 @@ public abstract class TopicCommand {
private final TopicCommandOptions opts;
- public CommandTopicPartition(TopicCommandOptions options) {
+ CommandTopicPartition(TopicCommandOptions options) {
opts = options;
name = options.topic().get();
partitions = options.partitions();
@@ -257,11 +256,11 @@ public abstract class TopicCommand {
configsToAdd = parseTopicConfigsToBeAdded(options);
}
- public boolean hasReplicaAssignment() {
+ boolean hasReplicaAssignment() {
return !replicaAssignment.isEmpty();
}
- public boolean ifTopicDoesntExist() {
+ boolean ifTopicDoesntExist() {
return opts.ifNotExists();
}
}
@@ -272,18 +271,16 @@ public abstract class TopicCommand {
private final int numPartitions;
private final int replicationFactor;
private final Config config;
- private final boolean markedForDeletion;
- public TopicDescription(String topic, Uuid topicId, int numPartitions,
int replicationFactor, Config config, boolean markedForDeletion) {
+ TopicDescription(String topic, Uuid topicId, int numPartitions, int
replicationFactor, Config config) {
this.topic = topic;
this.topicId = topicId;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
this.config = config;
- this.markedForDeletion = markedForDeletion;
}
- public void printDescription() {
+ void printDescription() {
String configsAsString = config.entries().stream()
.filter(config -> !config.isDefault())
.map(ce -> ce.name() + "=" + ce.value())
@@ -294,7 +291,6 @@ public abstract class TopicCommand {
System.out.print("\tPartitionCount: " + numPartitions);
System.out.print("\tReplicationFactor: " + replicationFactor);
System.out.print("\tConfigs: " + configsAsString);
- System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" :
"");
System.out.println();
}
}
@@ -303,46 +299,43 @@ public abstract class TopicCommand {
private final String topic;
private final TopicPartitionInfo info;
private final Config config;
- private final boolean markedForDeletion;
private final PartitionReassignment reassignment;
PartitionDescription(String topic,
TopicPartitionInfo info,
Config config,
- boolean markedForDeletion,
PartitionReassignment reassignment) {
this.topic = topic;
this.info = info;
this.config = config;
- this.markedForDeletion = markedForDeletion;
this.reassignment = reassignment;
}
- public int minIsrCount() {
+ int minIsrCount() {
return
Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
}
- public boolean isUnderReplicated() {
+ boolean isUnderReplicated() {
return getReplicationFactor(info, reassignment) -
info.isr().size() > 0;
}
- public boolean hasLeader() {
+ boolean hasLeader() {
return info.leader() != null;
}
- public boolean isUnderMinIsr() {
- return !hasLeader() || info.isr().size() < minIsrCount();
+ boolean isUnderMinIsr() {
+ return !hasLeader() || info.isr().size() < minIsrCount();
}
- public boolean isAtMinIsrPartitions() {
+ boolean isAtMinIsrPartitions() {
return minIsrCount() == info.isr().size();
}
- public boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
+ boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
return !hasLeader() || !liveBrokers.contains(info.leader().id());
}
- public void printDescription() {
+ void printDescription() {
System.out.print("\tTopic: " + topic);
System.out.print("\tPartition: " + info.partition());
System.out.print("\tLeader: " + (hasLeader() ? info.leader().id()
: "none"));
@@ -376,7 +369,6 @@ public abstract class TopicCommand {
} else {
System.out.print("\tLastKnownElr: N/A");
}
- System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" :
"");
System.out.println();
}
}
@@ -387,7 +379,7 @@ public abstract class TopicCommand {
private final boolean describeConfigs;
private final boolean describePartitions;
- public DescribeOptions(TopicCommandOptions opts, Set<Integer>
liveBrokers) {
+ DescribeOptions(TopicCommandOptions opts, Set<Integer> liveBrokers) {
this.opts = opts;
this.liveBrokers = liveBrokers;
this.describeConfigs = !opts.reportUnavailablePartitions() &&
@@ -421,30 +413,27 @@ public abstract class TopicCommand {
shouldPrintAtMinIsrPartitions(partitionDesc);
}
- public void maybePrintPartitionDescription(PartitionDescription desc) {
+ void maybePrintPartitionDescription(PartitionDescription desc) {
if (shouldPrintTopicPartition(desc)) {
desc.printDescription();
}
}
}
- public static class TopicService implements AutoCloseable {
+ static class TopicService implements AutoCloseable {
private final Admin adminClient;
- public TopicService(Properties commandConfig, Optional<String>
bootstrapServer) {
- this.adminClient = createAdminClient(commandConfig,
bootstrapServer);
- }
+ TopicService(Properties commandConfig, Optional<String>
bootstrapServer) {
+ bootstrapServer.ifPresent(s ->
commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
- public TopicService(Admin admin) {
- this.adminClient = admin;
+ adminClient = Admin.create(commandConfig);
}
- private static Admin createAdminClient(Properties commandConfig,
Optional<String> bootstrapServer) {
- bootstrapServer.ifPresent(s ->
commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
- return Admin.create(commandConfig);
+ TopicService(Admin admin) {
+ adminClient = admin;
}
- public void createTopic(TopicCommandOptions opts) throws Exception {
+ void createTopic(TopicCommandOptions opts) throws Exception {
CommandTopicPartition topic = new CommandTopicPartition(opts);
if (Topic.hasCollisionChars(topic.name)) {
System.out.println("WARNING: Due to limitations in metric
names, topics with a period ('.') or underscore ('_') could " +
@@ -453,7 +442,7 @@ public abstract class TopicCommand {
createTopic(topic);
}
- public void createTopic(CommandTopicPartition topic) throws Exception {
+ private void createTopic(CommandTopicPartition topic) throws Exception
{
if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE ||
rf < 1).isPresent()) {
throw new IllegalArgumentException("The replication factor
must be between 1 and " + Short.MAX_VALUE + " inclusive");
}
@@ -462,12 +451,9 @@ public abstract class TopicCommand {
}
try {
- NewTopic newTopic;
- if (topic.hasReplicaAssignment()) {
- newTopic = new NewTopic(topic.name,
topic.replicaAssignment);
- } else {
- newTopic = new NewTopic(topic.name, topic.partitions,
topic.replicationFactor.map(Integer::shortValue));
- }
+ NewTopic newTopic = topic.hasReplicaAssignment()
+ ? new NewTopic(topic.name, topic.replicaAssignment)
+ : new NewTopic(topic.name, topic.partitions,
topic.replicationFactor.map(Integer::shortValue));
Map<String, String> configsMap =
topic.configsToAdd.stringPropertyNames().stream()
.collect(Collectors.toMap(name -> name,
topic.configsToAdd::getProperty));
@@ -487,12 +473,12 @@ public abstract class TopicCommand {
}
}
- public void listTopics(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
+ void listTopics(TopicCommandOptions opts) throws ExecutionException,
InterruptedException {
String results = String.join("\n", getTopics(opts.topic(),
opts.excludeInternalTopics()));
System.out.println(results);
}
- public void alterTopic(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
+ void alterTopic(TopicCommandOptions opts) throws ExecutionException,
InterruptedException {
CommandTopicPartition topic = new CommandTopicPartition(opts);
List<String> topics = getTopics(opts.topic(),
opts.excludeInternalTopics());
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
@@ -525,7 +511,7 @@ public abstract class TopicCommand {
return new AbstractMap.SimpleEntry<>(topicName,
NewPartitions.increaseTo(topic.partitions.get()));
}
- public Map<TopicPartition, PartitionReassignment>
listAllReassignments(Set<TopicPartition> topicPartitions) {
+ Map<TopicPartition, PartitionReassignment>
listAllReassignments(Set<TopicPartition> topicPartitions) {
try {
return
adminClient.listPartitionReassignments(topicPartitions).reassignments().get();
} catch (ExecutionException e) {
@@ -541,7 +527,7 @@ public abstract class TopicCommand {
}
}
- public void describeTopic(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
+ void describeTopic(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
// If topicId is provided and not zero, will use topicId
regardless of topic name
Optional<Uuid> inputTopicId = opts.topicId()
.map(Uuid::fromString).filter(uuid ->
!uuid.equals(Uuid.ZERO_UUID));
@@ -599,45 +585,46 @@ public abstract class TopicCommand {
Map<TopicPartition, PartitionReassignment> reassignments =
listAllReassignments(topicPartitions);
for (org.apache.kafka.clients.admin.TopicDescription td :
topicDescriptions) {
String topicName = td.name();
- Uuid topicId = td.topicId();
Config config = allConfigs.get(new
ConfigResource(ConfigResource.Type.TOPIC, topicName)).get();
ArrayList<TopicPartitionInfo> sortedPartitions = new
ArrayList<>(td.partitions());
sortedPartitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
- printDescribeConfig(opts, describeOptions, reassignments, td,
topicName, topicId, config, sortedPartitions);
+ printDescribeConfig(opts, describeOptions, reassignments, td,
topicName, td.topicId(), config, sortedPartitions);
printPartitionDescription(describeOptions, reassignments, td,
topicName, config, sortedPartitions);
}
}
- private void printPartitionDescription(DescribeOptions
describeOptions, Map<TopicPartition, PartitionReassignment> reassignments,
org.apache.kafka.clients.admin.TopicDescription td, String topicName, Config
config, ArrayList<TopicPartitionInfo> sortedPartitions) {
+ private void printPartitionDescription(DescribeOptions
describeOptions, Map<TopicPartition, PartitionReassignment> reassignments,
+
org.apache.kafka.clients.admin.TopicDescription td, String topicName, Config
config,
+ ArrayList<TopicPartitionInfo>
sortedPartitions) {
if (describeOptions.describePartitions) {
for (TopicPartitionInfo partition : sortedPartitions) {
PartitionReassignment reassignment =
reassignments.get(new TopicPartition(td.name(),
partition.partition()));
PartitionDescription partitionDesc = new
PartitionDescription(topicName,
- partition, config, false, reassignment);
+ partition, config, reassignment);
describeOptions.maybePrintPartitionDescription(partitionDesc);
}
}
}
- private void printDescribeConfig(TopicCommandOptions opts,
DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment>
reassignments, org.apache.kafka.clients.admin.TopicDescription td, String
topicName, Uuid topicId, Config config, ArrayList<TopicPartitionInfo>
sortedPartitions) {
+ private void printDescribeConfig(TopicCommandOptions opts,
DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment>
reassignments,
+
org.apache.kafka.clients.admin.TopicDescription td, String topicName, Uuid
topicId,
+ Config config,
ArrayList<TopicPartitionInfo> sortedPartitions) {
if (describeOptions.describeConfigs) {
List<ConfigEntry> entries = new ArrayList<>(config.entries());
boolean hasNonDefault = entries.stream().anyMatch(e ->
!e.isDefault());
if (!opts.reportOverriddenConfigs() || hasNonDefault) {
- int numPartitions = td.partitions().size();
TopicPartitionInfo firstPartition =
sortedPartitions.get(0);
PartitionReassignment reassignment =
reassignments.get(new TopicPartition(td.name(),
firstPartition.partition()));
TopicDescription topicDesc = new
TopicDescription(topicName, topicId,
- numPartitions, getReplicationFactor(firstPartition,
reassignment),
- config, false);
+ td.partitions().size(),
getReplicationFactor(firstPartition, reassignment), config);
topicDesc.printDescription();
}
}
}
- public void deleteTopic(TopicCommandOptions opts) throws
ExecutionException, InterruptedException {
+ void deleteTopic(TopicCommandOptions opts) throws ExecutionException,
InterruptedException {
List<String> topics = getTopics(opts.topic(),
opts.excludeInternalTopics());
ensureTopicExists(topics, opts.topic(), !opts.ifExists());
adminClient.deleteTopics(List.copyOf(topics),
@@ -645,7 +632,7 @@ public abstract class TopicCommand {
).all().get();
}
- public List<String> getTopics(Optional<String> topicIncludeList,
boolean excludeInternalTopics) throws ExecutionException, InterruptedException {
+ List<String> getTopics(Optional<String> topicIncludeList, boolean
excludeInternalTopics) throws ExecutionException, InterruptedException {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
if (!excludeInternalTopics) {
listTopicsOptions.listInternal(true);
@@ -655,7 +642,7 @@ public abstract class TopicCommand {
return
doGetTopics(allTopics.stream().sorted().collect(Collectors.toList()),
topicIncludeList, excludeInternalTopics);
}
- public List<Uuid> getTopicIds(Uuid topicIdIncludeList, boolean
excludeInternalTopics) throws ExecutionException, InterruptedException {
+ private List<Uuid> getTopicIds(Uuid topicIdIncludeList, boolean
excludeInternalTopics) throws ExecutionException, InterruptedException {
ListTopicsResult allTopics = excludeInternalTopics ?
adminClient.listTopics() :
adminClient.listTopics(new
ListTopicsOptions().listInternal(true));
List<Uuid> allTopicIds = allTopics.listings().get().stream()
@@ -673,7 +660,7 @@ public abstract class TopicCommand {
}
}
- public static final class TopicCommandOptions extends
CommandDefaultOptions {
+ static final class TopicCommandOptions extends CommandDefaultOptions {
private final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
private final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
@@ -692,8 +679,6 @@ public abstract class TopicCommand {
private final ArgumentAcceptingOptionSpec<String> topicIdOpt;
- private final String nl;
-
private static final String
KAFKA_CONFIGS_CLI_SUPPORTS_ALTERING_TOPIC_CONFIGS =
" (To alter topic configurations, the kafka-configs tool can
be used.)";
@@ -733,7 +718,7 @@ public abstract class TopicCommand {
private final Set<OptionSpecBuilder> allReplicationReportOpts;
- public TopicCommandOptions(String[] args) {
+ TopicCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The Kafka server to connect to.")
.withRequiredArg()
@@ -759,7 +744,7 @@ public abstract class TopicCommand {
.withRequiredArg()
.describedAs("topic-id")
.ofType(String.class);
- nl = System.lineSeparator();
+ String nl = System.lineSeparator();
String logConfigNames =
LogConfig.nonInternalConfigNames().stream().map(config -> "\t" +
config).collect(Collectors.joining(nl));
configOpt = parser.accepts("config", "A topic configuration
override for the topic being created." +
@@ -818,135 +803,113 @@ public abstract class TopicCommand {
checkArgs();
}
- public boolean has(OptionSpec<?> builder) {
+ boolean has(OptionSpec<?> builder) {
return options.has(builder);
}
- public <A> Optional<A> valueAsOption(OptionSpec<A> option) {
- return valueAsOption(option, Optional.empty());
- }
-
- public <A> Optional<List<A>> valuesAsOption(OptionSpec<A> option) {
- return valuesAsOption(option, List.of());
- }
-
- public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A>
defaultValue) {
- if (has(option)) {
- return Optional.of(options.valueOf(option));
- } else {
- return defaultValue;
- }
- }
-
- public <A> Optional<List<A>> valuesAsOption(OptionSpec<A> option,
List<A> defaultValue) {
- return options.has(option) ? Optional.of(options.valuesOf(option))
: Optional.of(defaultValue);
+ <A> Optional<A> valueAsOption(OptionSpec<A> option) {
+ return has(option) ? Optional.of(options.valueOf(option)) :
Optional.empty();
}
- public boolean hasCreateOption() {
+ boolean hasCreateOption() {
return has(createOpt);
}
- public boolean hasAlterOption() {
+ boolean hasAlterOption() {
return has(alterOpt);
}
- public boolean hasListOption() {
+ boolean hasListOption() {
return has(listOpt);
}
- public boolean hasDescribeOption() {
+ boolean hasDescribeOption() {
return has(describeOpt);
}
- public boolean hasDeleteOption() {
+ boolean hasDeleteOption() {
return has(deleteOpt);
}
- public Optional<String> bootstrapServer() {
+ Optional<String> bootstrapServer() {
return valueAsOption(bootstrapServerOpt);
}
- public Properties commandConfig() throws IOException {
- if (has(commandConfigOpt)) {
- return Utils.loadProps(options.valueOf(commandConfigOpt));
- } else {
- return new Properties();
- }
+ Properties commandConfig() throws IOException {
+ return has(commandConfigOpt) ?
Utils.loadProps(options.valueOf(commandConfigOpt)) : new Properties();
}
- public Optional<String> topic() {
+ Optional<String> topic() {
return valueAsOption(topicOpt);
}
- public Optional<String> topicId() {
+ Optional<String> topicId() {
return valueAsOption(topicIdOpt);
}
- public Optional<Integer> partitions() {
+ Optional<Integer> partitions() {
return valueAsOption(partitionsOpt);
}
- public Optional<Integer> replicationFactor() {
+ Optional<Integer> replicationFactor() {
return valueAsOption(replicationFactorOpt);
}
- public Optional<Map<Integer, List<Integer>>> replicaAssignment() {
- if (has(replicaAssignmentOpt) &&
!Optional.of(options.valueOf(replicaAssignmentOpt)).orElse("").isEmpty())
- return
Optional.of(parseReplicaAssignment(options.valueOf(replicaAssignmentOpt)));
- else
- return Optional.empty();
+ Optional<Map<Integer, List<Integer>>> replicaAssignment() {
+ return has(replicaAssignmentOpt) &&
!Optional.of(options.valueOf(replicaAssignmentOpt)).orElse("").isEmpty()
+ ?
Optional.of(parseReplicaAssignment(options.valueOf(replicaAssignmentOpt)))
+ : Optional.empty();
}
- public boolean reportUnderReplicatedPartitions() {
+ boolean reportUnderReplicatedPartitions() {
return has(reportUnderReplicatedPartitionsOpt);
}
- public boolean reportUnavailablePartitions() {
+ boolean reportUnavailablePartitions() {
return has(reportUnavailablePartitionsOpt);
}
- public boolean reportUnderMinIsrPartitions() {
+ boolean reportUnderMinIsrPartitions() {
return has(reportUnderMinIsrPartitionsOpt);
}
- public boolean reportAtMinIsrPartitions() {
+ boolean reportAtMinIsrPartitions() {
return has(reportAtMinIsrPartitionsOpt);
}
- public boolean reportOverriddenConfigs() {
+ boolean reportOverriddenConfigs() {
return has(topicsWithOverridesOpt);
}
- public boolean ifExists() {
+ boolean ifExists() {
return has(ifExistsOpt);
}
- public boolean ifNotExists() {
+ boolean ifNotExists() {
return has(ifNotExistsOpt);
}
- public boolean excludeInternalTopics() {
+ boolean excludeInternalTopics() {
return has(excludeInternalTopicOpt);
}
- public Optional<Integer> partitionSizeLimitPerResponse() {
+ Optional<Integer> partitionSizeLimitPerResponse() {
return valueAsOption(partitionSizeLimitPerResponseOpt);
}
- public Optional<List<String>> topicConfig() {
- return valuesAsOption(configOpt);
+ Optional<List<String>> topicConfig() {
+ return options.has(configOpt) ?
Optional.of(options.valuesOf(configOpt)) : Optional.of(List.of());
}
- public void checkArgs() {
+ private void checkArgs() {
if (args.length == 0)
CommandLineUtils.printUsageAndExit(parser, "Create, delete,
describe, or change a topic.");
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
create, delete, describe, or change a topic.");
// should have exactly one action
- long actions =
- Stream.of(createOpt, listOpt, alterOpt, describeOpt,
deleteOpt).filter(options::has)
- .count();
+ long actions = Stream.of(createOpt, listOpt, alterOpt,
describeOpt, deleteOpt).filter(options::has).count();
+
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must
include exactly one action: --list, --describe, --create, --alter or --delete");
@@ -988,7 +951,6 @@ public abstract class TopicCommand {
CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, partitionsOpt, replicationFactorOpt);
}
-
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderReplicatedPartitionsOpt,
invalidOptions(Set.of(topicsWithOverridesOpt),
List.of(describeOpt, reportUnderReplicatedPartitionsOpt)));
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderMinIsrPartitionsOpt,
diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
index 57a8269f089..a1dbffcab1d 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandTest.java
@@ -119,7 +119,7 @@ public class TopicCommandTest {
TopicCommand.PartitionDescription partitionDescription = new
TopicCommand.PartitionDescription("test-topic",
new TopicPartitionInfo(0, new Node(1, "localhost", 9091),
replicas,
List.of(new Node(1, "localhost", 9091))),
- null, false,
+ null,
new PartitionReassignment(replicaIds, List.of(2), List.of())
);