This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new be43e23  KAFKA-7147; ReassignPartitionsCommand should be able to connect to broker over SSL
be43e23 is described below

commit be43e2330e85d56d6aebf756b83b5ad43d0d4da4
Author: Dong Lin <[email protected]>
AuthorDate: Fri Aug 10 14:24:27 2018 -0700

    KAFKA-7147; ReassignPartitionsCommand should be able to connect to broker over SSL
    
    Author: Dong Lin <[email protected]>
    
    Reviewers: Andras Beni <[email protected]>, Manikumar Reddy O <[email protected]>, Sriharsha Chintalapani <[email protected]>
    
    Closes #5355 from lindong28/KAFKA-7147
---
 core/src/main/scala/kafka/admin/LogDirsCommand.scala         | 12 ++++++++++--
 .../main/scala/kafka/admin/ReassignPartitionsCommand.scala   | 11 +++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
index 9257942..b51f25d 100644
--- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala
+++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 import scala.collection.Map
 import kafka.utils.{CommandLineUtils, Json}
 import joptsimple._
+import org.apache.kafka.common.utils.Utils
 
 /**
   * A command for querying log directory usage on the specified brokers
@@ -83,9 +84,12 @@ object LogDirsCommand {
     }
 
     private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = 
{
-        val props = new Properties()
+        val props = if (opts.options.has(opts.commandConfigOpt))
+            Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+        else
+            new Properties()
         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
-        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
+        props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool")
         JAdminClient.create(props)
     }
 
@@ -95,6 +99,10 @@ object LogDirsCommand {
           .withRequiredArg
           .describedAs("The server(s) to use for bootstrapping")
           .ofType(classOf[String])
+        val commandConfigOpt = parser.accepts("command-config", "Property file 
containing configs to be passed to Admin Client.")
+          .withRequiredArg
+          .describedAs("Admin client property file")
+          .ofType(classOf[String])
         val describeOpt = parser.accepts("describe", "Describe the specified 
log directories on the specified brokers.")
         val topicListOpt = parser.accepts("topic-list", "The list of topics to 
be queried in the form \"topic1,topic2,topic3\". " +
           "All topics will be queried if no topic list is specified")
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 041375a..61c9643 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -72,9 +72,12 @@ object ReassignPartitionsCommand extends Logging {
 
   private def createAdminClient(opts: ReassignPartitionsCommandOptions): 
Option[JAdminClient] = {
     if (opts.options.has(opts.bootstrapServerOpt)) {
-      val props = new Properties()
+      val props = if (opts.options.has(opts.commandConfigOpt))
+        Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+      else
+        new Properties()
       props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt))
-      props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool")
+      props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, 
"reassign-partitions-tool")
       Some(JAdminClient.create(props))
     } else {
       None
@@ -448,6 +451,10 @@ object ReassignPartitionsCommand extends Logging {
                       .withRequiredArg
                       .describedAs("Server(s) to use for bootstrapping")
                       .ofType(classOf[String])
+    val commandConfigOpt = parser.accepts("command-config", "Property file 
containing configs to be passed to Admin Client.")
+                      .withRequiredArg
+                      .describedAs("Admin client property file")
+                      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection 
string for the zookeeper connection in the " +
                       "form host:port. Multiple URLS can be given to allow 
fail-over.")
                       .withRequiredArg

Reply via email to