This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 72cfc994f56 KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools (#13131)
72cfc994f56 is described below
commit 72cfc994f5675be349d4494ece3528efed290651
Author: Federico Valeri <[email protected]>
AuthorDate: Thu Jan 26 20:06:09 2023 +0100
KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools (#13131)
Reviewers: Mickael Maison <[email protected]>, Christo Lolov <[email protected]>, Sagar Rao <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control-core.xml | 3 +-
checkstyle/import-control.xml | 2 +
core/src/main/scala/kafka/Kafka.scala | 13 +-
core/src/main/scala/kafka/admin/AclCommand.scala | 44 ++--
.../kafka/admin/BrokerApiVersionsCommand.scala | 5 +-
.../src/main/scala/kafka/admin/ConfigCommand.scala | 11 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 64 +++---
.../scala/kafka/admin/DelegationTokenCommand.scala | 23 ++-
.../scala/kafka/admin/DeleteRecordsCommand.scala | 6 +-
.../scala/kafka/admin/LeaderElectionCommand.scala | 6 +-
.../main/scala/kafka/admin/LogDirsCommand.scala | 7 +-
.../kafka/admin/ReassignPartitionsCommand.scala | 11 +-
core/src/main/scala/kafka/admin/TopicCommand.scala | 46 +++--
.../scala/kafka/admin/ZkSecurityMigrator.scala | 7 +-
.../main/scala/kafka/tools/ConsoleConsumer.scala | 23 ++-
.../main/scala/kafka/tools/ConsoleProducer.scala | 13 +-
.../scala/kafka/tools/ConsumerPerformance.scala | 8 +-
.../main/scala/kafka/tools/DumpLogSegments.scala | 3 +-
.../main/scala/kafka/tools/GetOffsetShell.scala | 5 +-
core/src/main/scala/kafka/tools/JmxTool.scala | 8 +-
core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +-
core/src/main/scala/kafka/tools/PerfConfig.scala | 3 +-
.../kafka/tools/ReplicaVerificationTool.scala | 5 +-
.../scala/kafka/tools/StateChangeLogMerger.scala | 6 +-
.../main/scala/kafka/tools/StreamsResetter.java | 11 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 7 +-
.../scala/kafka/utils/CommandDefaultOptions.scala | 27 ---
.../main/scala/kafka/utils/CommandLineUtils.scala | 145 -------------
core/src/main/scala/kafka/utils/ToolsUtils.scala | 19 +-
.../scala/kafka/tools/LogCompactionTester.scala | 3 +-
.../scala/other/kafka/TestLinearWriteSpeed.scala | 1 +
.../other/kafka/TestPurgatoryPerformance.scala | 1 +
.../admin/ReassignPartitionsCommandArgsTest.scala | 2 +-
.../unit/kafka/utils/CommandLineUtilsTest.scala | 223 --------------------
.../kafka/server/util/CommandDefaultOptions.java | 41 ++++
.../apache/kafka/server/util/CommandLineUtils.java | 197 ++++++++++++++++++
.../kafka/server/util/CommandLineUtilsTest.java | 227 +++++++++++++++++++++
38 files changed, 664 insertions(+), 567 deletions(-)
diff --git a/build.gradle b/build.gradle
index 8441ddeaf05..56883860573 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1536,6 +1536,7 @@ project(':server-common') {
api project(':clients')
implementation libs.slf4jApi
implementation libs.metrics
+ implementation libs.joptSimple
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 5a92c44d7c8..0b15f958b96 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -60,8 +60,9 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="kafka.admin" />
- <allow pkg="joptsimple" />
<allow pkg="org.apache.kafka.clients.consumer" />
+ <allow pkg="org.apache.kafka.server.util" />
+ <allow pkg="joptsimple" />
</subpackage>
<subpackage name="coordinator">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8792c26c949..0e767baee5d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -347,6 +347,7 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
+ <allow pkg="joptsimple" />
<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />
@@ -406,6 +407,7 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.log4j" />
<allow pkg="kafka.test" />
+ <allow pkg="joptsimple" />
</subpackage>
<subpackage name="trogdor">
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index dad462dcad6..fa0137e9597 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -22,10 +22,9 @@ import java.util.Properties
import joptsimple.OptionParser
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler,
OperatingSystem, Time, Utils}
-
-import scala.jdk.CollectionConverters._
+import org.apache.kafka.server.util.CommandLineUtils
object Kafka extends Logging {
@@ -41,12 +40,12 @@ object Kafka extends Logging {
optionParser.accepts("version", "Print version information and exit.")
if (args.isEmpty || args.contains("--help")) {
- CommandLineUtils.printUsageAndDie(optionParser,
+ CommandLineUtils.printUsageAndExit(optionParser,
"USAGE: java [options] %s server.properties [--override
property=value]*".format(this.getClass.getCanonicalName.split('$').head))
}
if (args.contains("--version")) {
- CommandLineUtils.printVersionAndDie()
+ CommandLineUtils.printVersionAndExit()
}
val props = Utils.loadProps(args(0))
@@ -55,10 +54,10 @@ object Kafka extends Logging {
val options = optionParser.parse(args.slice(1, args.length): _*)
if (options.nonOptionArguments().size() > 0) {
- CommandLineUtils.printUsageAndDie(optionParser, "Found non argument
parameters: " + options.nonOptionArguments().toArray.mkString(","))
+ CommandLineUtils.printUsageAndExit(optionParser, "Found non argument
parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
- props ++=
CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
+ props ++=
CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt))
}
props
}
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 769e99df737..1c8b6537346 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -18,7 +18,6 @@
package kafka.admin
import java.util.Properties
-
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
@@ -33,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -51,7 +51,7 @@ object AclCommand extends Logging {
val opts = new AclCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manage
acls on kafka.")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage
acls on kafka.")
opts.checkArgs()
@@ -202,8 +202,8 @@ object AclCommand extends Logging {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp ->
JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
- val authorizerProperties =
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
- defaultProps ++
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue =
false).asScala
+ val authorizerProperties =
opts.options.valuesOf(opts.authorizerPropertiesOpt)
+ defaultProps ++
CommandLineUtils.parseKeyValueArgs(authorizerProperties, false).asScala
} else {
defaultProps
}
@@ -324,7 +324,7 @@ object AclCommand extends Logging {
private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern,
Set[AccessControlEntry]] = {
val patternType = opts.options.valueOf(opts.resourcePatternType)
if (!patternType.isSpecific)
- CommandLineUtils.printUsageAndDie(opts.parser, s"A
'--resource-pattern-type' value of '$patternType' is not valid when adding
acls.")
+ CommandLineUtils.printUsageAndExit(opts.parser, s"A
'--resource-pattern-type' value of '$patternType' is not valid when adding
acls.")
val resourceToAcl = getResourceFilterToAcls(opts).map {
case (filter, acls) =>
@@ -332,7 +332,7 @@ object AclCommand extends Logging {
}
if (resourceToAcl.values.exists(_.isEmpty))
- CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of:
--allow-principal, --deny-principal when trying to add ACLs.")
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one
of: --allow-principal, --deny-principal when trying to add ACLs.")
resourceToAcl
}
@@ -430,8 +430,8 @@ object AclCommand extends Logging {
} yield new AccessControlEntry(principal.toString, host, operation,
permissionType)
}
- private def getHosts(opts: AclCommandOptions, hostOptionSpec:
ArgumentAcceptingOptionSpec[String],
- principalOptionSpec:
ArgumentAcceptingOptionSpec[String]): Set[String] = {
+ private def getHosts(opts: AclCommandOptions, hostOptionSpec:
OptionSpec[String],
+ principalOptionSpec: OptionSpec[String]): Set[String] =
{
if (opts.options.has(hostOptionSpec))
opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
else if (opts.options.has(principalOptionSpec))
@@ -440,7 +440,7 @@ object AclCommand extends Logging {
Set.empty[String]
}
- private def getPrincipals(opts: AclCommandOptions, principalOptionSpec:
ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
+ private def getPrincipals(opts: AclCommandOptions, principalOptionSpec:
OptionSpec[String]): Set[KafkaPrincipal] = {
if (opts.options.has(principalOptionSpec))
opts.options.valuesOf(principalOptionSpec).asScala.map(s =>
JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
else
@@ -471,7 +471,7 @@ object AclCommand extends Logging {
opts.options.valuesOf(opts.userPrincipalOpt).forEach(user =>
resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim,
patternType))
if (resourceFilters.isEmpty && dieIfNoResourceFound)
- CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at
least one resource: --topic <topic> or --cluster or --group <group> or
--delegation-token <Delegation Token ID>")
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at
least one resource: --topic <topic> or --cluster or --group <group> or
--delegation-token <Delegation Token ID>")
resourceFilters
}
@@ -487,7 +487,7 @@ object AclCommand extends Logging {
for ((resource, acls) <- resourceToAcls) {
val validOps = AclEntry.supportedOperations(resource.resourceType) +
AclOperation.ALL
if ((acls.map(_.operation) -- validOps).nonEmpty)
- CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType
${resource.resourceType} only supports operations ${validOps.mkString(",")}")
+ CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType
${resource.resourceType} only supports operations ${validOps.mkString(",")}")
}
}
@@ -634,7 +634,7 @@ object AclCommand extends Logging {
def checkArgs(): Unit = {
if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
- CommandLineUtils.printUsageAndDie(parser, "Only one of
--bootstrap-server or --authorizer must be specified")
+ CommandLineUtils.printUsageAndExit(parser, "Only one of
--bootstrap-server or --authorizer must be specified")
if (!options.has(bootstrapServerOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options,
authorizerPropertiesOpt)
@@ -642,32 +642,32 @@ object AclCommand extends Logging {
}
if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
- CommandLineUtils.printUsageAndDie(parser, "The --command-config option
can only be used with --bootstrap-server option")
+ CommandLineUtils.printUsageAndExit(parser, "The --command-config
option can only be used with --bootstrap-server option")
if (options.has(authorizerPropertiesOpt) &&
options.has(bootstrapServerOpt))
- CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties
option can only be used with --authorizer option")
+ CommandLineUtils.printUsageAndExit(parser, "The
--authorizer-properties option can only be used with --authorizer option")
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
if (actions != 1)
- CommandLineUtils.printUsageAndDie(parser, "Command must include
exactly one action: --list, --add, --remove. ")
+ CommandLineUtils.printUsageAndExit(parser, "Command must include
exactly one action: --list, --add, --remove. ")
- CommandLineUtils.checkInvalidArgs(parser, options, listOpt,
Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt,
denyPrincipalsOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt,
consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
//when --producer or --consumer is specified , user should not specify
operations as they are inferred and we also disallow --deny-principals and
--deny-hosts.
- CommandLineUtils.checkInvalidArgs(parser, options, producerOpt,
Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, producerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
if (options.has(listPrincipalsOpt) && !options.has(listOpt))
- CommandLineUtils.printUsageAndDie(parser, "The --principal option is
only available if --list is set")
+ CommandLineUtils.printUsageAndExit(parser, "The --principal option is
only available if --list is set")
if (options.has(producerOpt) && !options.has(topicOpt))
- CommandLineUtils.printUsageAndDie(parser, "With --producer you must
specify a --topic")
+ CommandLineUtils.printUsageAndExit(parser, "With --producer you must
specify a --topic")
if (options.has(idempotentOpt) && !options.has(producerOpt))
- CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is
only available if --producer is set")
+ CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is
only available if --producer is set")
if (options.has(consumerOpt) && (!options.has(topicOpt) ||
!options.has(groupOpt) || (!options.has(producerOpt) &&
(options.has(clusterOpt) || options.has(transactionalIdOpt)))))
- CommandLineUtils.printUsageAndDie(parser, "With --consumer you must
specify a --topic and a --group and no --cluster or --transactional-id option
should be specified.")
+ CommandLineUtils.printUsageAndExit(parser, "With --consumer you must
specify a --topic and a --group and no --cluster or --transactional-id option
should be specified.")
}
}
}
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ea6ffeea65b..990ac6f9448 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -22,8 +22,6 @@ import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
@@ -42,6 +40,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@@ -94,7 +93,7 @@ object BrokerApiVersionsCommand {
checkArgs()
def checkArgs(): Unit = {
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to
retrieve broker version information.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
retrieve broker version information.")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 687e65378b9..5d915f5e97f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,7 +23,7 @@ import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{ConfigEntityName, ConfigType, Defaults,
DynamicBrokerConfig, DynamicConfig, KafkaConfig}
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging,
PasswordEncoder}
+import kafka.utils.{Exit, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo,
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig,
ScramMechanism => PublicScramMechanism}
@@ -37,6 +37,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils,
ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
@@ -84,7 +85,7 @@ object ConfigCommand extends Logging {
try {
val opts = new ConfigCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to
manipulate and describe entity config for a topic, client, user, broker or ip")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
manipulate and describe entity config for a topic, client, user, broker or ip")
opts.checkArgs()
@@ -863,10 +864,10 @@ object ConfigCommand extends Logging {
// should have exactly one action
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if (actions != 1)
- CommandLineUtils.printUsageAndDie(parser, "Command must include
exactly one action: --describe, --alter")
+ CommandLineUtils.printUsageAndExit(parser, "Command must include
exactly one action: --describe, --alter")
// check required args
- CommandLineUtils.checkInvalidArgs(parser, options, alterOpt,
Set(describeOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, describeOpt,
Set(alterOpt, addConfig, deleteConfig))
+ CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, describeOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, describeOpt,
alterOpt, addConfig, deleteConfig)
val entityTypeVals = entityTypes
if (entityTypeVals.size != entityTypeVals.distinct.size)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b428c447d77..9c0452b781c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -28,18 +28,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, immutable, mutable}
import scala.util.{Failure, Success, Try}
-import joptsimple.OptionSpec
+import joptsimple.{OptionException, OptionSpec}
import org.apache.kafka.common.protocol.Errors
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
import org.apache.kafka.common.ConsumerGroupState
-import joptsimple.OptionException
import org.apache.kafka.common.requests.ListOffsetsResponse
object ConsumerGroupCommand extends Logging {
@@ -49,17 +49,17 @@ object ConsumerGroupCommand extends Logging {
val opts = new ConsumerGroupCommandOptions(args)
try {
opts.checkArgs()
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list
all consumer groups, describe a consumer group, delete consumer group info, or
reset consumer group offsets.")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list
all consumer groups, describe a consumer group, delete consumer group info, or
reset consumer group offsets.")
// should have exactly one action
val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt,
opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
if (actions != 1)
- CommandLineUtils.printUsageAndDie(opts.parser, "Command must include
exactly one action: --list, --describe, --delete, --reset-offsets,
--delete-offsets")
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must include
exactly one action: --list, --describe, --delete, --reset-offsets,
--delete-offsets")
run(opts)
} catch {
case e: OptionException =>
- CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+ CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
}
}
@@ -85,7 +85,7 @@ object ConsumerGroupCommand extends Logging {
}
} catch {
case e: IllegalArgumentException =>
- CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+ CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
case e: Throwable =>
printError(s"Executing consumer group command failed due to
${e.getMessage}", Some(e))
} finally {
@@ -747,7 +747,7 @@ object ConsumerGroupCommand extends Logging {
if (opts.options.has(opts.resetFromFileOpt))
Nil
else
- CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset
scopes should be defined: --all-topics, --topic.")
+ ToolsUtils.printUsageAndExit(opts.parser, "One of the reset scopes
should be defined: --all-topics, --topic.")
}
}
@@ -801,7 +801,7 @@ object ConsumerGroupCommand extends Logging {
partitionsToReset.map { topicPartition =>
logStartOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
- case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting starting offset of topic partition: $topicPartition")
+ case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error
getting starting offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
@@ -809,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
partitionsToReset.map { topicPartition =>
logEndOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
- case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
+ case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error
getting ending offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetShiftByOpt)) {
@@ -830,7 +830,7 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
- case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
+ case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetByDurationOpt)) {
@@ -844,7 +844,7 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition,
new OffsetAndMetadata(offset))
- case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
+ case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error
getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (resetPlanFromFile.isDefined) {
@@ -875,12 +875,12 @@ object ConsumerGroupCommand extends Logging {
val preparedOffsetsForPartitionsWithoutCommittedOffset =
getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map {
case (topicPartition, LogOffsetResult.LogOffset(offset)) =>
(topicPartition, new OffsetAndMetadata(offset))
- case (topicPartition, _) =>
CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of
topic partition: $topicPartition")
+ case (topicPartition, _) =>
ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of
topic partition: $topicPartition")
}
preparedOffsetsForPartitionsWithCommittedOffset ++
preparedOffsetsForPartitionsWithoutCommittedOffset
} else {
- CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires
one of the following scenarios: %s".format(opts.resetOffsetsOpt,
opts.allResetOffsetScenarioOpts) )
+ ToolsUtils.printUsageAndExit(opts.parser, "Option '%s' requires one of
the following scenarios: %s".format(opts.resetOffsetsOpt,
opts.allResetOffsetScenarioOpts))
}
}
@@ -1095,15 +1095,15 @@ object ConsumerGroupCommand extends Logging {
if (options.has(describeOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt takes one of these options:
${allGroupSelectionScopeOpts.mkString(", ")}")
val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt,
offsetsOpt, stateOpt)
if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else
0).sum > 1) {
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt takes at most one of these options:
${mutuallyExclusiveOpts.mkString(", ")}")
}
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt does not take a value for $stateOpt")
} else {
if (options.has(timeoutMsOpt))
@@ -1112,22 +1112,22 @@ object ConsumerGroupCommand extends Logging {
if (options.has(deleteOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $deleteOpt takes one of these options:
${allGroupSelectionScopeOpts.mkString(", ")}")
if (options.has(topicOpt))
- CommandLineUtils.printUsageAndDie(parser, s"The consumer does not
support topic-specific offset " +
+ CommandLineUtils.printUsageAndExit(parser, s"The consumer does not
support topic-specific offset " +
"deletion from a consumer group.")
}
if (options.has(deleteOffsetsOpt)) {
if (!options.has(groupOpt) || !options.has(topicOpt))
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $deleteOffsetsOpt takes the following options:
${allDeleteOffsetsOpts.mkString(", ")}")
}
if (options.has(resetOffsetsOpt)) {
if (options.has(dryRunOpt) && options.has(executeOpt))
- CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt
only accepts one of $executeOpt and $dryRunOpt")
+ CommandLineUtils.printUsageAndExit(parser, s"Option $resetOffsetsOpt
only accepts one of $executeOpt and $dryRunOpt")
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
Console.err.println("WARN: No action will be performed as the
--execute option is missing." +
@@ -1137,21 +1137,21 @@ object ConsumerGroupCommand extends Logging {
}
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
- CommandLineUtils.printUsageAndDie(parser,
+ CommandLineUtils.printUsageAndExit(parser,
s"Option $resetOffsetsOpt takes one of these options:
${allGroupSelectionScopeOpts.mkString(", ")}")
- CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,
allResetOffsetScenarioOpts - resetToOffsetOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt,
allResetOffsetScenarioOpts - resetToDatetimeOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt,
allResetOffsetScenarioOpts - resetByDurationOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt,
allResetOffsetScenarioOpts - resetToEarliestOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt,
allResetOffsetScenarioOpts - resetToLatestOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt,
allResetOffsetScenarioOpts - resetToCurrentOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt,
allResetOffsetScenarioOpts - resetShiftByOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt,
allResetOffsetScenarioOpts - resetFromFileOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,
(allResetOffsetScenarioOpts - resetToOffsetOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt,
(allResetOffsetScenarioOpts - resetToDatetimeOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt,
(allResetOffsetScenarioOpts - resetByDurationOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt,
(allResetOffsetScenarioOpts - resetToEarliestOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt,
(allResetOffsetScenarioOpts - resetToLatestOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt,
(allResetOffsetScenarioOpts - resetToCurrentOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt,
(allResetOffsetScenarioOpts - resetShiftByOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt,
(allResetOffsetScenarioOpts - resetFromFileOpt).asJava)
}
- CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
allGroupSelectionScopeOpts - groupOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, topicOpt,
allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
(allGroupSelectionScopeOpts - groupOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
(allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, topicOpt,
(allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt).asJava )
}
}
}
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 78984792ce2..7dce0d58515 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -20,17 +20,16 @@ package kafka.admin
import java.text.SimpleDateFormat
import java.util
import java.util.Base64
-
import joptsimple.ArgumentAcceptingOptionSpec
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions,
DescribeDelegationTokenOptions, ExpireDelegationTokenOptions,
RenewDelegationTokenOptions}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
-import scala.collection.Set
/**
* A command to manage delegation token.
@@ -40,12 +39,12 @@ object DelegationTokenCommand extends Logging {
def main(args: Array[String]): Unit = {
val opts = new DelegationTokenCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to
create, renew, expire, or describe delegation tokens.")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create,
renew, expire, or describe delegation tokens.")
// should have exactly one action
val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt,
opts.describeOpt).count(opts.options.has _)
if(actions != 1)
- CommandLineUtils.printUsageAndDie(opts.parser, "Command must include
exactly one action: --create, --renew, --expire or --describe")
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must include
exactly one action: --create, --renew, --expire or --describe")
opts.checkArgs()
@@ -207,17 +206,19 @@ object DelegationTokenCommand extends Logging {
if (options.has(createOpt))
CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
- if (options.has(renewOpt))
+ if (options.has(renewOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt,
renewTimePeriodOpt)
+ }
- if (options.has(expiryOpt))
+ if (options.has(expiryOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt,
expiryTimePeriodOpt)
+ }
// check invalid args
- CommandLineUtils.checkInvalidArgs(parser, options, createOpt,
Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, renewOpt,
Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt,
ownerPrincipalsOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt,
Set(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, describeOpt,
Set(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt,
expiryTimePeriodOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt,
renewTimePeriodOpt, expiryTimePeriodOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, renewOpt,
renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt,
maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)
+ CommandLineUtils.checkInvalidArgs(parser, options, describeOpt,
renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt,
expiryTimePeriodOpt)
}
}
}
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 71ef6fd6f19..b1747313708 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -19,14 +19,14 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
-
import kafka.common.AdminCommandFailedException
import kafka.utils.json.JsonValue
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
+import kafka.utils.{CoreUtils, Json}
import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -130,7 +130,7 @@ object DeleteRecordsCommand {
options = parser.parse(args : _*)
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to delete
records of the given partitions down to the specified offset.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete
records of the given partitions down to the specified offset.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt,
offsetJsonFileOpt)
}
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 92edcad003f..140f4b70177 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -20,8 +20,6 @@ import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.common.AdminCommandFailedException
-import kafka.utils.CommandDefaultOptions
-import kafka.utils.CommandLineUtils
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
@@ -33,6 +31,8 @@ import
org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
+
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.concurrent.duration._
@@ -44,7 +44,7 @@ object LeaderElectionCommand extends Logging {
def run(args: Array[String], timeout: Duration): Unit = {
val commandOptions = new LeaderElectionCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(
+ CommandLineUtils.maybePrintHelpOrVersion(
commandOptions,
"This tool attempts to elect a new leader for a set of topic partitions.
The type of elections supported are preferred replicas and unclean replicas."
)
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index d8c802e7d0e..870e6a17ba1 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -16,13 +16,12 @@
*/
package kafka.admin
-
import java.io.PrintStream
import java.util.Properties
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
+import kafka.utils.Json
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
LogDirDescription}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Map
@@ -126,7 +125,7 @@ object LogDirsCommand {
options = parser.parse(args : _*)
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to
query log directory usage on the specified brokers.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
query log directory usage on the specified brokers.")
CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt, describeOpt)
}
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f3e3a69117..24c2fafb813 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -21,7 +21,7 @@ import java.util.Optional
import java.util.concurrent.ExecutionException
import kafka.common.AdminCommandFailedException
import kafka.server.DynamicConfig
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit,
Json, Logging}
+import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
import kafka.utils.json.JsonValue
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -31,6 +31,7 @@ import
org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopi
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition,
TopicPartitionReplica}
import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, mutable}
@@ -1330,20 +1331,20 @@ object ReassignPartitionsCommand extends Logging {
def validateAndParseArgs(args: Array[String]):
ReassignPartitionsCommandOptions = {
val opts = new ReassignPartitionsCommandOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, helpText)
+ CommandLineUtils.maybePrintHelpOrVersion(opts, helpText)
// Determine which action we should perform.
val validActions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt,
opts.cancelOpt, opts.listOpt)
val allActions = validActions.filter(opts.options.has _)
if (allActions.size != 1) {
- CommandLineUtils.printUsageAndDie(opts.parser, "Command must include
exactly one action: %s".format(
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must include
exactly one action: %s".format(
validActions.map("--" + _.options().get(0)).mkString(", ")))
}
val action = allActions.head
if (!opts.options.has(opts.bootstrapServerOpt))
- CommandLineUtils.printUsageAndDie(opts.parser, "Please specify
--bootstrap-server")
+ CommandLineUtils.printUsageAndExit(opts.parser, "Please specify
--bootstrap-server")
// Make sure that we have all the required arguments for our action.
val requiredArgs = Map(
@@ -1400,7 +1401,7 @@ object ReassignPartitionsCommand extends Logging {
if (!opt.equals(action) &&
!requiredArgs(action).contains(opt) &&
!permittedArgs(action).contains(opt)) {
- CommandLineUtils.printUsageAndDie(opts.parser,
+ CommandLineUtils.printUsageAndExit(opts.parser,
"""Option "%s" can't be used with action "%s"""".format(opt, action))
}
})
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 65148b203f8..3d91b6256b4 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -18,7 +18,7 @@
package kafka.admin
import java.util
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.utils._
@@ -34,6 +34,7 @@ import
org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExist
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -605,53 +606,54 @@ object TopicCommand extends Logging {
def checkArgs(): Unit = {
if (args.isEmpty)
- CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe,
or change a topic.")
+ CommandLineUtils.printUsageAndExit(parser, "Create, delete, describe,
or change a topic.")
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to
create, delete, describe, or change a topic.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
create, delete, describe, or change a topic.")
// should have exactly one action
val actions = Seq(createOpt, listOpt, alterOpt, describeOpt,
deleteOpt).count(options.has)
if (actions != 1)
- CommandLineUtils.printUsageAndDie(parser, "Command must include
exactly one action: --list, --describe, --create, --alter or --delete")
+ CommandLineUtils.printUsageAndExit(parser, "Command must include
exactly one action: --list, --describe, --create, --alter or --delete")
// check required args
if (!has(bootstrapServerOpt))
throw new IllegalArgumentException("--bootstrap-server must be
specified")
if (has(describeOpt) && has(ifExistsOpt)) {
if (!has(topicOpt) && !has(topicIdOpt))
- CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is
required to describe a topic")
+ CommandLineUtils.printUsageAndExit(parser, "--topic or --topic-id is
required to describe a topic")
if (has(topicOpt) && has(topicIdOpt))
println("Only topic id will be used when both --topic and --topic-id
are specified and topicId is not Uuid.ZERO_UUID")
}
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(alterOpt)) {
- CommandLineUtils.checkInvalidArgsSet(parser, options,
Set(bootstrapServerOpt, configOpt), Set(alterOpt),
- Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
+ val usedOptions = immutable.Set[OptionSpec[_]](bootstrapServerOpt,
configOpt)
+ val invalidOptions = immutable.Set[OptionSpec[_]](alterOpt)
+ CommandLineUtils.checkInvalidArgsSet(parser, options,
usedOptions.asJava, invalidOptions.asJava,
Optional.of(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt)
}
// check invalid args
- CommandLineUtils.checkInvalidArgs(parser, options, configOpt,
allTopicLevelOpts -- Set(alterOpt, createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt,
allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt,
allTopicLevelOpts -- Set(alterOpt, createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt,
allTopicLevelOpts -- Set(createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
allTopicLevelOpts -- Set(createOpt,alterOpt))
- if(options.has(createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, configOpt,
(allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt,
(allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt,
(allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt,
(allTopicLevelOpts -- Set(createOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
(allTopicLevelOpts -- Set(createOpt,alterOpt)).asJava)
+ if (options.has(createOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options,
replicaAssignmentOpt, partitionsOpt, replicationFactorOpt)
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderReplicatedPartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+ (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnderMinIsrPartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt)
+ (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options,
reportAtMinIsrPartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt)
+ (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options,
reportUnavailablePartitionsOpt,
- allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+ (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts -
reportUnavailablePartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options,
topicsWithOverridesOpt,
- allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
- CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
- CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt,
allTopicLevelOpts -- Set(createOpt))
- CommandLineUtils.checkInvalidArgs(parser, options,
excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
+ (allTopicLevelOpts -- Set(describeOpt) ++
allReplicationReportOpts).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
(allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt,
(allTopicLevelOpts -- Set(createOpt)).asJava)
+ CommandLineUtils.checkInvalidArgs(parser, options,
excludeInternalTopicOpt, (allTopicLevelOpts -- Set(listOpt,
describeOpt)).asJava)
}
}
}
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 126319503b5..7bc7339d548 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -19,11 +19,12 @@ package kafka.admin
import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
import kafka.server.KafkaConfig
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData,
ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@@ -70,7 +71,7 @@ object ZkSecurityMigrator extends Logging {
val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val opts = new ZkSecurityMigratorOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage)
+ CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage)
// Must have either SASL or TLS mutual authentication enabled to use this
tool.
// Instantiate the client config we will use so that we take into account
config provided via the CLI option
@@ -99,7 +100,7 @@ object ZkSecurityMigrator extends Logging {
info("zookeeper.acl option is unsecure")
false
case _ =>
- CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
+ ToolsUtils.printUsageAndExit(opts.parser, usageMessage)
}
val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
val zkSessionTimeout =
opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a70ce920e8e..2774f13397a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
Deserializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@@ -293,7 +294,7 @@ object ConsoleConsumer extends Logging {
options = tryParse(parser, args)
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read
data from Kafka topics and outputs it to standard output.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read
data from Kafka topics and outputs it to standard output.")
var groupIdPassed = true
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
@@ -302,7 +303,7 @@ object ConsoleConsumer extends Logging {
var topicArg: String = _
var includedTopicsArg: String = _
var filterSpec: TopicFilter = _
- val extraConsumerProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
+ val extraConsumerProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt))
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
@@ -315,7 +316,7 @@ object ConsoleConsumer extends Logging {
Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
else
new Properties()
- formatterArgs ++=
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+ formatterArgs ++=
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if (options.has(maxMessagesOpt))
options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt))
options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
@@ -341,19 +342,19 @@ object ConsoleConsumer extends Logging {
val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ ==
null)
// user need to specify value for either --topic or one of the include
filters options (--include or --whitelist)
if (topicOrFilterArgs.size != 1)
- CommandLineUtils.printUsageAndDie(parser, s"Exactly one of
--include/--topic is required. " +
+ CommandLineUtils.printUsageAndExit(parser, s"Exactly one of
--include/--topic is required. " +
s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use
--include instead; ignored if --include specified."}")
if (partitionArg.isDefined) {
if (!options.has(topicOpt))
- CommandLineUtils.printUsageAndDie(parser, "The topic is required when
partition is specified.")
+ CommandLineUtils.printUsageAndExit(parser, "The topic is required when
partition is specified.")
if (fromBeginning && options.has(offsetOpt))
- CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and
offset cannot be specified together.")
+ CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and
offset cannot be specified together.")
} else if (options.has(offsetOpt))
- CommandLineUtils.printUsageAndDie(parser, "The partition is required
when offset is specified.")
+ CommandLineUtils.printUsageAndExit(parser, "The partition is required
when offset is specified.")
def invalidOffset(offset: String): Nothing =
- CommandLineUtils.printUsageAndDie(parser, s"The provided offset value
'$offset' is incorrect. Valid values are " +
+ ToolsUtils.printUsageAndExit(parser, s"The provided offset value
'$offset' is incorrect. Valid values are " +
"'earliest', 'latest', or a non-negative long.")
val offsetArg =
@@ -385,7 +386,7 @@ object ConsoleConsumer extends Logging {
).flatten
if (groupIdsProvided.size > 1) {
- CommandLineUtils.printUsageAndDie(parser, "The group ids provided in
different places (directly using '--group', "
+ CommandLineUtils.printUsageAndExit(parser, "The group ids provided in
different places (directly using '--group', "
+ "via '--consumer-property', or via '--consumer.config') do not
match. "
+ s"Detected group ids: ${groupIdsProvided.mkString("'", "', '",
"'")}")
}
@@ -403,14 +404,14 @@ object ConsoleConsumer extends Logging {
}
if (groupIdPassed && partitionArg.isDefined)
- CommandLineUtils.printUsageAndDie(parser, "Options group and partition
cannot be specified together.")
+ CommandLineUtils.printUsageAndExit(parser, "Options group and partition
cannot be specified together.")
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
- CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+ ToolsUtils.printUsageAndExit(parser, e.getMessage)
}
}
}
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 668a709d60e..245212471d0 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -24,14 +24,13 @@ import java.util.regex.Pattern
import joptsimple.{OptionException, OptionParser, OptionSet}
import kafka.common.MessageReader
import kafka.utils.Implicits._
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{Exit, ToolsUtils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
-
-import scala.jdk.CollectionConverters._
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
object ConsoleProducer {
@@ -260,7 +259,7 @@ object ConsoleProducer {
options = tryParse(parser, args)
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read
data from standard input and publish it to Kafka.")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read
data from standard input and publish it to Kafka.")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
@@ -280,15 +279,15 @@ object ConsoleProducer {
else compressionCodecOptionValue
else CompressionType.NONE.name
val readerClass = options.valueOf(messageReaderOpt)
- val cmdLineProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
- val extraProducerProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
+ val cmdLineProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+ val extraProducerProps =
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
- CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+ ToolsUtils.printUsageAndExit(parser, e.getMessage)
}
}
}
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 89428e5663b..56f49456705 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -22,14 +22,14 @@ import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}
-
import com.typesafe.scalalogging.LazyLogging
import joptsimple.OptionException
-import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.utils.ToolsUtils
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener,
KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -260,13 +260,13 @@ object ConsumerPerformance extends LazyLogging {
options = parser.parse(args: _*)
catch {
case e: OptionException =>
- CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage)
}
if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
println("WARNING: option [threads] and [num-fetch-threads] have been
deprecated and will be ignored by the test")
- CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in
performance test for the full zookeeper consumer")
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in
performance test for the full zookeeper consumer")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
numMessagesOpt)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index ec0fb7d2e73..09d66cb2197 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.server.log.internals.{CorruptSnapshotException,
OffsetIndex, TimeIndex, TransactionIndex}
import org.apache.kafka.snapshot.Snapshots
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@@ -46,7 +47,7 @@ object DumpLogSegments {
def main(args: Array[String]): Unit = {
val opts = new DumpLogSegmentsOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse
a log file and dump its contents to the console, useful for debugging a
seemingly corrupt log segment.")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to parse a
log file and dump its contents to the console, useful for debugging a seemingly
corrupt log segment.")
opts.checkArgs()
val misMatchesForIndexFilesMap = mutable.Map[String, List[(Long, Long)]]()
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index a8fd87cbe87..379b92218c8 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -19,11 +19,12 @@
package kafka.tools
import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
+import kafka.utils.{Exit, IncludeList, ToolsUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig,
ListTopicsOptions, OffsetSpec}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.requests.{ListOffsetsRequest,
ListOffsetsResponse}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.CommandLineUtils
import java.util.Properties
import java.util.concurrent.ExecutionException
@@ -82,7 +83,7 @@ object GetOffsetShell {
val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics",
s"By default, internal topics are included. If specified, internal topics are
excluded.")
if (args.isEmpty)
- CommandLineUtils.printUsageAndDie(parser, "An interactive shell for
getting topic-partition offsets.")
+ CommandLineUtils.printUsageAndExit(parser, "An interactive shell for
getting topic-partition offsets.")
val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala
b/core/src/main/scala/kafka/tools/JmxTool.scala
index c0f6d4a5ead..223c459bcc2 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -23,13 +23,13 @@ import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import javax.rmi.ssl.SslRMIClientSocketFactory
-
import joptsimple.OptionParser
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.math._
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
+import org.apache.kafka.server.util.CommandLineUtils
/**
@@ -99,7 +99,7 @@ object JmxTool extends Logging {
if(args.isEmpty)
- CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard
output.")
+ CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard
output.")
val options = parser.parse(args : _*)
@@ -207,7 +207,7 @@ object JmxTool extends Logging {
}
if(numExpectedAttributes.isEmpty) {
- CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for
the queried objects $queries.")
+ CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for
the queried objects $queries.")
}
// print csv header
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 75c2d144b02..067e8aee84c 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicInteger}
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
import java.util.{Collections, Properties}
-
import kafka.consumer.BaseConsumerRecord
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
@@ -35,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
@@ -86,7 +86,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Starting mirror maker")
try {
val opts = new MirrorMakerOptions(args)
- CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to
continuously copy data between two Kafka clusters.")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
continuously copy data between two Kafka clusters.")
opts.checkArgs()
} catch {
case ct: ControlThrowable => throw ct
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala
b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 836163c85fa..6857e401e80 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -17,8 +17,7 @@
package kafka.tools
-import kafka.utils.CommandDefaultOptions
-
+import org.apache.kafka.server.util.CommandDefaultOptions
class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of
messages to send or consume")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 599384530b7..746cd1410fe 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest,
FetchResponse, ListOffsetsRequest}
import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
@@ -114,11 +115,11 @@ object ReplicaVerificationTool extends Logging {
val options = parser.parse(args: _*)
if (args.isEmpty || options.has(helpOpt)) {
- CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas
for a set of topics have the same data.")
+ CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas
for a set of topics have the same data.")
}
if (options.has(versionOpt)) {
- CommandLineUtils.printVersionAndDie()
+ CommandLineUtils.printVersionAndExit()
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 32e43a6606d..8986d8c8ff0 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -23,12 +23,12 @@ import scala.util.matching.Regex
import collection.mutable
import java.util.Date
import java.text.SimpleDateFormat
+import kafka.utils.{CoreUtils, Exit, Logging}
-import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
import java.io.{BufferedOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
-
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.util.CommandLineUtils
/**
* A utility that merges the state change logs (possibly obtained from
different brokers and over multiple days).
@@ -89,7 +89,7 @@ object StateChangeLogMerger extends Logging {
.defaultsTo("9999-12-31 23:59:59,999")
if(args.isEmpty)
- CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log
files from several brokers to reconnstruct a unified history of what happened.")
+ CommandLineUtils.printUsageAndExit(parser, "A tool for merging the log
files from several brokers to reconnstruct a unified history of what happened.")
val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 99c0f789cd9..1b52b2d2ec0 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -21,7 +21,6 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
-import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
@@ -39,7 +38,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
-import scala.collection.JavaConverters;
+import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.text.ParseException;
@@ -282,13 +281,13 @@ public class StreamsResetter {
try {
options = optionParser.parse(args);
if (args.length == 0 || options.has(helpOption)) {
- CommandLineUtils.printUsageAndDie(optionParser, USAGE);
+ CommandLineUtils.printUsageAndExit(optionParser, USAGE);
}
if (options.has(versionOption)) {
- CommandLineUtils.printVersionAndDie();
+ CommandLineUtils.printVersionAndExit();
}
} catch (final OptionException e) {
- CommandLineUtils.printUsageAndDie(optionParser, e.getMessage());
+ CommandLineUtils.printUsageAndExit(optionParser, e.getMessage());
}
final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
@@ -319,7 +318,7 @@ public class StreamsResetter {
optionParser,
options,
option,
- JavaConverters.asScalaSetConverter(invalidOptions).asScala());
+ invalidOptions);
}
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final
Map<Object, Object> consumerConfig,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 47ba5a747e1..0c31b4187a0 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -24,7 +24,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties,
SimpleApiVersionManager}
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit,
Logging, ShutdownableThread}
+import kafka.utils.{CoreUtils, Exit, Logging, ShutdownableThread}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
@@ -39,6 +39,7 @@ import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient,
RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessExitingFaultHandler
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.snapshot.SnapshotReader
import scala.jdk.CollectionConverters._
@@ -435,7 +436,7 @@ object TestRaftServer extends Logging {
def main(args: Array[String]): Unit = {
val opts = new TestRaftServerOptions(args)
try {
- CommandLineUtils.printHelpAndExitIfNeeded(opts,
+ CommandLineUtils.maybePrintHelpOrVersion(opts,
"Standalone raft server for performance testing")
val configFile = opts.options.valueOf(opts.configOpt)
@@ -460,7 +461,7 @@ object TestRaftServer extends Logging {
Exit.exit(0)
} catch {
case e: OptionException =>
- CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+ CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
case e: Throwable =>
fatal("Exiting raft server due to fatal exception", e)
Exit.exit(1)
diff --git a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
b/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
deleted file mode 100644
index 2cdb408b4bb..00000000000
--- a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import joptsimple.{OptionParser, OptionSet}
-
-abstract class CommandDefaultOptions(val args: Array[String],
allowCommandOptionAbbreviation: Boolean = false) {
- val parser = new OptionParser(allowCommandOptionAbbreviation)
- val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
- val versionOpt = parser.accepts("version", "Display Kafka
version.").forHelp()
- var options: OptionSet = _
-}
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
deleted file mode 100644
index e9dcee0eb18..00000000000
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils
-
-import java.util.Properties
-
-import joptsimple.{OptionParser, OptionSet, OptionSpec}
-
-import scala.collection.Set
-
-/**
- * Helper functions for dealing with command line utilities
- */
-object CommandLineUtils extends Logging {
- /**
- * Check if there are no options or `--help` option from command line
- *
- * @param commandOpts Acceptable options for a command
- * @return true on matching the help check condition
- */
- def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
- commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt)
- }
-
- def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = {
- commandOpts.options.has(commandOpts.versionOpt)
- }
-
- /**
- * Check and print help message if there is no options or `--help` option
- * from command line, if `--version` is specified on the command line
- * print version information and exit.
- * NOTE: The function name is not strictly speaking correct anymore
- * as it also checks whether the version needs to be printed, but
- * refactoring this would have meant changing all command line tools
- * and unnecessarily increased the blast radius of this change.
- *
- * @param commandOpts Acceptable options for a command
- * @param message Message to display on successful check
- */
- def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message:
String): Unit = {
- if (isPrintHelpNeeded(commandOpts))
- printUsageAndDie(commandOpts.parser, message)
- if (isPrintVersionNeeded(commandOpts))
- printVersionAndDie()
- }
-
- /**
- * Check that all the listed options are present
- */
- def checkRequiredArgs(parser: OptionParser, options: OptionSet, required:
OptionSpec[_]*): Unit = {
- for (arg <- required) {
- if (!options.has(arg))
- printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
- }
- }
-
- /**
- * Check that none of the listed options are present
- */
- def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption:
OptionSpec[_], invalidOptions: Set[OptionSpec[_]]): Unit = {
- if (options.has(usedOption)) {
- for (arg <- invalidOptions) {
- if (options.has(arg))
- printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be
used with option \"" + arg + "\"")
- }
- }
- }
-
- /**
- * Check that none of the listed options are present with the combination
of used options
- */
- def checkInvalidArgsSet(parser: OptionParser, options: OptionSet,
usedOptions: Set[OptionSpec[_]], invalidOptions: Set[OptionSpec[_]],
- trailingAdditionalMessage: Option[String] = None):
Unit = {
- if (usedOptions.count(options.has) == usedOptions.size) {
- for (arg <- invalidOptions) {
- if (options.has(arg))
- printUsageAndDie(parser, "Option combination \"" +
usedOptions.mkString(",") + "\" can't be used with option \"" + arg + "\"" +
trailingAdditionalMessage.getOrElse(""))
- }
- }
- }
-
- /**
- * Print usage and exit
- */
- def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
- System.err.println(message)
- parser.printHelpOn(System.err)
- Exit.exit(1, Some(message))
- }
-
- def printVersionAndDie(): Nothing = {
- System.out.println(VersionInfo.getVersionString)
- Exit.exit(0)
- }
-
- /**
- * Parse key-value pairs in the form key=value
- * value may contain equals sign
- */
- def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean =
true): Properties = {
- val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty)
-
- val props = new Properties
- for (a <- splits) {
- if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
- if (acceptMissingValue) props.put(a(0), "")
- else throw new IllegalArgumentException(s"Missing value for key
${a(0)}")
- }
- else props.put(a(0), a(1))
- }
- props
- }
-
- /**
- * Merge the options into {@code props} for key {@code key}, with the
following precedence, from high to low:
- * 1) if {@code spec} is specified on {@code options} explicitly, use the
value;
- * 2) if {@code props} already has {@code key} set, keep it;
- * 3) otherwise, use the default value of {@code spec}.
- * A {@code null} value means to remove {@code key} from the {@code props}.
- */
- def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet,
spec: OptionSpec[V]): Unit = {
- if (options.has(spec) || !props.containsKey(key)) {
- val value = options.valueOf(spec)
- if (value == null)
- props.remove(key)
- else
- props.put(key, value.toString)
- }
- }
-}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala
b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 056545cb031..10586317f65 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -18,6 +18,7 @@ package kafka.utils
import joptsimple.OptionParser
import org.apache.kafka.common.{Metric, MetricName}
+import org.apache.kafka.server.util.CommandLineUtils
import scala.collection.mutable
@@ -32,8 +33,8 @@ object ToolsUtils {
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
val isValid = !validHostPort.isEmpty && validHostPort.size ==
hostPorts.length
- if(!isValid)
- CommandLineUtils.printUsageAndDie(parser, "Please provide valid
host:port like host1:9091,host2:9092\n ")
+ if (!isValid)
+ CommandLineUtils.printUsageAndExit(parser, "Please provide valid
host:port like host1:9091,host2:9092\n ")
}
/**
@@ -64,4 +65,18 @@ object ToolsUtils {
println(s"%-${maxLengthOfDisplayName}s :
$specifier".format(metricName, value))
}
}
+
+ /**
+ * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
+ * It is needed for tools migration (KAFKA-14525), as there is no Java
equivalent for return type `Nothing`.
+ * Can be removed once [[kafka.admin.ConsumerGroupCommand]],
[[kafka.tools.ConsoleConsumer]]
+ * and [[kafka.tools.ConsoleProducer]] are migrated.
+ *
+ * @param parser Command line options parser.
+ * @param message Error message.
+ */
+ def printUsageAndExit(parser: OptionParser, message: String): Nothing = {
+ CommandLineUtils.printUsageAndExit(parser, message)
+ throw new AssertionError("printUsageAndExit should not return, but it
did.")
+ }
}
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
index 1b3b9f5b18f..141a68396dd 100755
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer,
ProducerConfig, Produce
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer,
StringDeserializer}
import org.apache.kafka.common.utils.{AbstractIterator, Utils}
+import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._
@@ -98,7 +99,7 @@ object LogCompactionTester {
val options = parser.parse(args: _*)
if (args.isEmpty)
- CommandLineUtils.printUsageAndDie(parser, "A tool to test log
compaction. Valid options are: ")
+ CommandLineUtils.printUsageAndExit(parser, "A tool to test log
compaction. Valid options are: ")
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt,
numMessagesOpt)
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 4b930374e38..7cc7d5254e4 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
+import org.apache.kafka.server.util.CommandLineUtils
import scala.math._
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 507c3ff0973..edb75620eeb 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -26,6 +26,7 @@ import joptsimple._
import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
import kafka.utils._
import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.util.CommandLineUtils
import scala.math._
import scala.jdk.CollectionConverters._
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index 13fc262ec4e..98d54629f20 100644
---
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -216,7 +216,7 @@ class ReassignPartitionsCommandArgsTest {
@Test
def shouldPrintHelpTextIfHelpArg(): Unit = {
val args: Array[String]= Array("--help")
- // note, this is not actually a failed case, it's just we share the same
`printUsageAndDie` method when wrong arg received
+ // note, this is not actually a failed case, it's just we share the same
`printUsageAndExit` method when wrong arg received
shouldFailWith(ReassignPartitionsCommand.helpText, args)
}
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
deleted file mode 100644
index 8b528ed179b..00000000000
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.Properties
-
-import joptsimple.{OptionParser, OptionSpec}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class CommandLineUtilsTest {
-
-
- @Test
- def testParseEmptyArg(): Unit = {
- val argArray = Array("my.empty.property=")
-
- assertThrows(classOf[java.lang.IllegalArgumentException], () =>
CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
- }
-
- @Test
- def testParseEmptyArgWithNoDelimiter(): Unit = {
- val argArray = Array("my.empty.property")
-
- assertThrows(classOf[java.lang.IllegalArgumentException], () =>
CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
- }
-
- @Test
- def testParseEmptyArgAsValid(): Unit = {
- val argArray = Array("my.empty.property=", "my.empty.property1")
- val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
- assertEquals(props.getProperty("my.empty.property"), "", "Value of a key
with missing value should be an empty string")
- assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key
with missing value with no delimiter should be an empty string")
- }
-
- @Test
- def testParseSingleArg(): Unit = {
- val argArray = Array("my.property=value")
- val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
- assertEquals(props.getProperty("my.property"), "value", "Value of a single
property should be 'value' ")
- }
-
- @Test
- def testParseArgs(): Unit = {
- val argArray = Array("first.property=first","second.property=second")
- val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
- assertEquals(props.getProperty("first.property"), "first", "Value of first
property should be 'first'")
- assertEquals(props.getProperty("second.property"), "second", "Value of
second property should be 'second'")
- }
-
- @Test
- def testParseArgsWithMultipleDelimiters(): Unit = {
- val argArray = Array("first.property==first", "second.property=second=",
"third.property=thi=rd")
- val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
- assertEquals(props.getProperty("first.property"), "=first", "Value of
first property should be '=first'")
- assertEquals(props.getProperty("second.property"), "second=", "Value of
second property should be 'second='")
- assertEquals(props.getProperty("third.property"), "thi=rd", "Value of
second property should be 'thi=rd'")
- }
-
- val props = new Properties()
- val parser = new OptionParser(false)
- var stringOpt : OptionSpec[String] = _
- var intOpt : OptionSpec[java.lang.Integer] = _
- var stringOptOptionalArg : OptionSpec[String] = _
- var intOptOptionalArg : OptionSpec[java.lang.Integer] = _
- var stringOptOptionalArgNoDefault : OptionSpec[String] = _
- var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _
-
- def setUpOptions(): Unit = {
- stringOpt = parser.accepts("str")
- .withRequiredArg
- .ofType(classOf[String])
- .defaultsTo("default-string")
- intOpt = parser.accepts("int")
- .withRequiredArg()
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(100)
- stringOptOptionalArg = parser.accepts("str-opt")
- .withOptionalArg
- .ofType(classOf[String])
- .defaultsTo("default-string-2")
- intOptOptionalArg = parser.accepts("int-opt")
- .withOptionalArg
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(200)
- stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
- .withOptionalArg
- .ofType(classOf[String])
- intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
- .withOptionalArg
- .ofType(classOf[java.lang.Integer])
- }
-
- @Test
- def testMaybeMergeOptionsOverwriteExisting(): Unit = {
- setUpOptions()
-
- props.put("skey", "existing-string")
- props.put("ikey", "300")
- props.put("sokey", "existing-string-2")
- props.put("iokey", "400")
- props.put("sondkey", "existing-string-3")
- props.put("iondkey", "500")
-
- val options = parser.parse(
- "--str", "some-string",
- "--int", "600",
- "--str-opt", "some-string-2",
- "--int-opt", "700",
- "--str-opt-nodef", "some-string-3",
- "--int-opt-nodef", "800"
- )
-
- CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
- CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
- CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault)
- CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault)
-
- assertEquals("some-string", props.get("skey"))
- assertEquals("600", props.get("ikey"))
- assertEquals("some-string-2", props.get("sokey"))
- assertEquals("700", props.get("iokey"))
- assertEquals("some-string-3", props.get("sondkey"))
- assertEquals("800", props.get("iondkey"))
- }
-
- @Test
- def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = {
- setUpOptions()
-
- props.put("sokey", "existing-string")
- props.put("iokey", "300")
- props.put("sondkey", "existing-string-2")
- props.put("iondkey", "400")
-
- val options = parser.parse(
- "--str-opt",
- "--int-opt",
- "--str-opt-nodef",
- "--int-opt-nodef"
- )
-
- CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault)
- CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault)
-
- assertEquals("default-string-2", props.get("sokey"))
- assertEquals("200", props.get("iokey"))
- assertNull(props.get("sondkey"))
- assertNull(props.get("iondkey"))
- }
-
- @Test
- def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = {
- setUpOptions()
-
- val options = parser.parse()
-
- CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
- CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
- CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault)
- CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault)
-
- assertEquals("default-string", props.get("skey"))
- assertEquals("100", props.get("ikey"))
- assertEquals("default-string-2", props.get("sokey"))
- assertEquals("200", props.get("iokey"))
- assertNull(props.get("sondkey"))
- assertNull(props.get("iondkey"))
- }
-
- @Test
- def testMaybeMergeOptionsNotOverwriteExisting(): Unit = {
- setUpOptions()
-
- props.put("skey", "existing-string")
- props.put("ikey", "300")
- props.put("sokey", "existing-string-2")
- props.put("iokey", "400")
- props.put("sondkey", "existing-string-3")
- props.put("iondkey", "500")
-
- val options = parser.parse()
-
- CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
- CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
- CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg)
- CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault)
- CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault)
-
- assertEquals("existing-string", props.get("skey"))
- assertEquals("300", props.get("ikey"))
- assertEquals("existing-string-2", props.get("sokey"))
- assertEquals("400", props.get("iokey"))
- assertEquals("existing-string-3", props.get("sondkey"))
- assertEquals("500", props.get("iondkey"))
- }
-}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
new file mode 100644
index 00000000000..10bbd0becdd
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+
+public abstract class CommandDefaultOptions {
+ public final String[] args;
+ public final OptionParser parser;
+ public final AbstractOptionSpec<Void> helpOpt;
+ public final AbstractOptionSpec<Void> versionOpt;
+ public OptionSet options;
+
+ public CommandDefaultOptions(String[] args) {
+ this(args, false);
+ }
+
+ public CommandDefaultOptions(String[] args, boolean
allowCommandOptionAbbreviation) {
+ this.args = args;
+ this.parser = new OptionParser(allowCommandOptionAbbreviation);
+ this.helpOpt = parser.accepts("help", "Print usage
information.").forHelp();
+ this.versionOpt = parser.accepts("version", "Display Kafka
version.").forHelp();
+ this.options = null;
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
new file mode 100644
index 00000000000..0f904127dcd
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+ /**
+ * Check if no arguments were given or the `--help` option was specified on the command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+ public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts)
{
+ return commandOpts.args.length == 0 ||
commandOpts.options.has(commandOpts.helpOpt);
+ }
+
+ /**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the version check condition
+ */
+ public static boolean isPrintVersionNeeded(CommandDefaultOptions
commandOpts) {
+ return commandOpts.options.has(commandOpts.versionOpt);
+ }
+
+ /**
+ * Print the help message and exit if there are no options or the `--help` option
+ * is specified on the command line; if `--version` is specified on the command line,
+ * print version information and exit.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @param message Message to display on successful check
+ */
+ public static void maybePrintHelpOrVersion(CommandDefaultOptions
commandOpts, String message) {
+ if (isPrintHelpNeeded(commandOpts)) {
+ printUsageAndExit(commandOpts.parser, message);
+ }
+ if (isPrintVersionNeeded(commandOpts)) {
+ printVersionAndExit();
+ }
+ }
+
+ /**
+ * Check that all the listed options are present.
+ */
+ public static void checkRequiredArgs(OptionParser parser, OptionSet
options, OptionSpec<?>... requiredList) {
+ for (OptionSpec<?> arg : requiredList) {
+ if (!options.has(arg)) {
+ printUsageAndExit(parser, String.format("Missing required
argument \"%s\"", arg));
+ }
+ }
+ }
+
+ /**
+ * Check that none of the listed invalid options are present when {@code usedOption} is specified.
+ */
+ public static void checkInvalidArgs(OptionParser parser,
+ OptionSet options,
+ OptionSpec<?> usedOption,
+ OptionSpec<?>... invalidOptions) {
+ if (options.has(usedOption)) {
+ for (OptionSpec<?> arg : invalidOptions) {
+ if (options.has(arg)) {
+ printUsageAndExit(parser, String.format("Option \"%s\"
can't be used with option \"%s\"", usedOption, arg));
+ }
+ }
+ }
+ }
+
+ /**
+ * Check that none of the options in the given set are present when {@code usedOption} is specified.
+ */
+ public static void checkInvalidArgs(OptionParser parser,
+ OptionSet options,
+ OptionSpec<?> usedOption,
+ Set<OptionSpec<?>> invalidOptions) {
+ OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
+ invalidOptions.toArray(array);
+ checkInvalidArgs(parser, options, usedOption, array);
+ }
+
+ /**
+ * Check that none of the listed options are present with the combination
of used options.
+ */
+ public static void checkInvalidArgsSet(OptionParser parser,
+ OptionSet options,
+ Set<OptionSpec<?>> usedOptions,
+ Set<OptionSpec<?>> invalidOptions,
+ Optional<String>
trailingAdditionalMessage) {
+ if (usedOptions.stream().filter(options::has).count() ==
usedOptions.size()) {
+ for (OptionSpec<?> arg : invalidOptions) {
+ if (options.has(arg)) {
+ printUsageAndExit(parser, String.format("Option
combination \"%s\" can't be used with option \"%s\"%s",
+ usedOptions, arg,
trailingAdditionalMessage.orElse("")));
+ }
+ }
+ }
+ }
+
+ public static void printUsageAndExit(OptionParser parser, String message) {
+ System.err.println(message);
+ try {
+ parser.printHelpOn(System.err);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Exit.exit(1, message);
+ }
+
+ public static void printVersionAndExit() {
+ System.out.println(AppInfoParser.getVersion());
+ Exit.exit(0);
+ }
+
+ /**
+ * Parse key-value pairs in the form key=value.
+ * Value may contain equals sign.
+ */
+ public static Properties parseKeyValueArgs(List<String> args) {
+ return parseKeyValueArgs(args, true);
+ }
+
+ /**
+ * Parse key-value pairs in the form key=value.
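+ * Value may contain equals sign. A key with no value maps to an empty string when {@code acceptMissingValue} is true; otherwise an IllegalArgumentException is thrown.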
+ */
+ public static Properties parseKeyValueArgs(List<String> args, boolean
acceptMissingValue) {
+ Properties props = new Properties();
+ List<String[]> splits = new ArrayList<>();
+ args.forEach(arg -> {
+ String[] split = arg.split("=", 2);
+ if (split.length > 0) {
+ splits.add(split);
+ }
+ });
+ splits.forEach(split -> {
+ if (split.length == 1 || (split.length == 2 && (split[1] == null
|| split[1].isEmpty()))) {
+ if (acceptMissingValue) {
+ props.put(split[0], "");
+ } else {
+ throw new IllegalArgumentException(String.format("Missing
value for key %s", split[0]));
+ }
+ } else {
+ props.put(split[0], split[1]);
+ }
+ });
+ return props;
+ }
+
+ /**
+ * Merge the options into {@code props} for key {@code key}, with the
following precedence, from high to low:
+ * 1) if {@code spec} is specified on {@code options} explicitly, use the
value;
+ * 2) if {@code props} already has {@code key} set, keep it;
+ * 3) otherwise, use the default value of {@code spec}.
+ * A {@code null} value means to remove {@code key} from the {@code props}.
+ */
+ public static <T> void maybeMergeOptions(Properties props, String key,
OptionSet options, OptionSpec<T> spec) {
+ if (options.has(spec) || !props.containsKey(key)) {
+ T value = options.valueOf(spec);
+ if (value == null) {
+ props.remove(key);
+ } else {
+ props.put(key, value.toString());
+ }
+ }
+ }
+}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
new file mode 100644
index 00000000000..4d12122e3ae
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CommandLineUtilsTest {
+ @Test
+ public void testParseEmptyArg() {
+ List<String> argArray = Arrays.asList("my.empty.property=");
+
+ assertThrows(IllegalArgumentException.class, () ->
CommandLineUtils.parseKeyValueArgs(argArray, false));
+ }
+
+ @Test
+ public void testParseEmptyArgWithNoDelimiter() {
+ List<String> argArray = Arrays.asList("my.empty.property");
+
+ assertThrows(IllegalArgumentException.class, () ->
CommandLineUtils.parseKeyValueArgs(argArray, false));
+ }
+
+ @Test
+ public void testParseEmptyArgAsValid() {
+ List<String> argArray = Arrays.asList("my.empty.property=",
"my.empty.property1");
+ Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+ assertEquals(props.getProperty("my.empty.property"), "", "Value of a
key with missing value should be an empty string");
+ assertEquals(props.getProperty("my.empty.property1"), "", "Value of a
key with missing value with no delimiter should be an empty string");
+ }
+
+ @Test
+ public void testParseSingleArg() {
+ List<String> argArray = Arrays.asList("my.property=value");
+ Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+ assertEquals(props.getProperty("my.property"), "value", "Value of a
single property should be 'value'");
+ }
+
+ @Test
+ public void testParseArgs() {
+ List<String> argArray = Arrays.asList("first.property=first",
"second.property=second");
+ Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+ assertEquals(props.getProperty("first.property"), "first", "Value of
first property should be 'first'");
+ assertEquals(props.getProperty("second.property"), "second", "Value of
second property should be 'second'");
+ }
+
+ @Test
+ public void testParseArgsWithMultipleDelimiters() {
+ List<String> argArray = Arrays.asList("first.property==first",
"second.property=second=", "third.property=thi=rd");
+ Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+ assertEquals(props.getProperty("first.property"), "=first", "Value of
first property should be '=first'");
+ assertEquals(props.getProperty("second.property"), "second=", "Value
of second property should be 'second='");
+ assertEquals(props.getProperty("third.property"), "thi=rd", "Value of
third property should be 'thi=rd'");
+ }
+
+ Properties props = new Properties();
+ OptionParser parser = new OptionParser(false);
+ OptionSpec<String> stringOpt;
+ OptionSpec<Integer> intOpt;
+ OptionSpec<String> stringOptOptionalArg;
+ OptionSpec<Integer> intOptOptionalArg;
+ OptionSpec<String> stringOptOptionalArgNoDefault;
+ OptionSpec<Integer> intOptOptionalArgNoDefault;
+
+ private void setUpOptions() {
+ stringOpt = parser.accepts("str")
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo("default-string");
+ intOpt = parser.accepts("int")
+ .withRequiredArg()
+ .ofType(Integer.class)
+ .defaultsTo(100);
+ stringOptOptionalArg = parser.accepts("str-opt")
+ .withOptionalArg()
+ .ofType(String.class)
+ .defaultsTo("default-string-2");
+ intOptOptionalArg = parser.accepts("int-opt")
+ .withOptionalArg()
+ .ofType(Integer.class)
+ .defaultsTo(200);
+ stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
+ .withOptionalArg()
+ .ofType(String.class);
+ intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
+ .withOptionalArg()
+ .ofType(Integer.class);
+ }
+
+ @Test
+ public void testMaybeMergeOptionsOverwriteExisting() {
+ setUpOptions();
+
+ props.put("skey", "existing-string");
+ props.put("ikey", "300");
+ props.put("sokey", "existing-string-2");
+ props.put("iokey", "400");
+ props.put("sondkey", "existing-string-3");
+ props.put("iondkey", "500");
+
+ OptionSet options = parser.parse(
+ "--str", "some-string",
+ "--int", "600",
+ "--str-opt", "some-string-2",
+ "--int-opt", "700",
+ "--str-opt-nodef", "some-string-3",
+ "--int-opt-nodef", "800"
+ );
+
+ CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+ CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+ CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault);
+ CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault);
+
+ assertEquals("some-string", props.get("skey"));
+ assertEquals("600", props.get("ikey"));
+ assertEquals("some-string-2", props.get("sokey"));
+ assertEquals("700", props.get("iokey"));
+ assertEquals("some-string-3", props.get("sondkey"));
+ assertEquals("800", props.get("iondkey"));
+ }
+
+ @Test
+ public void testMaybeMergeOptionsDefaultOverwriteExisting() {
+ setUpOptions();
+
+ props.put("sokey", "existing-string");
+ props.put("iokey", "300");
+ props.put("sondkey", "existing-string-2");
+ props.put("iondkey", "400");
+
+ OptionSet options = parser.parse(
+ "--str-opt",
+ "--int-opt",
+ "--str-opt-nodef",
+ "--int-opt-nodef"
+ );
+
+ CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault);
+ CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault);
+
+ assertEquals("default-string-2", props.get("sokey"));
+ assertEquals("200", props.get("iokey"));
+ assertNull(props.get("sondkey"));
+ assertNull(props.get("iondkey"));
+ }
+
+ @Test
+ public void testMaybeMergeOptionsDefaultValueIfNotExist() {
+ setUpOptions();
+
+ OptionSet options = parser.parse();
+
+ CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+ CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+ CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault);
+ CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault);
+
+ assertEquals("default-string", props.get("skey"));
+ assertEquals("100", props.get("ikey"));
+ assertEquals("default-string-2", props.get("sokey"));
+ assertEquals("200", props.get("iokey"));
+ assertNull(props.get("sondkey"));
+ assertNull(props.get("iondkey"));
+ }
+
+ @Test
+ public void testMaybeMergeOptionsNotOverwriteExisting() {
+ setUpOptions();
+
+ props.put("skey", "existing-string");
+ props.put("ikey", "300");
+ props.put("sokey", "existing-string-2");
+ props.put("iokey", "400");
+ props.put("sondkey", "existing-string-3");
+ props.put("iondkey", "500");
+
+ OptionSet options = parser.parse();
+
+ CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+ CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+ CommandLineUtils.maybeMergeOptions(props, "sokey", options,
stringOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "iokey", options,
intOptOptionalArg);
+ CommandLineUtils.maybeMergeOptions(props, "sondkey", options,
stringOptOptionalArgNoDefault);
+ CommandLineUtils.maybeMergeOptions(props, "iondkey", options,
intOptOptionalArgNoDefault);
+
+ assertEquals("existing-string", props.get("skey"));
+ assertEquals("300", props.get("ikey"));
+ assertEquals("existing-string-2", props.get("sokey"));
+ assertEquals("400", props.get("iokey"));
+ assertEquals("existing-string-3", props.get("sondkey"));
+ assertEquals("500", props.get("iondkey"));
+ }
+}