Repository: storm
Updated Branches:
  refs/heads/1.x-branch 81ad2d5bb -> df6f96ab8


STORM-2226: Fix kafka spout offset lag ui for kerberized kafka


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab9823b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab9823b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab9823b2

Branch: refs/heads/1.x-branch
Commit: ab9823b202443c659e6bf5cdfd113fa4961f9bc8
Parents: 81ad2d5
Author: Priyank <ps...@hortonworks.com>
Authored: Tue Nov 29 20:56:01 2016 -0800
Committer: Satish Duggana <sdugg...@hortonworks.com>
Committed: Fri Dec 2 12:00:26 2016 +0530

----------------------------------------------------------------------
 bin/storm-kafka-monitor                              | 13 +++++++++++--
 .../org/apache/storm/kafka/spout/KafkaSpout.java     |  1 +
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java      | 15 +++++++++++----
 .../kafka/monitor/NewKafkaSpoutOffsetQuery.java      |  7 ++++++-
 .../jvm/org/apache/storm/utils/TopologySpoutLag.java |  5 +++++
 5 files changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ab9823b2/bin/storm-kafka-monitor
----------------------------------------------------------------------
diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor
index bfe50b7..a51052d 100755
--- a/bin/storm-kafka-monitor
+++ b/bin/storm-kafka-monitor
@@ -32,12 +32,21 @@ done
 
 STORM_BIN_DIR=`dirname ${PRG}`
 export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd`
+export STORM_CONF_DIR="${STORM_CONF_DIR:-$STORM_BASE_DIR/conf}"
 
+if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then
+  . "${STORM_CONF_DIR}/storm-env.sh"
+fi
+
+STORM_JAAS_CONF_PARAM=""
+JAAS_FILE="${STORM_CONF_DIR}/storm_jaas.conf"
+if [ -f $JAAS_FILE ]; then
+  STORM_JAAS_CONF_PARAM="-Djava.security.auth.login.config=${JAAS_FILE}"
+fi
 # Which java to use
 if [ -z "$JAVA_HOME" ]; then
   JAVA="java"
 else
   JAVA="$JAVA_HOME/bin/java"
 fi
-
-exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/*" 
org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
+exec $JAVA $STORM_JAAS_CONF_PARAM -cp 
$STORM_BASE_DIR/toollib/storm-kafka-monitor-*.jar 
org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/ab9823b2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 6b9e1b0..e37d549 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -424,6 +424,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         configuration.put(configKeyPrefix + "groupid", 
kafkaSpoutConfig.getConsumerGroupId());
         configuration.put(configKeyPrefix + "bootstrap.servers", 
kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
+        configuration.put(configKeyPrefix + "security.protocol", 
kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
         return configuration;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ab9823b2/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 0c9e873..fa80da0 100644
--- 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -69,6 +69,8 @@ public class KafkaOffsetLagUtil {
     private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
     private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
     private static final String OPTION_ZK_BROKERS_ROOT_LONG = 
"zk-brokers-root-node";
+    private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
+    private static final String OPTION_SECURITY_PROTOCOL_LONG = 
"security-protocol";
 
     public static void main (String args[]) {
         try {
@@ -81,9 +83,9 @@ public class KafkaOffsetLagUtil {
             }
             if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
                 OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
-                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
-                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + 
OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " +
-                            OPTION_OLD_CONSUMER_LONG);
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || 
commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + 
OPTION_BOOTSTRAP_BROKERS_LONG + " or " + OPTION_SECURITY_PROTOCOL_LONG + " is " 
+
+                            "not accepted with option " + 
OPTION_OLD_CONSUMER_LONG);
                 }
                 if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || 
!commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
                     printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and 
" + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
@@ -121,6 +123,7 @@ public class KafkaOffsetLagUtil {
                 }
                 results = getOffsetLags(oldKafkaSpoutOffsetQuery);
             } else {
+                String securityProtocol = 
commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
                 String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, 
OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
                         OPTION_ZK_COMMITTED_NODE_LONG, 
OPTION_ZK_BROKERS_ROOT_LONG};
                 for (String oldOption: oldSpoutOptions) {
@@ -133,7 +136,7 @@ public class KafkaOffsetLagUtil {
                             " is not specified");
                 }
                 NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new 
NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
-                        
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), 
commandLine.getOptionValue(OPTION_GROUP_ID_LONG));
+                        
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), 
commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
                 results = getOffsetLags(newKafkaSpoutOffsetQuery);
             }
 
@@ -191,6 +194,7 @@ public class KafkaOffsetLagUtil {
                 " offsets without the topic and partition nodes");
         options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, 
OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker 
information e.g. " +
                 "/brokers (applicable only for old kafka spout) ");
+        options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, 
OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
         return options;
     }
 
@@ -210,6 +214,9 @@ public class KafkaOffsetLagUtil {
             props.put("session.timeout.ms", "30000");
             props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+            if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
+                props.put("security.protocol", 
newKafkaSpoutOffsetQuery.getSecurityProtocol());
+            }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);
             for (String topic: 
newKafkaSpoutOffsetQuery.getTopics().split(",")) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ab9823b2/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 8e7354f..18bc0df 100644
--- 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -24,11 +24,13 @@ public class NewKafkaSpoutOffsetQuery {
     private final String topics; // comma separated list of topics
     private final String consumerGroupId; // consumer group id for which the 
offset needs to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
+    private final String securityProtocol; // security protocol to connect to 
kafka
 
-    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, 
String consumerGroupId) {
+    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, 
String consumerGroupId, String securityProtocol) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
+        this.securityProtocol = securityProtocol;
     }
 
     public String getTopics() {
@@ -43,6 +45,9 @@ public class NewKafkaSpoutOffsetQuery {
         return this.consumerGroupId;
     }
 
+    public String getSecurityProtocol() {
+        return this.securityProtocol;
+    }
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/storm/blob/ab9823b2/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java 
b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index c9aa37b..bb327ee 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -91,6 +91,11 @@ public class TopologySpoutLag {
         commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
         commands.add("-b");
         commands.add((String)jsonConf.get(configKeyPrefix + 
"bootstrap.servers"));
+        String securityProtocol = (String)jsonConf.get(configKeyPrefix + 
"security.protocol");
+        if (securityProtocol != null && !securityProtocol.isEmpty()) {
+            commands.add("-s");
+            commands.add(securityProtocol);
+        }
         return commands;
     }
 

Reply via email to