This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new be43e23 KAFKA-7147; ReassignPartitionsCommand should be able to
connect to broker over SSL
be43e23 is described below
commit be43e2330e85d56d6aebf756b83b5ad43d0d4da4
Author: Dong Lin <[email protected]>
AuthorDate: Fri Aug 10 14:24:27 2018 -0700
KAFKA-7147; ReassignPartitionsCommand should be able to connect to broker
over SSL
Author: Dong Lin <[email protected]>
Reviewers: Andras Beni <[email protected]>, Manikumar Reddy O
<[email protected]>, Sriharsha Chintalapani <[email protected]>
Closes #5355 from lindong28/KAFKA-7147
---
core/src/main/scala/kafka/admin/LogDirsCommand.scala | 12 ++++++++++--
.../main/scala/kafka/admin/ReassignPartitionsCommand.scala | 11 +++++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 9257942..b51f25d 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import scala.collection.Map
import kafka.utils.{CommandLineUtils, Json}
import joptsimple._
+import org.apache.kafka.common.utils.Utils
/**
* A command for querying log directory usage on the specified brokers
@@ -83,9 +84,12 @@ object LogDirsCommand {
}
private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient =
{
- val props = new Properties()
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
- props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
+ props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
JAdminClient.create(props)
}
@@ -95,6 +99,10 @@ object LogDirsCommand {
.withRequiredArg
.describedAs("The server(s) to use for bootstrapping")
.ofType(classOf[String])
+ val commandConfigOpt = parser.accepts("command-config", "Property file
containing configs to be passed to Admin Client.")
+ .withRequiredArg
+ .describedAs("Admin client property file")
+ .ofType(classOf[String])
val describeOpt = parser.accepts("describe", "Describe the specified
log directories on the specified brokers.")
val topicListOpt = parser.accepts("topic-list", "The list of topics to
be queried in the form \"topic1,topic2,topic3\". " +
"All topics will be queried if no topic list is specified")
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 041375a..61c9643 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -72,9 +72,12 @@ object ReassignPartitionsCommand extends Logging {
private def createAdminClient(opts: ReassignPartitionsCommandOptions):
Option[JAdminClient] = {
if (opts.options.has(opts.bootstrapServerOpt)) {
- val props = new Properties()
+ val props = if (opts.options.has(opts.commandConfigOpt))
+ Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+ else
+ new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
opts.options.valueOf(opts.bootstrapServerOpt))
- props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
+ props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG,
"reassign-partitions-tool")
Some(JAdminClient.create(props))
} else {
None
@@ -448,6 +451,10 @@ object ReassignPartitionsCommand extends Logging {
.withRequiredArg
.describedAs("Server(s) to use for bootstrapping")
.ofType(classOf[String])
+ val commandConfigOpt = parser.accepts("command-config", "Property file
containing configs to be passed to Admin Client.")
+ .withRequiredArg
+ .describedAs("Admin client property file")
+ .ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection
string for the zookeeper connection in the " +
"form host:port. Multiple URLS can be given to allow
fail-over.")
.withRequiredArg