This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 72cfc994f56 KAFKA-14628: Move CommandLineUtils and 
CommandDefaultOptions to tools (#13131)
72cfc994f56 is described below

commit 72cfc994f5675be349d4494ece3528efed290651
Author: Federico Valeri <fedeval...@gmail.com>
AuthorDate: Thu Jan 26 20:06:09 2023 +0100

    KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools 
(#13131)
    
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Christo Lolov 
<christolo...@gmail.com>, Sagar Rao <sagarmeansoc...@gmail.com>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-core.xml                 |   3 +-
 checkstyle/import-control.xml                      |   2 +
 core/src/main/scala/kafka/Kafka.scala              |  13 +-
 core/src/main/scala/kafka/admin/AclCommand.scala   |  44 ++--
 .../kafka/admin/BrokerApiVersionsCommand.scala     |   5 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  11 +-
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  64 +++---
 .../scala/kafka/admin/DelegationTokenCommand.scala |  23 ++-
 .../scala/kafka/admin/DeleteRecordsCommand.scala   |   6 +-
 .../scala/kafka/admin/LeaderElectionCommand.scala  |   6 +-
 .../main/scala/kafka/admin/LogDirsCommand.scala    |   7 +-
 .../kafka/admin/ReassignPartitionsCommand.scala    |  11 +-
 core/src/main/scala/kafka/admin/TopicCommand.scala |  46 +++--
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |   7 +-
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |  23 ++-
 .../main/scala/kafka/tools/ConsoleProducer.scala   |  13 +-
 .../scala/kafka/tools/ConsumerPerformance.scala    |   8 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   3 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala    |   5 +-
 core/src/main/scala/kafka/tools/JmxTool.scala      |   8 +-
 core/src/main/scala/kafka/tools/MirrorMaker.scala  |   4 +-
 core/src/main/scala/kafka/tools/PerfConfig.scala   |   3 +-
 .../kafka/tools/ReplicaVerificationTool.scala      |   5 +-
 .../scala/kafka/tools/StateChangeLogMerger.scala   |   6 +-
 .../main/scala/kafka/tools/StreamsResetter.java    |  11 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   7 +-
 .../scala/kafka/utils/CommandDefaultOptions.scala  |  27 ---
 .../main/scala/kafka/utils/CommandLineUtils.scala  | 145 -------------
 core/src/main/scala/kafka/utils/ToolsUtils.scala   |  19 +-
 .../scala/kafka/tools/LogCompactionTester.scala    |   3 +-
 .../scala/other/kafka/TestLinearWriteSpeed.scala   |   1 +
 .../other/kafka/TestPurgatoryPerformance.scala     |   1 +
 .../admin/ReassignPartitionsCommandArgsTest.scala  |   2 +-
 .../unit/kafka/utils/CommandLineUtilsTest.scala    | 223 --------------------
 .../kafka/server/util/CommandDefaultOptions.java   |  41 ++++
 .../apache/kafka/server/util/CommandLineUtils.java | 197 ++++++++++++++++++
 .../kafka/server/util/CommandLineUtilsTest.java    | 227 +++++++++++++++++++++
 38 files changed, 664 insertions(+), 567 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8441ddeaf05..56883860573 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1536,6 +1536,7 @@ project(':server-common') {
     api project(':clients')
     implementation libs.slf4jApi
     implementation libs.metrics
+    implementation libs.joptSimple
 
     testImplementation project(':clients')
     testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control-core.xml 
b/checkstyle/import-control-core.xml
index 5a92c44d7c8..0b15f958b96 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -60,8 +60,9 @@
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="kafka.admin" />
-    <allow pkg="joptsimple" />
     <allow pkg="org.apache.kafka.clients.consumer" />
+    <allow pkg="org.apache.kafka.server.util" />
+    <allow pkg="joptsimple" />
   </subpackage>
 
   <subpackage name="coordinator">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 8792c26c949..0e767baee5d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -347,6 +347,7 @@
 
   <subpackage name="server">
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="joptsimple" />
 
     <!-- This is required to make AlterConfigPolicyTest work. -->
     <allow pkg="org.apache.kafka.server.policy" />
@@ -406,6 +407,7 @@
     <allow pkg="net.sourceforge.argparse4j" />
     <allow pkg="org.apache.log4j" />
     <allow pkg="kafka.test" />
+    <allow pkg="joptsimple" />
   </subpackage>
 
   <subpackage name="trogdor">
diff --git a/core/src/main/scala/kafka/Kafka.scala 
b/core/src/main/scala/kafka/Kafka.scala
index dad462dcad6..fa0137e9597 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -22,10 +22,9 @@ import java.util.Properties
 import joptsimple.OptionParser
 import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
 import kafka.utils.Implicits._
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
 import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, 
OperatingSystem, Time, Utils}
-
-import scala.jdk.CollectionConverters._
+import org.apache.kafka.server.util.CommandLineUtils
 
 object Kafka extends Logging {
 
@@ -41,12 +40,12 @@ object Kafka extends Logging {
     optionParser.accepts("version", "Print version information and exit.")
 
     if (args.isEmpty || args.contains("--help")) {
-      CommandLineUtils.printUsageAndDie(optionParser,
+      CommandLineUtils.printUsageAndExit(optionParser,
         "USAGE: java [options] %s server.properties [--override 
property=value]*".format(this.getClass.getCanonicalName.split('$').head))
     }
 
     if (args.contains("--version")) {
-      CommandLineUtils.printVersionAndDie()
+      CommandLineUtils.printVersionAndExit()
     }
 
     val props = Utils.loadProps(args(0))
@@ -55,10 +54,10 @@ object Kafka extends Logging {
       val options = optionParser.parse(args.slice(1, args.length): _*)
 
       if (options.nonOptionArguments().size() > 0) {
-        CommandLineUtils.printUsageAndDie(optionParser, "Found non argument 
parameters: " + options.nonOptionArguments().toArray.mkString(","))
+        CommandLineUtils.printUsageAndExit(optionParser, "Found non argument 
parameters: " + options.nonOptionArguments().toArray.mkString(","))
       }
 
-      props ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
+      props ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt))
     }
     props
   }
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 769e99df737..1c8b6537346 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -18,7 +18,6 @@
 package kafka.admin
 
 import java.util.Properties
-
 import joptsimple._
 import joptsimple.util.EnumConverter
 import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
@@ -33,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -51,7 +51,7 @@ object AclCommand extends Logging {
 
     val opts = new AclCommandOptions(args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manage 
acls on kafka.")
+    CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage 
acls on kafka.")
 
     opts.checkArgs()
 
@@ -202,8 +202,8 @@ object AclCommand extends Logging {
       val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> 
JaasUtils.isZkSaslEnabled)
       val authorizerPropertiesWithoutTls =
         if (opts.options.has(opts.authorizerPropertiesOpt)) {
-          val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
-          defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = 
false).asScala
+          val authorizerProperties = 
opts.options.valuesOf(opts.authorizerPropertiesOpt)
+          defaultProps ++ 
CommandLineUtils.parseKeyValueArgs(authorizerProperties, false).asScala
         } else {
           defaultProps
         }
@@ -324,7 +324,7 @@ object AclCommand extends Logging {
   private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, 
Set[AccessControlEntry]] = {
     val patternType = opts.options.valueOf(opts.resourcePatternType)
     if (!patternType.isSpecific)
-      CommandLineUtils.printUsageAndDie(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
+      CommandLineUtils.printUsageAndExit(opts.parser, s"A 
'--resource-pattern-type' value of '$patternType' is not valid when adding 
acls.")
 
     val resourceToAcl = getResourceFilterToAcls(opts).map {
       case (filter, acls) =>
@@ -332,7 +332,7 @@ object AclCommand extends Logging {
     }
 
     if (resourceToAcl.values.exists(_.isEmpty))
-      CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: 
--allow-principal, --deny-principal when trying to add ACLs.")
+      CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one 
of: --allow-principal, --deny-principal when trying to add ACLs.")
 
     resourceToAcl
   }
@@ -430,8 +430,8 @@ object AclCommand extends Logging {
     } yield new AccessControlEntry(principal.toString, host, operation, 
permissionType)
   }
 
-  private def getHosts(opts: AclCommandOptions, hostOptionSpec: 
ArgumentAcceptingOptionSpec[String],
-                       principalOptionSpec: 
ArgumentAcceptingOptionSpec[String]): Set[String] = {
+  private def getHosts(opts: AclCommandOptions, hostOptionSpec: 
OptionSpec[String],
+                       principalOptionSpec: OptionSpec[String]): Set[String] = 
{
     if (opts.options.has(hostOptionSpec))
       opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
     else if (opts.options.has(principalOptionSpec))
@@ -440,7 +440,7 @@ object AclCommand extends Logging {
       Set.empty[String]
   }
 
-  private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: 
ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
+  private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: 
OptionSpec[String]): Set[KafkaPrincipal] = {
     if (opts.options.has(principalOptionSpec))
       opts.options.valuesOf(principalOptionSpec).asScala.map(s => 
JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
     else
@@ -471,7 +471,7 @@ object AclCommand extends Logging {
       opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => 
resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, 
patternType))
 
     if (resourceFilters.isEmpty && dieIfNoResourceFound)
-      CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at 
least one resource: --topic <topic> or --cluster or --group <group> or 
--delegation-token <Delegation Token ID>")
+      CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at 
least one resource: --topic <topic> or --cluster or --group <group> or 
--delegation-token <Delegation Token ID>")
 
     resourceFilters
   }
@@ -487,7 +487,7 @@ object AclCommand extends Logging {
     for ((resource, acls) <- resourceToAcls) {
       val validOps = AclEntry.supportedOperations(resource.resourceType) + 
AclOperation.ALL
       if ((acls.map(_.operation) -- validOps).nonEmpty)
-        CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType 
${resource.resourceType} only supports operations ${validOps.mkString(",")}")
+        CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType 
${resource.resourceType} only supports operations ${validOps.mkString(",")}")
     }
   }
 
@@ -634,7 +634,7 @@ object AclCommand extends Logging {
 
     def checkArgs(): Unit = {
       if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
-        CommandLineUtils.printUsageAndDie(parser, "Only one of 
--bootstrap-server or --authorizer must be specified")
+        CommandLineUtils.printUsageAndExit(parser, "Only one of 
--bootstrap-server or --authorizer must be specified")
 
       if (!options.has(bootstrapServerOpt)) {
         CommandLineUtils.checkRequiredArgs(parser, options, 
authorizerPropertiesOpt)
@@ -642,32 +642,32 @@ object AclCommand extends Logging {
       }
 
       if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
-        CommandLineUtils.printUsageAndDie(parser, "The --command-config option 
can only be used with --bootstrap-server option")
+        CommandLineUtils.printUsageAndExit(parser, "The --command-config 
option can only be used with --bootstrap-server option")
 
       if (options.has(authorizerPropertiesOpt) && 
options.has(bootstrapServerOpt))
-        CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties 
option can only be used with --authorizer option")
+        CommandLineUtils.printUsageAndExit(parser, "The 
--authorizer-properties option can only be used with --authorizer option")
 
       val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
       if (actions != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Command must include 
exactly one action: --list, --add, --remove. ")
+        CommandLineUtils.printUsageAndExit(parser, "Command must include 
exactly one action: --list, --add, --remove. ")
 
-      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, 
Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, 
denyPrincipalsOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, 
consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
 
       //when --producer or --consumer is specified , user should not specify 
operations as they are inferred and we also disallow --deny-principals and 
--deny-hosts.
-      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, 
Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
operationsOpt, denyPrincipalsOpt, denyHostsOpt)
 
       if (options.has(listPrincipalsOpt) && !options.has(listOpt))
-        CommandLineUtils.printUsageAndDie(parser, "The --principal option is 
only available if --list is set")
+        CommandLineUtils.printUsageAndExit(parser, "The --principal option is 
only available if --list is set")
 
       if (options.has(producerOpt) && !options.has(topicOpt))
-        CommandLineUtils.printUsageAndDie(parser, "With --producer you must 
specify a --topic")
+        CommandLineUtils.printUsageAndExit(parser, "With --producer you must 
specify a --topic")
 
       if (options.has(idempotentOpt) && !options.has(producerOpt))
-        CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is 
only available if --producer is set")
+        CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is 
only available if --producer is set")
 
       if (options.has(consumerOpt) && (!options.has(topicOpt) || 
!options.has(groupOpt) || (!options.has(producerOpt) && 
(options.has(clusterOpt) || options.has(transactionalIdOpt)))))
-        CommandLineUtils.printUsageAndDie(parser, "With --consumer you must 
specify a --topic and a --group and no --cluster or --transactional-id option 
should be specified.")
+        CommandLineUtils.printUsageAndExit(parser, "With --consumer you must 
specify a --topic and a --group and no --cluster or --transactional-id option 
should be specified.")
     }
   }
 }
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala 
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index ea6ffeea65b..990ac6f9448 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -22,8 +22,6 @@ import java.io.IOException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
 import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Utils
@@ -42,6 +40,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
@@ -94,7 +93,7 @@ object BrokerApiVersionsCommand {
     checkArgs()
 
     def checkArgs(): Unit = {
-      CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to 
retrieve broker version information.")
+      CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
retrieve broker version information.")
       // check required args
       CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
     }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 687e65378b9..5d915f5e97f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -23,7 +23,7 @@ import java.util.{Collections, Properties}
 import joptsimple._
 import kafka.server.DynamicConfig.QuotaConfigs
 import kafka.server.{ConfigEntityName, ConfigType, Defaults, 
DynamicBrokerConfig, DynamicConfig, KafkaConfig}
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging, 
PasswordEncoder}
+import kafka.utils.{Exit, Logging, PasswordEncoder}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, 
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, 
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, 
ScramMechanism => PublicScramMechanism}
@@ -37,6 +37,7 @@ import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, 
ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
 import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.annotation.nowarn
@@ -84,7 +85,7 @@ object ConfigCommand extends Logging {
     try {
       val opts = new ConfigCommandOptions(args)
 
-      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to 
manipulate and describe entity config for a topic, client, user, broker or ip")
+      CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
manipulate and describe entity config for a topic, client, user, broker or ip")
 
       opts.checkArgs()
 
@@ -863,10 +864,10 @@ object ConfigCommand extends Logging {
       // should have exactly one action
       val actions = Seq(alterOpt, describeOpt).count(options.has _)
       if (actions != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Command must include 
exactly one action: --describe, --alter")
+        CommandLineUtils.printUsageAndExit(parser, "Command must include 
exactly one action: --describe, --alter")
       // check required args
-      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, 
Set(describeOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, 
Set(alterOpt, addConfig, deleteConfig))
+      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, describeOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, 
alterOpt, addConfig, deleteConfig)
 
       val entityTypeVals = entityTypes
       if (entityTypeVals.size != entityTypeVals.distinct.size)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b428c447d77..9c0452b781c 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -28,18 +28,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Map, Seq, immutable, mutable}
 import scala.util.{Failure, Success, Try}
-import joptsimple.OptionSpec
+import joptsimple.{OptionException, OptionSpec}
 import org.apache.kafka.common.protocol.Errors
 
 import scala.collection.immutable.TreeMap
 import scala.reflect.ClassTag
 import org.apache.kafka.common.ConsumerGroupState
-import joptsimple.OptionException
 import org.apache.kafka.common.requests.ListOffsetsResponse
 
 object ConsumerGroupCommand extends Logging {
@@ -49,17 +49,17 @@ object ConsumerGroupCommand extends Logging {
     val opts = new ConsumerGroupCommandOptions(args)
     try {
       opts.checkArgs()
-      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list 
all consumer groups, describe a consumer group, delete consumer group info, or 
reset consumer group offsets.")
+      CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list 
all consumer groups, describe a consumer group, delete consumer group info, or 
reset consumer group offsets.")
 
       // should have exactly one action
       val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, 
opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
       if (actions != 1)
-        CommandLineUtils.printUsageAndDie(opts.parser, "Command must include 
exactly one action: --list, --describe, --delete, --reset-offsets, 
--delete-offsets")
+        CommandLineUtils.printUsageAndExit(opts.parser, "Command must include 
exactly one action: --list, --describe, --delete, --reset-offsets, 
--delete-offsets")
 
       run(opts)
     } catch {
       case e: OptionException =>
-        CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+        CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
     }
   }
 
@@ -85,7 +85,7 @@ object ConsumerGroupCommand extends Logging {
       }
     } catch {
       case e: IllegalArgumentException =>
-        CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+        CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
       case e: Throwable =>
         printError(s"Executing consumer group command failed due to 
${e.getMessage}", Some(e))
     } finally {
@@ -747,7 +747,7 @@ object ConsumerGroupCommand extends Logging {
         if (opts.options.has(opts.resetFromFileOpt))
           Nil
         else
-          CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset 
scopes should be defined: --all-topics, --topic.")
+          ToolsUtils.printUsageAndExit(opts.parser, "One of the reset scopes 
should be defined: --all-topics, --topic.")
       }
     }
 
@@ -801,7 +801,7 @@ object ConsumerGroupCommand extends Logging {
         partitionsToReset.map { topicPartition =>
           logStartOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting starting offset of topic partition: $topicPartition")
+            case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error 
getting starting offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetToLatestOpt)) {
@@ -809,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
         partitionsToReset.map { topicPartition =>
           logEndOffsets.get(topicPartition) match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting ending offset of topic partition: $topicPartition")
+            case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error 
getting ending offset of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetShiftByOpt)) {
@@ -830,7 +830,7 @@ object ConsumerGroupCommand extends Logging {
           val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting offset by timestamp of topic partition: $topicPartition")
+            case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error 
getting offset by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (opts.options.has(opts.resetByDurationOpt)) {
@@ -844,7 +844,7 @@ object ConsumerGroupCommand extends Logging {
           val logTimestampOffset = logTimestampOffsets.get(topicPartition)
           logTimestampOffset match {
             case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-            case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting offset by timestamp of topic partition: $topicPartition")
+            case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error 
getting offset by timestamp of topic partition: $topicPartition")
           }
         }.toMap
       } else if (resetPlanFromFile.isDefined) {
@@ -875,12 +875,12 @@ object ConsumerGroupCommand extends Logging {
 
         val preparedOffsetsForPartitionsWithoutCommittedOffset = 
getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map {
           case (topicPartition, LogOffsetResult.LogOffset(offset)) => 
(topicPartition, new OffsetAndMetadata(offset))
-          case (topicPartition, _) => 
CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of 
topic partition: $topicPartition")
+          case (topicPartition, _) => 
ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of 
topic partition: $topicPartition")
         }
 
         preparedOffsetsForPartitionsWithCommittedOffset ++ 
preparedOffsetsForPartitionsWithoutCommittedOffset
       } else {
-        CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires 
one of the following scenarios: %s".format(opts.resetOffsetsOpt, 
opts.allResetOffsetScenarioOpts) )
+        ToolsUtils.printUsageAndExit(opts.parser, "Option '%s' requires one of 
the following scenarios: %s".format(opts.resetOffsetsOpt, 
opts.allResetOffsetScenarioOpts))
       }
     }
 
@@ -1095,15 +1095,15 @@ object ConsumerGroupCommand extends Logging {
 
       if (options.has(describeOpt)) {
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $describeOpt takes one of these options: 
${allGroupSelectionScopeOpts.mkString(", ")}")
         val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, 
offsetsOpt, stateOpt)
         if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 
0).sum > 1) {
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $describeOpt takes at most one of these options: 
${mutuallyExclusiveOpts.mkString(", ")}")
         }
         if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $describeOpt does not take a value for $stateOpt")
       } else {
         if (options.has(timeoutMsOpt))
@@ -1112,22 +1112,22 @@ object ConsumerGroupCommand extends Logging {
 
       if (options.has(deleteOpt)) {
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $deleteOpt takes one of these options: 
${allGroupSelectionScopeOpts.mkString(", ")}")
         if (options.has(topicOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"The consumer does not 
support topic-specific offset " +
+          CommandLineUtils.printUsageAndExit(parser, s"The consumer does not 
support topic-specific offset " +
             "deletion from a consumer group.")
       }
 
       if (options.has(deleteOffsetsOpt)) {
         if (!options.has(groupOpt) || !options.has(topicOpt))
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $deleteOffsetsOpt takes the following options: 
${allDeleteOffsetsOpts.mkString(", ")}")
       }
 
       if (options.has(resetOffsetsOpt)) {
         if (options.has(dryRunOpt) && options.has(executeOpt))
-          CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt 
only accepts one of $executeOpt and $dryRunOpt")
+          CommandLineUtils.printUsageAndExit(parser, s"Option $resetOffsetsOpt 
only accepts one of $executeOpt and $dryRunOpt")
 
         if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
           Console.err.println("WARN: No action will be performed as the 
--execute option is missing." +
@@ -1137,21 +1137,21 @@ object ConsumerGroupCommand extends Logging {
         }
 
         if (!options.has(groupOpt) && !options.has(allGroupsOpt))
-          CommandLineUtils.printUsageAndDie(parser,
+          CommandLineUtils.printUsageAndExit(parser,
             s"Option $resetOffsetsOpt takes one of these options: 
${allGroupSelectionScopeOpts.mkString(", ")}")
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,   
allResetOffsetScenarioOpts - resetToOffsetOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, 
allResetOffsetScenarioOpts - resetToDatetimeOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, 
allResetOffsetScenarioOpts - resetByDurationOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, 
allResetOffsetScenarioOpts - resetToEarliestOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt,   
allResetOffsetScenarioOpts - resetToLatestOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt,  
allResetOffsetScenarioOpts - resetToCurrentOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt,    
allResetOffsetScenarioOpts - resetShiftByOpt)
-        CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt,   
allResetOffsetScenarioOpts - resetFromFileOpt)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt,   
(allResetOffsetScenarioOpts - resetToOffsetOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, 
(allResetOffsetScenarioOpts - resetToDatetimeOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, 
(allResetOffsetScenarioOpts - resetByDurationOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, 
(allResetOffsetScenarioOpts - resetToEarliestOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt,   
(allResetOffsetScenarioOpts - resetToLatestOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt,  
(allResetOffsetScenarioOpts - resetToCurrentOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt,    
(allResetOffsetScenarioOpts - resetShiftByOpt).asJava)
+        CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt,   
(allResetOffsetScenarioOpts - resetFromFileOpt).asJava)
       }
 
-      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
allGroupSelectionScopeOpts - groupOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
-      CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, 
allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
(allGroupSelectionScopeOpts - groupOpt).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
(allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, 
(allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt).asJava )
     }
   }
 }
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala 
b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 78984792ce2..7dce0d58515 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -20,17 +20,16 @@ package kafka.admin
 import java.text.SimpleDateFormat
 import java.util
 import java.util.Base64
-
 import joptsimple.ArgumentAcceptingOptionSpec
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, 
DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, 
RenewDelegationTokenOptions}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
-import scala.collection.Set
 
 /**
  * A command to manage delegation token.
@@ -40,12 +39,12 @@ object DelegationTokenCommand extends Logging {
   def main(args: Array[String]): Unit = {
     val opts = new DelegationTokenCommandOptions(args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to 
create, renew, expire, or describe delegation tokens.")
+    CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, 
renew, expire, or describe delegation tokens.")
 
     // should have exactly one action
     val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, 
opts.describeOpt).count(opts.options.has _)
     if(actions != 1)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include 
exactly one action: --create, --renew, --expire or --describe")
+      CommandLineUtils.printUsageAndExit(opts.parser, "Command must include 
exactly one action: --create, --renew, --expire or --describe")
 
     opts.checkArgs()
 
@@ -207,17 +206,19 @@ object DelegationTokenCommand extends Logging {
       if (options.has(createOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
 
-      if (options.has(renewOpt))
+      if (options.has(renewOpt)) {
         CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, 
renewTimePeriodOpt)
+      }
 
-      if (options.has(expiryOpt))
+      if (options.has(expiryOpt)) {
         CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, 
expiryTimePeriodOpt)
+      }
 
       // check invalid args
-      CommandLineUtils.checkInvalidArgs(parser, options, createOpt, 
Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, 
Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, 
ownerPrincipalsOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, 
Set(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, 
Set(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, 
expiryTimePeriodOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt, 
renewTimePeriodOpt, expiryTimePeriodOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, 
renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt, 
maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)
+      CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, 
renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, 
expiryTimePeriodOpt)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala 
b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 71ef6fd6f19..b1747313708 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -19,14 +19,14 @@ package kafka.admin
 
 import java.io.PrintStream
 import java.util.Properties
-
 import kafka.common.AdminCommandFailedException
 import kafka.utils.json.JsonValue
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
+import kafka.utils.{CoreUtils, Json}
 import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -130,7 +130,7 @@ object DeleteRecordsCommand {
 
     options = parser.parse(args : _*)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to delete 
records of the given partitions down to the specified offset.")
+    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete 
records of the given partitions down to the specified offset.")
 
     CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, 
offsetJsonFileOpt)
   }
diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala 
b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
index 92edcad003f..140f4b70177 100644
--- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala
@@ -20,8 +20,6 @@ import java.util.Properties
 import java.util.concurrent.ExecutionException
 import joptsimple.util.EnumConverter
 import kafka.common.AdminCommandFailedException
-import kafka.utils.CommandDefaultOptions
-import kafka.utils.CommandLineUtils
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import kafka.utils.Json
@@ -33,6 +31,8 @@ import 
org.apache.kafka.common.errors.ClusterAuthorizationException
 import org.apache.kafka.common.errors.ElectionNotNeededException
 import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
+
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
@@ -44,7 +44,7 @@ object LeaderElectionCommand extends Logging {
 
   def run(args: Array[String], timeout: Duration): Unit = {
     val commandOptions = new LeaderElectionCommandOptions(args)
-    CommandLineUtils.printHelpAndExitIfNeeded(
+    CommandLineUtils.maybePrintHelpOrVersion(
       commandOptions,
       "This tool attempts to elect a new leader for a set of topic partitions. 
The type of elections supported are preferred replicas and unclean replicas."
     )
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala 
b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index d8c802e7d0e..870e6a17ba1 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -16,13 +16,12 @@
   */
 
 package kafka.admin
-
 import java.io.PrintStream
 import java.util.Properties
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
+import kafka.utils.Json
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
LogDirDescription}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Map
@@ -126,7 +125,7 @@ object LogDirsCommand {
 
         options = parser.parse(args : _*)
 
-        CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to 
query log directory usage on the specified brokers.")
+        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
query log directory usage on the specified brokers.")
 
         CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt, describeOpt)
     }
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f3e3a69117..24c2fafb813 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -21,7 +21,7 @@ import java.util.Optional
 import java.util.concurrent.ExecutionException
 import kafka.common.AdminCommandFailedException
 import kafka.server.DynamicConfig
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, 
Json, Logging}
+import kafka.utils.{CoreUtils, Exit, Json, Logging}
 import kafka.utils.Implicits._
 import kafka.utils.json.JsonValue
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -31,6 +31,7 @@ import 
org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopi
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, 
TopicPartitionReplica}
 import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq, mutable}
@@ -1330,20 +1331,20 @@ object ReassignPartitionsCommand extends Logging {
   def validateAndParseArgs(args: Array[String]): 
ReassignPartitionsCommandOptions = {
     val opts = new ReassignPartitionsCommandOptions(args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, helpText)
+    CommandLineUtils.maybePrintHelpOrVersion(opts, helpText)
 
     // Determine which action we should perform.
     val validActions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt,
                            opts.cancelOpt, opts.listOpt)
     val allActions = validActions.filter(opts.options.has _)
     if (allActions.size != 1) {
-      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include 
exactly one action: %s".format(
+      CommandLineUtils.printUsageAndExit(opts.parser, "Command must include 
exactly one action: %s".format(
         validActions.map("--" + _.options().get(0)).mkString(", ")))
     }
     val action = allActions.head
 
     if (!opts.options.has(opts.bootstrapServerOpt))
-      CommandLineUtils.printUsageAndDie(opts.parser, "Please specify 
--bootstrap-server")
+      CommandLineUtils.printUsageAndExit(opts.parser, "Please specify 
--bootstrap-server")
 
     // Make sure that we have all the required arguments for our action.
     val requiredArgs = Map(
@@ -1400,7 +1401,7 @@ object ReassignPartitionsCommand extends Logging {
       if (!opt.equals(action) &&
         !requiredArgs(action).contains(opt) &&
         !permittedArgs(action).contains(opt)) {
-        CommandLineUtils.printUsageAndDie(opts.parser,
+        CommandLineUtils.printUsageAndExit(opts.parser,
           """Option "%s" can't be used with action "%s"""".format(opt, action))
       }
     })
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala 
b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 65148b203f8..3d91b6256b4 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -18,7 +18,7 @@
 package kafka.admin
 
 import java.util
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import joptsimple._
 import kafka.common.AdminCommandFailedException
 import kafka.utils._
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExist
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.log.internals.LogConfig
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -605,53 +606,54 @@ object TopicCommand extends Logging {
 
     def checkArgs(): Unit = {
       if (args.isEmpty)
-        CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, 
or change a topic.")
+        CommandLineUtils.printUsageAndExit(parser, "Create, delete, describe, 
or change a topic.")
 
-      CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to 
create, delete, describe, or change a topic.")
+      CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
create, delete, describe, or change a topic.")
 
       // should have exactly one action
       val actions = Seq(createOpt, listOpt, alterOpt, describeOpt, 
deleteOpt).count(options.has)
       if (actions != 1)
-        CommandLineUtils.printUsageAndDie(parser, "Command must include 
exactly one action: --list, --describe, --create, --alter or --delete")
+        CommandLineUtils.printUsageAndExit(parser, "Command must include 
exactly one action: --list, --describe, --create, --alter or --delete")
 
       // check required args
       if (!has(bootstrapServerOpt))
         throw new IllegalArgumentException("--bootstrap-server must be 
specified")
       if (has(describeOpt) && has(ifExistsOpt)) {
         if (!has(topicOpt) && !has(topicIdOpt))
-          CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is 
required to describe a topic")
+          CommandLineUtils.printUsageAndExit(parser, "--topic or --topic-id is 
required to describe a topic")
         if (has(topicOpt) && has(topicIdOpt))
           println("Only topic id will be used when both --topic and --topic-id 
are specified and topicId is not Uuid.ZERO_UUID")
       }
       if (!has(listOpt) && !has(describeOpt))
         CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
       if (has(alterOpt)) {
-        CommandLineUtils.checkInvalidArgsSet(parser, options, 
Set(bootstrapServerOpt, configOpt), Set(alterOpt),
-        Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
+        val usedOptions = immutable.Set[OptionSpec[_]](bootstrapServerOpt, 
configOpt)
+        val invalidOptions = immutable.Set[OptionSpec[_]](alterOpt)
+        CommandLineUtils.checkInvalidArgsSet(parser, options, 
usedOptions.asJava, invalidOptions.asJava, 
Optional.of(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
         CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt)
       }
 
       // check invalid args
-      CommandLineUtils.checkInvalidArgs(parser, options, configOpt, 
allTopicLevelOpts -- Set(alterOpt, createOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, 
allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, 
allTopicLevelOpts -- Set(alterOpt, createOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, 
allTopicLevelOpts -- Set(createOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, 
allTopicLevelOpts -- Set(createOpt,alterOpt))
-      if(options.has(createOpt))
-        CommandLineUtils.checkInvalidArgs(parser, options, 
replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, configOpt, 
(allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, 
(allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, 
(allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, 
(allTopicLevelOpts -- Set(createOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, 
(allTopicLevelOpts -- Set(createOpt,alterOpt)).asJava)
+      if (options.has(createOpt))
+        CommandLineUtils.checkInvalidArgs(parser, options, 
replicaAssignmentOpt, partitionsOpt, replicationFactorOpt)
       CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnderReplicatedPartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
+        (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt).asJava)
       CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnderMinIsrPartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt)
+        (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
       CommandLineUtils.checkInvalidArgs(parser, options, 
reportAtMinIsrPartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt)
+        (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
       CommandLineUtils.checkInvalidArgs(parser, options, 
reportUnavailablePartitionsOpt,
-        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
+        (allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - 
reportUnavailablePartitionsOpt + topicsWithOverridesOpt).asJava)
       CommandLineUtils.checkInvalidArgs(parser, options, 
topicsWithOverridesOpt,
-        allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
-      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, 
allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, 
allTopicLevelOpts -- Set(createOpt))
-      CommandLineUtils.checkInvalidArgs(parser, options, 
excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
+        (allTopicLevelOpts -- Set(describeOpt) ++ 
allReplicationReportOpts).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, 
(allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, 
(allTopicLevelOpts -- Set(createOpt)).asJava)
+      CommandLineUtils.checkInvalidArgs(parser, options, 
excludeInternalTopicOpt, (allTopicLevelOpts -- Set(listOpt, 
describeOpt)).asJava)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala 
b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index 126319503b5..7bc7339d548 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -19,11 +19,12 @@ package kafka.admin
 
 import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
 import kafka.server.KafkaConfig
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging, ToolsUtils}
 import kafka.utils.Implicits._
 import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, 
ZkSecurityMigratorUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
@@ -70,7 +71,7 @@ object ZkSecurityMigrator extends Logging {
     val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     val opts = new ZkSecurityMigratorOptions(args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage)
+    CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage)
 
     // Must have either SASL or TLS mutual authentication enabled to use this 
tool.
     // Instantiate the client config we will use so that we take into account 
config provided via the CLI option
@@ -99,7 +100,7 @@ object ZkSecurityMigrator extends Logging {
         info("zookeeper.acl option is unsecure")
         false
       case _ =>
-        CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
+        ToolsUtils.printUsageAndExit(opts.parser, usageMessage)
     }
     val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
     val zkSessionTimeout = 
opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a70ce920e8e..2774f13397a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
Deserializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 
@@ -293,7 +294,7 @@ object ConsoleConsumer extends Logging {
 
     options = tryParse(parser, args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read 
data from Kafka topics and outputs it to standard output.")
+    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read 
data from Kafka topics and outputs it to standard output.")
 
     var groupIdPassed = true
     val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
@@ -302,7 +303,7 @@ object ConsoleConsumer extends Logging {
     var topicArg: String = _
     var includedTopicsArg: String = _
     var filterSpec: TopicFilter = _
-    val extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
+    val extraConsumerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt))
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else
@@ -315,7 +316,7 @@ object ConsoleConsumer extends Logging {
       Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
     else
       new Properties()
-    formatterArgs ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
+    formatterArgs ++= 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if (options.has(maxMessagesOpt)) 
options.valueOf(maxMessagesOpt).intValue else -1
     val timeoutMs = if (options.has(timeoutMsOpt)) 
options.valueOf(timeoutMsOpt).intValue else -1
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
@@ -341,19 +342,19 @@ object ConsoleConsumer extends Logging {
     val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == 
null)
     // user need to specify value for either --topic or one of the include 
filters options (--include or --whitelist)
     if (topicOrFilterArgs.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, s"Exactly one of 
--include/--topic is required. " +
+      CommandLineUtils.printUsageAndExit(parser, s"Exactly one of 
--include/--topic is required. " +
         s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use 
--include instead; ignored if --include specified."}")
 
     if (partitionArg.isDefined) {
       if (!options.has(topicOpt))
-        CommandLineUtils.printUsageAndDie(parser, "The topic is required when 
partition is specified.")
+        CommandLineUtils.printUsageAndExit(parser, "The topic is required when 
partition is specified.")
       if (fromBeginning && options.has(offsetOpt))
-        CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and 
offset cannot be specified together.")
+        CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and 
offset cannot be specified together.")
     } else if (options.has(offsetOpt))
-      CommandLineUtils.printUsageAndDie(parser, "The partition is required 
when offset is specified.")
+      CommandLineUtils.printUsageAndExit(parser, "The partition is required 
when offset is specified.")
 
     def invalidOffset(offset: String): Nothing =
-      CommandLineUtils.printUsageAndDie(parser, s"The provided offset value 
'$offset' is incorrect. Valid values are " +
+      ToolsUtils.printUsageAndExit(parser, s"The provided offset value 
'$offset' is incorrect. Valid values are " +
         "'earliest', 'latest', or a non-negative long.")
 
     val offsetArg =
@@ -385,7 +386,7 @@ object ConsoleConsumer extends Logging {
     ).flatten
 
     if (groupIdsProvided.size > 1) {
-      CommandLineUtils.printUsageAndDie(parser, "The group ids provided in 
different places (directly using '--group', "
+      CommandLineUtils.printUsageAndExit(parser, "The group ids provided in 
different places (directly using '--group', "
         + "via '--consumer-property', or via '--consumer.config') do not 
match. "
         + s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", 
"'")}")
     }
@@ -403,14 +404,14 @@ object ConsoleConsumer extends Logging {
     }
 
     if (groupIdPassed && partitionArg.isDefined)
-      CommandLineUtils.printUsageAndDie(parser, "Options group and partition 
cannot be specified together.")
+      CommandLineUtils.printUsageAndExit(parser, "Options group and partition 
cannot be specified together.")
 
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
         parser.parse(args: _*)
       catch {
         case e: OptionException =>
-          CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+          ToolsUtils.printUsageAndExit(parser, e.getMessage)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 668a709d60e..245212471d0 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -24,14 +24,13 @@ import java.util.regex.Pattern
 import joptsimple.{OptionException, OptionParser, OptionSet}
 import kafka.common.MessageReader
 import kafka.utils.Implicits._
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.{Exit, ToolsUtils}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.utils.Utils
-
-import scala.jdk.CollectionConverters._
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 object ConsoleProducer {
 
@@ -260,7 +259,7 @@ object ConsoleProducer {
 
     options = tryParse(parser, args)
 
-    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read 
data from standard input and publish it to Kafka.")
+    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read 
data from standard input and publish it to Kafka.")
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
 
@@ -280,15 +279,15 @@ object ConsoleProducer {
                              else compressionCodecOptionValue
                            else CompressionType.NONE.name
     val readerClass = options.valueOf(messageReaderOpt)
-    val cmdLineProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
-    val extraProducerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
+    val cmdLineProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
+    val extraProducerProps = 
CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
 
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
         parser.parse(args: _*)
       catch {
         case e: OptionException =>
-          CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+          ToolsUtils.printUsageAndExit(parser, e.getMessage)
       }
     }
   }
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 89428e5663b..56f49456705 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -22,14 +22,14 @@ import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{Properties, Random}
-
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple.OptionException
-import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.utils.ToolsUtils
 import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, 
KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.server.util.CommandLineUtils
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -260,13 +260,13 @@ object ConsumerPerformance extends LazyLogging {
       options = parser.parse(args: _*)
     catch {
       case e: OptionException =>
-        CommandLineUtils.printUsageAndDie(parser, e.getMessage)
+        CommandLineUtils.printUsageAndExit(parser, e.getMessage)
     }
 
     if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
       println("WARNING: option [threads] and [num-fetch-threads] have been 
deprecated and will be ignored by the test")
 
-    CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in 
performance test for the full zookeeper consumer")
+    CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in 
performance test for the full zookeeper consumer")
 
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
numMessagesOpt)
 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index ec0fb7d2e73..09d66cb2197 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
 import org.apache.kafka.server.log.internals.{CorruptSnapshotException, 
OffsetIndex, TimeIndex, TransactionIndex}
 import org.apache.kafka.snapshot.Snapshots
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
@@ -46,7 +47,7 @@ object DumpLogSegments {
 
   def main(args: Array[String]): Unit = {
     val opts = new DumpLogSegmentsOptions(args)
-    CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse 
a log file and dump its contents to the console, useful for debugging a 
seemingly corrupt log segment.")
+    CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to parse a 
log file and dump its contents to the console, useful for debugging a seemingly 
corrupt log segment.")
     opts.checkArgs()
 
     val misMatchesForIndexFilesMap = mutable.Map[String, List[(Long, Long)]]()
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala 
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index a8fd87cbe87..379b92218c8 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -19,11 +19,12 @@
 package kafka.tools
 
 import joptsimple._
-import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
+import kafka.utils.{Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
ListTopicsOptions, OffsetSpec}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.util.CommandLineUtils
 
 import java.util.Properties
 import java.util.concurrent.ExecutionException
@@ -82,7 +83,7 @@ object GetOffsetShell {
     val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", 
s"By default, internal topics are included. If specified, internal topics are 
excluded.")
 
     if (args.isEmpty)
-      CommandLineUtils.printUsageAndDie(parser, "An interactive shell for 
getting topic-partition offsets.")
+      CommandLineUtils.printUsageAndExit(parser, "An interactive shell for 
getting topic-partition offsets.")
 
     val options = parser.parse(args : _*)
 
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala 
b/core/src/main/scala/kafka/tools/JmxTool.scala
index c0f6d4a5ead..223c459bcc2 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -23,13 +23,13 @@ import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
 import javax.rmi.ssl.SslRMIClientSocketFactory
-
 import joptsimple.OptionParser
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 import scala.math._
-import kafka.utils.{CommandLineUtils, Exit, Logging}
+import kafka.utils.{Exit, Logging}
+import org.apache.kafka.server.util.CommandLineUtils
 
 
 /**
@@ -99,7 +99,7 @@ object JmxTool extends Logging {
 
 
     if(args.isEmpty)
-      CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard 
output.")
+      CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard 
output.")
 
     val options = parser.parse(args : _*)
 
@@ -207,7 +207,7 @@ object JmxTool extends Logging {
       }
 
     if(numExpectedAttributes.isEmpty) {
-      CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for 
the queried objects $queries.")
+      CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for 
the queried objects $queries.")
     }
 
     // print csv header
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 75c2d144b02..067e8aee84c 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, 
AtomicInteger}
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
 import java.util.{Collections, Properties}
-
 import kafka.consumer.BaseConsumerRecord
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
@@ -35,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.HashMap
@@ -86,7 +86,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     info("Starting mirror maker")
     try {
       val opts = new MirrorMakerOptions(args)
-      CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to 
continuously copy data between two Kafka clusters.")
+      CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
continuously copy data between two Kafka clusters.")
       opts.checkArgs()
     } catch {
       case ct: ControlThrowable => throw ct
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala 
b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 836163c85fa..6857e401e80 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -17,8 +17,7 @@
 
 package kafka.tools
 
-import kafka.utils.CommandDefaultOptions
-
+import org.apache.kafka.server.util.CommandDefaultOptions
 
 class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
   val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of 
messages to send or consume")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 599384530b7..746cd1410fe 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, 
FetchResponse, ListOffsetsRequest}
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.server.util.CommandLineUtils
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 
@@ -114,11 +115,11 @@ object ReplicaVerificationTool extends Logging {
     val options = parser.parse(args: _*)
 
     if (args.isEmpty || options.has(helpOpt)) {
-      CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas 
for a set of topics have the same data.")
+      CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas 
for a set of topics have the same data.")
     }
 
     if (options.has(versionOpt)) {
-      CommandLineUtils.printVersionAndDie()
+      CommandLineUtils.printVersionAndExit()
     }
     CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)
 
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala 
b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 32e43a6606d..8986d8c8ff0 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -23,12 +23,12 @@ import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
+import kafka.utils.{CoreUtils, Exit, Logging}
 
-import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
 import java.io.{BufferedOutputStream, OutputStream}
 import java.nio.charset.StandardCharsets
-
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.server.util.CommandLineUtils
 
 /**
  * A utility that merges the state change logs (possibly obtained from 
different brokers and over multiple days).
@@ -89,7 +89,7 @@ object StateChangeLogMerger extends Logging {
                               .defaultsTo("9999-12-31 23:59:59,999")
                               
     if(args.isEmpty)
-      CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log 
files from several brokers to reconnstruct a unified history of what happened.")
+      CommandLineUtils.printUsageAndExit(parser, "A tool for merging the log 
files from several brokers to reconnstruct a unified history of what happened.")
 
 
     val options = parser.parse(args : _*)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 99c0f789cd9..1b52b2d2ec0 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -21,7 +21,6 @@ import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import joptsimple.OptionSpecBuilder;
-import kafka.utils.CommandLineUtils;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
@@ -39,7 +38,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
-import scala.collection.JavaConverters;
+import org.apache.kafka.server.util.CommandLineUtils;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -282,13 +281,13 @@ public class StreamsResetter {
         try {
             options = optionParser.parse(args);
             if (args.length == 0 || options.has(helpOption)) {
-                CommandLineUtils.printUsageAndDie(optionParser, USAGE);
+                CommandLineUtils.printUsageAndExit(optionParser, USAGE);
             }
             if (options.has(versionOption)) {
-                CommandLineUtils.printVersionAndDie();
+                CommandLineUtils.printVersionAndExit();
             }
         } catch (final OptionException e) {
-            CommandLineUtils.printUsageAndDie(optionParser, e.getMessage());
+            CommandLineUtils.printUsageAndExit(optionParser, e.getMessage());
         }
 
         final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
@@ -319,7 +318,7 @@ public class StreamsResetter {
             optionParser,
             options,
             option,
-            JavaConverters.asScalaSetConverter(invalidOptions).asScala());
+            invalidOptions);
     }
 
     private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final 
Map<Object, Object> consumerConfig,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 47ba5a747e1..0c31b4187a0 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -24,7 +24,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.{KafkaRaftManager, RaftManager}
 import kafka.security.CredentialProvider
 import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, 
SimpleApiVersionManager}
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, 
Logging, ShutdownableThread}
+import kafka.utils.{CoreUtils, Exit, Logging, ShutdownableThread}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
@@ -39,6 +39,7 @@ import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, 
RaftConfig}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.fault.ProcessExitingFaultHandler
+import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
 import org.apache.kafka.snapshot.SnapshotReader
 
 import scala.jdk.CollectionConverters._
@@ -435,7 +436,7 @@ object TestRaftServer extends Logging {
   def main(args: Array[String]): Unit = {
     val opts = new TestRaftServerOptions(args)
     try {
-      CommandLineUtils.printHelpAndExitIfNeeded(opts,
+      CommandLineUtils.maybePrintHelpOrVersion(opts,
         "Standalone raft server for performance testing")
 
       val configFile = opts.options.valueOf(opts.configOpt)
@@ -460,7 +461,7 @@ object TestRaftServer extends Logging {
       Exit.exit(0)
     } catch {
       case e: OptionException =>
-        CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
+        CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
       case e: Throwable =>
         fatal("Exiting raft server due to fatal exception", e)
         Exit.exit(1)
diff --git a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala 
b/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
deleted file mode 100644
index 2cdb408b4bb..00000000000
--- a/core/src/main/scala/kafka/utils/CommandDefaultOptions.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.utils
-
-import joptsimple.{OptionParser, OptionSet}
-
-abstract class CommandDefaultOptions(val args: Array[String], 
allowCommandOptionAbbreviation: Boolean = false) {
-  val parser = new OptionParser(allowCommandOptionAbbreviation)
-  val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
-  val versionOpt = parser.accepts("version", "Display Kafka 
version.").forHelp()
-  var options: OptionSet = _
-}
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala 
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
deleted file mode 100644
index e9dcee0eb18..00000000000
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.utils
-
-import java.util.Properties
-
-import joptsimple.{OptionParser, OptionSet, OptionSpec}
-
-import scala.collection.Set
-
-/**
-  * Helper functions for dealing with command line utilities
-  */
-object CommandLineUtils extends Logging {
-  /**
-    * Check if there are no options or `--help` option from command line
-    *
-    * @param commandOpts Acceptable options for a command
-    * @return true on matching the help check condition
-    */
-  def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
-    commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt)
-  }
-
-  def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = {
-    commandOpts.options.has(commandOpts.versionOpt)
-  }
-
-  /**
-    * Check and print help message if there is no options or `--help` option
-    * from command line, if `--version` is specified on the command line
-    * print version information and exit.
-    * NOTE: The function name is not strictly speaking correct anymore
-    * as it also checks whether the version needs to be printed, but
-    * refactoring this would have meant changing all command line tools
-    * and unnecessarily increased the blast radius of this change.
-    *
-    * @param commandOpts Acceptable options for a command
-    * @param message     Message to display on successful check
-    */
-  def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: 
String): Unit = {
-    if (isPrintHelpNeeded(commandOpts))
-      printUsageAndDie(commandOpts.parser, message)
-    if (isPrintVersionNeeded(commandOpts))
-      printVersionAndDie()
-  }
-
-  /**
-   * Check that all the listed options are present
-   */
-  def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: 
OptionSpec[_]*): Unit = {
-    for (arg <- required) {
-      if (!options.has(arg))
-        printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
-    }
-  }
-
-  /**
-   * Check that none of the listed options are present
-   */
-  def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: 
OptionSpec[_], invalidOptions: Set[OptionSpec[_]]): Unit = {
-    if (options.has(usedOption)) {
-      for (arg <- invalidOptions) {
-        if (options.has(arg))
-          printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be 
used with option \"" + arg + "\"")
-      }
-    }
-  }
-
-  /**
-    * Check that none of the listed options are present with the combination 
of used options
-    */
-  def checkInvalidArgsSet(parser: OptionParser, options: OptionSet, 
usedOptions: Set[OptionSpec[_]], invalidOptions: Set[OptionSpec[_]],
-                         trailingAdditionalMessage: Option[String] = None): 
Unit = {
-    if (usedOptions.count(options.has) == usedOptions.size) {
-      for (arg <- invalidOptions) {
-        if (options.has(arg))
-          printUsageAndDie(parser, "Option combination \"" + 
usedOptions.mkString(",") + "\" can't be used with option \"" + arg + "\"" + 
trailingAdditionalMessage.getOrElse(""))
-      }
-    }
-  }
-
-  /**
-   * Print usage and exit
-   */
-  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
-    System.err.println(message)
-    parser.printHelpOn(System.err)
-    Exit.exit(1, Some(message))
-  }
-
-  def printVersionAndDie(): Nothing = {
-    System.out.println(VersionInfo.getVersionString)
-    Exit.exit(0)
-  }
-
-  /**
-   * Parse key-value pairs in the form key=value
-   * value may contain equals sign
-   */
-  def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = 
true): Properties = {
-    val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty)
-
-    val props = new Properties
-    for (a <- splits) {
-      if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
-        if (acceptMissingValue) props.put(a(0), "")
-        else throw new IllegalArgumentException(s"Missing value for key 
${a(0)}")
-      }
-      else props.put(a(0), a(1))
-    }
-    props
-  }
-
-  /**
-    * Merge the options into {@code props} for key {@code key}, with the 
following precedence, from high to low:
-    * 1) if {@code spec} is specified on {@code options} explicitly, use the 
value;
-    * 2) if {@code props} already has {@code key} set, keep it;
-    * 3) otherwise, use the default value of {@code spec}.
-    * A {@code null} value means to remove {@code key} from the {@code props}.
-    */
-  def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet, 
spec: OptionSpec[V]): Unit = {
-    if (options.has(spec) || !props.containsKey(key)) {
-      val value = options.valueOf(spec)
-      if (value == null)
-        props.remove(key)
-      else
-        props.put(key, value.toString)
-    }
-  }
-}
diff --git a/core/src/main/scala/kafka/utils/ToolsUtils.scala 
b/core/src/main/scala/kafka/utils/ToolsUtils.scala
index 056545cb031..10586317f65 100644
--- a/core/src/main/scala/kafka/utils/ToolsUtils.scala
+++ b/core/src/main/scala/kafka/utils/ToolsUtils.scala
@@ -18,6 +18,7 @@ package kafka.utils
 
 import joptsimple.OptionParser
 import org.apache.kafka.common.{Metric, MetricName}
+import org.apache.kafka.server.util.CommandLineUtils
 
 import scala.collection.mutable
 
@@ -32,8 +33,8 @@ object ToolsUtils {
       org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
     }
     val isValid = !validHostPort.isEmpty && validHostPort.size == 
hostPorts.length
-    if(!isValid)
-      CommandLineUtils.printUsageAndDie(parser, "Please provide valid 
host:port like host1:9091,host2:9092\n ")
+    if (!isValid)
+      CommandLineUtils.printUsageAndExit(parser, "Please provide valid 
host:port like host1:9091,host2:9092\n ")
   }
 
   /**
@@ -64,4 +65,18 @@ object ToolsUtils {
         println(s"%-${maxLengthOfDisplayName}s : 
$specifier".format(metricName, value))
     }
   }
+
+  /**
+   * This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
+   * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
+   * Can be removed once [[kafka.admin.ConsumerGroupCommand]], 
[[kafka.tools.ConsoleConsumer]]
+   * and [[kafka.tools.ConsoleProducer]] are migrated.
+   *
+   * @param parser Command line options parser.
+   * @param message Error message.
+   */
+  def printUsageAndExit(parser: OptionParser, message: String): Nothing = {
+    CommandLineUtils.printUsageAndExit(parser, message)
+    throw new AssertionError("printUsageAndExit should not return, but it 
did.")
+  }
 }
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala 
b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
index 1b3b9f5b18f..141a68396dd 100755
--- a/core/src/test/scala/kafka/tools/LogCompactionTester.scala
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, 
ProducerConfig, Produce
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.serialization.{ByteArraySerializer, 
StringDeserializer}
 import org.apache.kafka.common.utils.{AbstractIterator, Utils}
+import org.apache.kafka.server.util.CommandLineUtils
 
 import scala.jdk.CollectionConverters._
 
@@ -98,7 +99,7 @@ object LogCompactionTester {
     val options = parser.parse(args: _*)
 
     if (args.isEmpty)
-      CommandLineUtils.printUsageAndDie(parser, "A tool to test log 
compaction. Valid options are: ")
+      CommandLineUtils.printUsageAndExit(parser, "A tool to test log 
compaction. Valid options are: ")
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, 
numMessagesOpt)
 
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 
b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 4b930374e38..7cc7d5254e4 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel}
 import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
+import org.apache.kafka.server.util.CommandLineUtils
 
 import scala.math._
 
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala 
b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index 507c3ff0973..edb75620eeb 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -26,6 +26,7 @@ import joptsimple._
 import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
 import kafka.utils._
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.util.CommandLineUtils
 
 import scala.math._
 import scala.jdk.CollectionConverters._
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index 13fc262ec4e..98d54629f20 100644
--- 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++ 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -216,7 +216,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldPrintHelpTextIfHelpArg(): Unit = {
     val args: Array[String]= Array("--help")
-    // note, this is not actually a failed case, it's just we share the same 
`printUsageAndDie` method when wrong arg received
+    // note, this is not actually a failed case, it's just we share the same 
`printUsageAndExit` method when wrong arg received
     shouldFailWith(ReassignPartitionsCommand.helpText, args)
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
deleted file mode 100644
index 8b528ed179b..00000000000
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import java.util.Properties
-
-import joptsimple.{OptionParser, OptionSpec}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-class CommandLineUtilsTest {
-
-
-  @Test
-  def testParseEmptyArg(): Unit = {
-    val argArray = Array("my.empty.property=")
-
-    assertThrows(classOf[java.lang.IllegalArgumentException], () => 
CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
-  }
-
-  @Test
-  def testParseEmptyArgWithNoDelimiter(): Unit = {
-    val argArray = Array("my.empty.property")
-
-    assertThrows(classOf[java.lang.IllegalArgumentException], () => 
CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
-  }
-
-  @Test
-  def testParseEmptyArgAsValid(): Unit = {
-    val argArray = Array("my.empty.property=", "my.empty.property1")
-    val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
-    assertEquals(props.getProperty("my.empty.property"), "", "Value of a key 
with missing value should be an empty string")
-    assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key 
with missing value with no delimiter should be an empty string")
-  }
-
-  @Test
-  def testParseSingleArg(): Unit = {
-    val argArray = Array("my.property=value")
-    val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
-    assertEquals(props.getProperty("my.property"), "value", "Value of a single 
property should be 'value' ")
-  }
-
-  @Test
-  def testParseArgs(): Unit = {
-    val argArray = Array("first.property=first","second.property=second")
-    val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
-    assertEquals(props.getProperty("first.property"), "first", "Value of first 
property should be 'first'")
-    assertEquals(props.getProperty("second.property"), "second", "Value of 
second property should be 'second'")
-  }
-
-  @Test
-  def testParseArgsWithMultipleDelimiters(): Unit = {
-    val argArray = Array("first.property==first", "second.property=second=", 
"third.property=thi=rd")
-    val props = CommandLineUtils.parseKeyValueArgs(argArray)
-
-    assertEquals(props.getProperty("first.property"), "=first", "Value of 
first property should be '=first'")
-    assertEquals(props.getProperty("second.property"), "second=", "Value of 
second property should be 'second='")
-    assertEquals(props.getProperty("third.property"), "thi=rd", "Value of 
second property should be 'thi=rd'")
-  }
-
-  val props = new Properties()
-  val parser = new OptionParser(false)
-  var stringOpt : OptionSpec[String] = _
-  var intOpt : OptionSpec[java.lang.Integer] = _
-  var stringOptOptionalArg : OptionSpec[String] = _
-  var intOptOptionalArg : OptionSpec[java.lang.Integer] = _
-  var stringOptOptionalArgNoDefault : OptionSpec[String] = _
-  var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _
-
-  def setUpOptions(): Unit = {
-    stringOpt = parser.accepts("str")
-      .withRequiredArg
-      .ofType(classOf[String])
-      .defaultsTo("default-string")
-    intOpt = parser.accepts("int")
-      .withRequiredArg()
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(100)
-    stringOptOptionalArg = parser.accepts("str-opt")
-      .withOptionalArg
-      .ofType(classOf[String])
-      .defaultsTo("default-string-2")
-    intOptOptionalArg = parser.accepts("int-opt")
-      .withOptionalArg
-      .ofType(classOf[java.lang.Integer])
-      .defaultsTo(200)
-    stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
-      .withOptionalArg
-      .ofType(classOf[String])
-    intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
-      .withOptionalArg
-      .ofType(classOf[java.lang.Integer])
-  }
-
-  @Test
-  def testMaybeMergeOptionsOverwriteExisting(): Unit = {
-    setUpOptions()
-
-    props.put("skey", "existing-string")
-    props.put("ikey", "300")
-    props.put("sokey", "existing-string-2")
-    props.put("iokey", "400")
-    props.put("sondkey", "existing-string-3")
-    props.put("iondkey", "500")
-
-    val options = parser.parse(
-      "--str", "some-string",
-      "--int", "600",
-      "--str-opt", "some-string-2",
-      "--int-opt", "700",
-      "--str-opt-nodef", "some-string-3",
-      "--int-opt-nodef", "800"
-    )
-
-    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
-    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
-    CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault)
-    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault)
-
-    assertEquals("some-string", props.get("skey"))
-    assertEquals("600", props.get("ikey"))
-    assertEquals("some-string-2", props.get("sokey"))
-    assertEquals("700", props.get("iokey"))
-    assertEquals("some-string-3", props.get("sondkey"))
-    assertEquals("800", props.get("iondkey"))
-  }
-
-  @Test
-  def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = {
-    setUpOptions()
-
-    props.put("sokey", "existing-string")
-    props.put("iokey", "300")
-    props.put("sondkey", "existing-string-2")
-    props.put("iondkey", "400")
-
-    val options = parser.parse(
-      "--str-opt",
-      "--int-opt",
-      "--str-opt-nodef",
-      "--int-opt-nodef"
-    )
-
-    CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault)
-    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault)
-
-    assertEquals("default-string-2", props.get("sokey"))
-    assertEquals("200", props.get("iokey"))
-    assertNull(props.get("sondkey"))
-    assertNull(props.get("iondkey"))
-  }
-
-  @Test
-  def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = {
-    setUpOptions()
-
-    val options = parser.parse()
-
-    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
-    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
-    CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault)
-    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault)
-
-    assertEquals("default-string", props.get("skey"))
-    assertEquals("100", props.get("ikey"))
-    assertEquals("default-string-2", props.get("sokey"))
-    assertEquals("200", props.get("iokey"))
-    assertNull(props.get("sondkey"))
-    assertNull(props.get("iondkey"))
-  }
-
-  @Test
-  def testMaybeMergeOptionsNotOverwriteExisting(): Unit = {
-    setUpOptions()
-
-    props.put("skey", "existing-string")
-    props.put("ikey", "300")
-    props.put("sokey", "existing-string-2")
-    props.put("iokey", "400")
-    props.put("sondkey", "existing-string-3")
-    props.put("iondkey", "500")
-
-    val options = parser.parse()
-
-    CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
-    CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
-    CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg)
-    CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault)
-    CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault)
-
-    assertEquals("existing-string", props.get("skey"))
-    assertEquals("300", props.get("ikey"))
-    assertEquals("existing-string-2", props.get("sokey"))
-    assertEquals("400", props.get("iokey"))
-    assertEquals("existing-string-3", props.get("sondkey"))
-    assertEquals("500", props.get("iondkey"))
-  }
-}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
new file mode 100644
index 00000000000..10bbd0becdd
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandDefaultOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+
+public abstract class CommandDefaultOptions {
+    public final String[] args;
+    public final OptionParser parser;
+    public final AbstractOptionSpec<Void> helpOpt;
+    public final AbstractOptionSpec<Void> versionOpt;
+    public OptionSet options;
+
+    public CommandDefaultOptions(String[] args) {
+        this(args, false);
+    }
+
+    public CommandDefaultOptions(String[] args, boolean 
allowCommandOptionAbbreviation) {
+        this.args = args;
+        this.parser = new OptionParser(allowCommandOptionAbbreviation);
+        this.helpOpt = parser.accepts("help", "Print usage 
information.").forHelp();
+        this.versionOpt = parser.accepts("version", "Display Kafka 
version.").forHelp();
+        this.options = null;
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
new file mode 100644
index 00000000000..0f904127dcd
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+    /**
+     * Check if there are no options or `--help` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the help check condition
+     */
+    public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+        return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+    }
+
+    /**
+     * Check if there is `--version` option from command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the help check condition
+     */
+    public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+        return commandOpts.options.has(commandOpts.versionOpt);
+    }
+
+    /**
+     * Check and print help message if there is no options or `--help` option
+     * from command line, if `--version` is specified on the command line
+     * print version information and exit.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @param message     Message to display on successful check
+     */
+    public static void maybePrintHelpOrVersion(CommandDefaultOptions 
commandOpts, String message) {
+        if (isPrintHelpNeeded(commandOpts)) {
+            printUsageAndExit(commandOpts.parser, message);
+        }
+        if (isPrintVersionNeeded(commandOpts)) {
+            printVersionAndExit();
+        }
+    }
+
+    /**
+     * Check that all the listed options are present.
+     */
+    public static void checkRequiredArgs(OptionParser parser, OptionSet 
options, OptionSpec<?>... requiredList) {
+        for (OptionSpec<?> arg : requiredList) {
+            if (!options.has(arg)) {
+                printUsageAndExit(parser, String.format("Missing required 
argument \"%s\"", arg));
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        OptionSpec<?>... invalidOptions) {
+        if (options.has(usedOption)) {
+            for (OptionSpec<?> arg : invalidOptions) {
+                if (options.has(arg)) {
+                    printUsageAndExit(parser, String.format("Option \"%s\" 
can't be used with option \"%s\"", usedOption, arg));
+                }
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        Set<OptionSpec<?>> invalidOptions) {
+        OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
+        invalidOptions.toArray(array);
+        checkInvalidArgs(parser, options, usedOption, array);
+    }
+
+    /**
+     * Check that none of the listed options are present with the combination 
of used options.
+     */
+    public static void checkInvalidArgsSet(OptionParser parser,
+                                           OptionSet options,
+                                           Set<OptionSpec<?>> usedOptions,
+                                           Set<OptionSpec<?>> invalidOptions,
+                                           Optional<String> 
trailingAdditionalMessage) {
+        if (usedOptions.stream().filter(options::has).count() == 
usedOptions.size()) {
+            for (OptionSpec<?> arg : invalidOptions) {
+                if (options.has(arg)) {
+                    printUsageAndExit(parser, String.format("Option 
combination \"%s\" can't be used with option \"%s\"%s",
+                            usedOptions, arg, 
trailingAdditionalMessage.orElse("")));
+                }
+            }
+        }
+    }
+
+    public static void printUsageAndExit(OptionParser parser, String message) {
+        System.err.println(message);
+        try {
+            parser.printHelpOn(System.err);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        Exit.exit(1, message);
+    }
+
+    public static void printVersionAndExit() {
+        System.out.println(AppInfoParser.getVersion());
+        Exit.exit(0);
+    }
+
+    /**
+     * Parse key-value pairs in the form key=value.
+     * Value may contain equals sign.
+     */
+    public static Properties parseKeyValueArgs(List<String> args) {
+        return parseKeyValueArgs(args, true);
+    }
+
+    /**
+     * Parse key-value pairs in the form key=value.
+     * Value may contain equals sign.
+     */
+    public static Properties parseKeyValueArgs(List<String> args, boolean 
acceptMissingValue) {
+        Properties props = new Properties();
+        List<String[]> splits = new ArrayList<>();
+        args.forEach(arg -> {
+            String[] split = arg.split("=", 2);
+            if (split.length > 0) {
+                splits.add(split);
+            }
+        });
+        splits.forEach(split -> {
+            if (split.length == 1 || (split.length == 2 && (split[1] == null 
|| split[1].isEmpty()))) {
+                if (acceptMissingValue) {
+                    props.put(split[0], "");
+                } else {
+                    throw new IllegalArgumentException(String.format("Missing 
value for key %s}", split[0]));
+                }
+            } else {
+                props.put(split[0], split[1]);
+            }
+        });
+        return props;
+    }
+
+    /**
+     * Merge the options into {@code props} for key {@code key}, with the 
following precedence, from high to low:
+     * 1) if {@code spec} is specified on {@code options} explicitly, use the 
value;
+     * 2) if {@code props} already has {@code key} set, keep it;
+     * 3) otherwise, use the default value of {@code spec}.
+     * A {@code null} value means to remove {@code key} from the {@code props}.
+     */
+    public static <T> void maybeMergeOptions(Properties props, String key, 
OptionSet options, OptionSpec<T> spec) {
+        if (options.has(spec) || !props.containsKey(key)) {
+            T value = options.valueOf(spec);
+            if (value == null) {
+                props.remove(key);
+            } else {
+                props.put(key, value.toString());
+            }
+        }
+    }
+}
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
new file mode 100644
index 00000000000..4d12122e3ae
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CommandLineUtilsTest {
+    @Test
+    public void testParseEmptyArg() {
+        List<String> argArray = Arrays.asList("my.empty.property=");
+
+        assertThrows(IllegalArgumentException.class, () -> 
CommandLineUtils.parseKeyValueArgs(argArray, false));
+    }
+
+    @Test
+    public void testParseEmptyArgWithNoDelimiter() {
+        List<String> argArray = Arrays.asList("my.empty.property");
+
+        assertThrows(IllegalArgumentException.class, () -> 
CommandLineUtils.parseKeyValueArgs(argArray, false));
+    }
+
+    @Test
+    public void testParseEmptyArgAsValid() {
+        List<String> argArray = Arrays.asList("my.empty.property=", 
"my.empty.property1");
+        Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+        assertEquals(props.getProperty("my.empty.property"), "", "Value of a 
key with missing value should be an empty string");
+        assertEquals(props.getProperty("my.empty.property1"), "", "Value of a 
key with missing value with no delimiter should be an empty string");
+    }
+
+    @Test
+    public void testParseSingleArg() {
+        List<String> argArray = Arrays.asList("my.property=value");
+        Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+        assertEquals(props.getProperty("my.property"), "value", "Value of a 
single property should be 'value'");
+    }
+
+    @Test
+    public void testParseArgs() {
+        List<String> argArray = Arrays.asList("first.property=first", 
"second.property=second");
+        Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+        assertEquals(props.getProperty("first.property"), "first", "Value of 
first property should be 'first'");
+        assertEquals(props.getProperty("second.property"), "second", "Value of 
second property should be 'second'");
+    }
+
+    @Test
+    public void testParseArgsWithMultipleDelimiters() {
+        List<String> argArray = Arrays.asList("first.property==first", 
"second.property=second=", "third.property=thi=rd");
+        Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
+
+        assertEquals(props.getProperty("first.property"), "=first", "Value of 
first property should be '=first'");
+        assertEquals(props.getProperty("second.property"), "second=", "Value 
of second property should be 'second='");
+        assertEquals(props.getProperty("third.property"), "thi=rd", "Value of 
second property should be 'thi=rd'");
+    }
+
+    Properties props = new Properties();
+    OptionParser parser = new OptionParser(false);
+    OptionSpec<String> stringOpt;
+    OptionSpec<Integer> intOpt;
+    OptionSpec<String> stringOptOptionalArg;
+    OptionSpec<Integer> intOptOptionalArg;
+    OptionSpec<String> stringOptOptionalArgNoDefault;
+    OptionSpec<Integer> intOptOptionalArgNoDefault;
+
+    private void setUpOptions() {
+        stringOpt = parser.accepts("str")
+                .withRequiredArg()
+                .ofType(String.class)
+                .defaultsTo("default-string");
+        intOpt = parser.accepts("int")
+                .withRequiredArg()
+                .ofType(Integer.class)
+                .defaultsTo(100);
+        stringOptOptionalArg = parser.accepts("str-opt")
+                .withOptionalArg()
+                .ofType(String.class)
+                .defaultsTo("default-string-2");
+        intOptOptionalArg = parser.accepts("int-opt")
+                .withOptionalArg()
+                .ofType(Integer.class)
+                .defaultsTo(200);
+        stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
+                .withOptionalArg()
+                .ofType(String.class);
+        intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
+                .withOptionalArg()
+                .ofType(Integer.class);
+    }
+
+    @Test
+    public void testMaybeMergeOptionsOverwriteExisting() {
+        setUpOptions();
+
+        props.put("skey", "existing-string");
+        props.put("ikey", "300");
+        props.put("sokey", "existing-string-2");
+        props.put("iokey", "400");
+        props.put("sondkey", "existing-string-3");
+        props.put("iondkey", "500");
+
+        OptionSet options = parser.parse(
+                "--str", "some-string",
+                "--int", "600",
+                "--str-opt", "some-string-2",
+                "--int-opt", "700",
+                "--str-opt-nodef", "some-string-3",
+                "--int-opt-nodef", "800"
+        );
+
+        CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+        CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+        CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault);
+        CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault);
+
+        assertEquals("some-string", props.get("skey"));
+        assertEquals("600", props.get("ikey"));
+        assertEquals("some-string-2", props.get("sokey"));
+        assertEquals("700", props.get("iokey"));
+        assertEquals("some-string-3", props.get("sondkey"));
+        assertEquals("800", props.get("iondkey"));
+    }
+
+    @Test
+    public void testMaybeMergeOptionsDefaultOverwriteExisting() {
+        setUpOptions();
+
+        props.put("sokey", "existing-string");
+        props.put("iokey", "300");
+        props.put("sondkey", "existing-string-2");
+        props.put("iondkey", "400");
+
+        OptionSet options = parser.parse(
+                "--str-opt",
+                "--int-opt",
+                "--str-opt-nodef",
+                "--int-opt-nodef"
+        );
+
+        CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault);
+        CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault);
+
+        assertEquals("default-string-2", props.get("sokey"));
+        assertEquals("200", props.get("iokey"));
+        assertNull(props.get("sondkey"));
+        assertNull(props.get("iondkey"));
+    }
+
+    @Test
+    public void testMaybeMergeOptionsDefaultValueIfNotExist() {
+        setUpOptions();
+
+        OptionSet options = parser.parse();
+
+        CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+        CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+        CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault);
+        CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault);
+
+        assertEquals("default-string", props.get("skey"));
+        assertEquals("100", props.get("ikey"));
+        assertEquals("default-string-2", props.get("sokey"));
+        assertEquals("200", props.get("iokey"));
+        assertNull(props.get("sondkey"));
+        assertNull(props.get("iondkey"));
+    }
+
+    @Test
+    public void testMaybeMergeOptionsNotOverwriteExisting() {
+        setUpOptions();
+
+        props.put("skey", "existing-string");
+        props.put("ikey", "300");
+        props.put("sokey", "existing-string-2");
+        props.put("iokey", "400");
+        props.put("sondkey", "existing-string-3");
+        props.put("iondkey", "500");
+
+        OptionSet options = parser.parse();
+
+        CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
+        CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
+        CommandLineUtils.maybeMergeOptions(props, "sokey", options, 
stringOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "iokey", options, 
intOptOptionalArg);
+        CommandLineUtils.maybeMergeOptions(props, "sondkey", options, 
stringOptOptionalArgNoDefault);
+        CommandLineUtils.maybeMergeOptions(props, "iondkey", options, 
intOptOptionalArgNoDefault);
+
+        assertEquals("existing-string", props.get("skey"));
+        assertEquals("300", props.get("ikey"));
+        assertEquals("existing-string-2", props.get("sokey"));
+        assertEquals("400", props.get("iokey"));
+        assertEquals("existing-string-3", props.get("sondkey"));
+        assertEquals("500", props.get("iondkey"));
+    }
+}

Reply via email to