Repository: storm Updated Branches: refs/heads/master 2832aeb36 -> 80846773d
STORM-2226: Fix kafka spout offset lag ui for kerberized kafka Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4cec3bdd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4cec3bdd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4cec3bdd Branch: refs/heads/master Commit: 4cec3bdd073572fc41f9724c582baa3f769edddb Parents: 2832aeb Author: Priyank <ps...@hortonworks.com> Authored: Tue Nov 29 20:56:01 2016 -0800 Committer: Satish Duggana <sdugg...@hortonworks.com> Committed: Fri Dec 2 11:45:41 2016 +0530 ---------------------------------------------------------------------- bin/storm-kafka-monitor | 13 +++++++++++-- .../org/apache/storm/kafka/spout/KafkaSpout.java | 1 + .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 15 +++++++++++---- .../kafka/monitor/NewKafkaSpoutOffsetQuery.java | 7 ++++++- .../jvm/org/apache/storm/utils/TopologySpoutLag.java | 5 +++++ 5 files changed, 34 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4cec3bdd/bin/storm-kafka-monitor ---------------------------------------------------------------------- diff --git a/bin/storm-kafka-monitor b/bin/storm-kafka-monitor index bfe50b7..a51052d 100755 --- a/bin/storm-kafka-monitor +++ b/bin/storm-kafka-monitor @@ -32,12 +32,21 @@ done STORM_BIN_DIR=`dirname ${PRG}` export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd` +export STORM_CONF_DIR="${STORM_CONF_DIR:-$STORM_BASE_DIR/conf}" +if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then + . "${STORM_CONF_DIR}/storm-env.sh" +fi + +STORM_JAAS_CONF_PARAM="" +JAAS_FILE="${STORM_CONF_DIR}/storm_jaas.conf" +if [ -f $JAAS_FILE ]; then + STORM_JAAS_CONF_PARAM="-Djava.security.auth.login.config=${JAAS_FILE}" +fi # Which java to use if [ -z "$JAVA_HOME" ]; then JAVA="java" else JAVA="$JAVA_HOME/bin/java" fi - -exec "$JAVA" -cp "$STORM_BASE_DIR/toollib/*" org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@" +exec $JAVA $STORM_JAAS_CONF_PARAM -cp $STORM_BASE_DIR/toollib/storm-kafka-monitor-*.jar org.apache.storm.kafka.monitor.KafkaOffsetLagUtil "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4cec3bdd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 439492b..6528ae9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -429,6 +429,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); + configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol")); return configuration; } http://git-wip-us.apache.org/repos/asf/storm/blob/4cec3bdd/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java index f1bb6b5..aa1a728 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java @@ -69,6 +69,8 @@ public class KafkaOffsetLagUtil { private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node"; private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r"; private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node"; + private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s"; + private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol"; public static void main (String args[]) { try { @@ -81,9 +83,9 @@ public class KafkaOffsetLagUtil { } if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) { OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery; - if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) { - printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " is not accepted with option " + - OPTION_OLD_CONSUMER_LONG); + if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) { + printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " or " + OPTION_SECURITY_PROTOCOL_LONG + " is " + + "not accepted with option " + OPTION_OLD_CONSUMER_LONG); } if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) { printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " + @@ -121,6 +123,7 @@ public class KafkaOffsetLagUtil { } results = getOffsetLags(oldKafkaSpoutOffsetQuery); } else { + String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG); String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG, OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG}; for (String oldOption: oldSpoutOptions) { @@ -133,7 +136,7 @@ public class KafkaOffsetLagUtil { " is not specified"); } NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), - commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG)); + commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); results = getOffsetLags(newKafkaSpoutOffsetQuery); } @@ -191,6 +194,7 @@ public class KafkaOffsetLagUtil { " offsets without the topic and partition nodes"); options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " + "/brokers (applicable only for old kafka spout) "); + options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka"); return options; } @@ -210,6 +214,9 @@ public class KafkaOffsetLagUtil { props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) { + props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol()); + } List<TopicPartition> topicPartitionList = new ArrayList<>(); consumer = new KafkaConsumer<>(props); for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) { http://git-wip-us.apache.org/repos/asf/storm/blob/4cec3bdd/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java index 8e7354f..18bc0df 100644 --- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java +++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java @@ -24,11 +24,13 @@ public class NewKafkaSpoutOffsetQuery { private final String topics; // comma separated list of topics private final String consumerGroupId; // consumer group id for which the offset needs to be calculated private final String bootStrapBrokers; // bootstrap brokers + private final String securityProtocol; // security protocol to connect to kafka - public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId) { + public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol) { this.topics = topics; this.bootStrapBrokers = bootstrapBrokers; this.consumerGroupId = consumerGroupId; + this.securityProtocol = securityProtocol; } public String getTopics() { @@ -43,6 +45,9 @@ public class NewKafkaSpoutOffsetQuery { return this.consumerGroupId; } + public String getSecurityProtocol() { + return this.securityProtocol; + } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/storm/blob/4cec3bdd/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java index c9aa37b..bb327ee 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java @@ -91,6 +91,11 @@ public class TopologySpoutLag { commands.add((String)jsonConf.get(configKeyPrefix + "groupid")); commands.add("-b"); commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers")); + String securityProtocol = (String)jsonConf.get(configKeyPrefix + "security.protocol"); + if (securityProtocol != null && !securityProtocol.isEmpty()) { + commands.add("-s"); + commands.add(securityProtocol); + } return commands; }