Repository: incubator-eagle Updated Branches: refs/heads/develop cc1261553 -> 502c7e37f
EAGLE-438 Multiple Inputs for Hadoop JMX Collector Python Script. Make JMX collection applicable to all Hadoop components. https://issues.apache.org/jira/browse/EAGLE-438 Author: @peterkim95 <Peter Kim> Reviewer: Ralph Su Closes: #316 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/502c7e37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/502c7e37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/502c7e37 Branch: refs/heads/develop Commit: 502c7e37f842eabc4021c47053dd95bdd1c7efde Parents: cc12615 Author: yonzhang <[email protected]> Authored: Fri Aug 12 14:54:52 2016 -0700 Committer: yonzhang <[email protected]> Committed: Fri Aug 12 14:54:52 2016 -0700 ---------------------------------------------------------------------- eagle-external/hadoop_jmx_collector/README.md | 75 +++++++++++++------- .../hadoop_jmx_collector/config-sample.json | 33 ++++++--- .../hadoop_jmx_collector/hadoop_jmx_kafka.py | 23 +++--- .../hadoop_jmx_collector/metric_collector.py | 37 +++++----- 4 files changed, 106 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/README.md ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/README.md b/eagle-external/hadoop_jmx_collector/README.md index cd8d887..bd89600 100644 --- a/eagle-external/hadoop_jmx_collector/README.md +++ b/eagle-external/hadoop_jmx_collector/README.md @@ -18,58 +18,81 @@ limitations under the License. --> -# Hadoop Jmx Collector +# Hadoop JMX Collector to Kafka -These scripts help to collect Hadoop jmx and evently sent the metrics to stdout or Kafka. Tested with Python 2.7. +Python script to collect JMX metrics from any Hadoop component and send them to a Kafka topic. ### How to use it - 1.
Edit the configuration file (json file). For example: - + 1. Edit the configuration file config.json. For example: + ``` { "env": { "site": "sandbox" }, - "input": { - "component": "namenode", - "port": "50070", - "https": false - }, + "inputs": [ + { + "component": "namenode", + "host": "127.0.0.1", + "port": "50070", + "https": false, + "kafka_topic": "nn_jmx_metric_sandbox" + }, + { + "component": "resourcemanager", + "host": "127.0.0.1", + "port": "8088", + "https": false, + "kafka_topic": "rm_jmx_metric_sandbox" + }, + { + "component": "datanode", + "host": "127.0.0.1", + "port": "50075", + "https": false, + "kafka_topic": "dn_jmx_metric_sandbox" + } + ], "filter": { "monitoring.group.selected": ["hadoop", "java.lang"] }, "output": { } } - + ``` 2. Run the scripts - - # for general use - python hadoop_jmx_kafka.py > 1.txt + ``` + python hadoop_jmx_kafka.py + ``` + +### Editing config.json -### Edit `eagle-collector.conf` +* inputs -* input + "port" defines the Hadoop service port, such as 50070 => "namenode", 16010 => "hbasemaster". + As in the example above, you can specify multiple Hadoop components to collect from. - "port" defines the hadoop service port, such as 50070 => "namenode", 60010 => "hbase master". + "https" specifies whether to use SSL (HTTPS) when connecting to the host and port. + + "kafka_topic" is the Kafka topic to populate with the JMX metrics collected from the respective component. * filter "monitoring.group.selected" can filter out beans which we care about. -* output - - if we left it empty, then the output is stdout by default. - - "output": {} - - It also supports Kafka as its output.
+* output + You can specify the Kafka broker list: + ``` "output": { "kafka": { - "topic": "test_topic", - "brokerList": [ "sandbox.hortonworks.com:6667"] + "brokerList": [ "localhost:9092"] } } - + ``` + + To check that a desired Kafka topic is being populated: + ``` + kafka-console-consumer --zookeeper localhost:2181 --topic nn_jmx_metric_sandbox + ``` http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json index 57c9aae..488b8e0 100644 --- a/eagle-external/hadoop_jmx_collector/config-sample.json +++ b/eagle-external/hadoop_jmx_collector/config-sample.json @@ -12,18 +12,35 @@ "https":false } }, - "input": { - "component": "namenode", - "port": "50070", - "https": false - }, + "inputs": [ + { + "component": "namenode", + "host": "server.eagle.apache.org", + "port": "50070", + "https": false, + "kafka_topic": "nn_jmx_metric_sandbox" + }, + { + "component": "resourcemanager", + "host": "server.eagle.apache.org", + "port": "8088", + "https": false, + "kafka_topic": "rm_jmx_metric_sandbox" + }, + { + "component": "datanode", + "host": "server.eagle.apache.org", + "port": "50075", + "https": false, + "kafka_topic": "dn_jmx_metric_sandbox" + } + ], "filter": { "monitoring.group.selected": ["hadoop", "java.lang"] }, "output": { "kafka": { - "topic": "nn_jmx_metric_sandbox", - "brokerList": ["sandbox.hortonworks.com:6667"] + "brokerList": ["localhost:9092"] } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index 286d0be..6eca7d9 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++
b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index 286d0be..6eca7d9 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -16,8 +16,11 @@ # limitations under the License. # -from metric_collector import JmxMetricCollector,JmxMetricListener,Runner +from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricCollector,Helper import json +import logging + +logging.basicConfig(level=logging.INFO,format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',datefmt='%m-%d %H:%M') class NNSafeModeMetric(JmxMetricListener): def on_metric(self, metric): @@ -47,8 +50,7 @@ class MemortUsageMetric(JmxMetricListener): if bean["name"] == "Hadoop:service=NameNode,name=JvmMetrics": memnonheapusedusage = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2) self.collector.on_bean_kv(self.PREFIX, "memnonheapusedusage", memnonheapusedusage) - memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, - 2) + memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2) self.collector.on_bean_kv(self.PREFIX, "memnonheapcommittedusage", memnonheapcommittedusage) memheapusedusage = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2) self.collector.on_bean_kv(self.PREFIX, "memheapusedusage", memheapusedusage) @@ -77,12 +79,9 @@ class JournalTransactionInfoMetric(JmxMetricListener): if __name__ == '__main__': - collector = JmxMetricCollector() - collector.register( - NNSafeModeMetric(), - NNHAMetric(), - MemortUsageMetric(), - JournalTransactionInfoMetric(), - NNCapacityUsageMetric() - ) - Runner.run(collector) + config = Helper.load_config() + + for ip in config['inputs']: + collector = JmxMetricCollector(ip['component'], ip['host'], ip['port'], ip['https'], ip['kafka_topic']) + collector.register(NNSafeModeMetric(), NNHAMetric(), 
MemortUsageMetric(), JournalTransactionInfoMetric(), NNCapacityUsageMetric()) + Runner.run(collector) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/502c7e37/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index 939b811..79320cc 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -186,7 +186,7 @@ class KafkaMetricSender(MetricSender): super(KafkaMetricSender, self).__init__(config) kafka_config = config["output"]["kafka"] # default topic - self.topic = kafka_config["topic"].encode('utf-8') + # self.topic = kafka_config["topic"].encode('utf-8') # producer self.broker_list = kafka_config["brokerList"] self.kafka_client = None @@ -197,8 +197,8 @@ class KafkaMetricSender(MetricSender): self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500, batch_send_every_t=30) - def send(self, msg): - self.kafka_producer.send_messages(self.topic, json.dumps(msg)) + def send(self, msg, topic): + self.kafka_producer.send_messages(topic, json.dumps(msg)) def close(self): if self.kafka_producer is not None: @@ -208,14 +208,21 @@ class KafkaMetricSender(MetricSender): class MetricCollector(threading.Thread): - def __init__(self): + def __init__(self, comp, host, port, https, topic): threading.Thread.__init__(self) + + self.comp = comp + self.host = host + self.port = port + self.https = https + self.topic = topic + self.config = Helper.load_config() self.sender = KafkaMetricSender(self.config) self.fqdn = socket.getfqdn() - self.init(self.config) + self.init(self.config, self.comp, self.host, self.port, self.https, self.topic) - def init(self, config): + def init(self, config, comp, host, port, https, topic): pass def start(self): @@ -235,7 +242,7 @@ 
class MetricCollector(threading.Thread): if not msg.has_key("site"): msg["site"] = self.config["env"]["site"] - self.sender.send(msg) + self.sender.send(msg, self.topic) def run(self): raise Exception("`run` method should be overrode by sub-class before being called") @@ -260,14 +267,12 @@ class JmxMetricCollector(MetricCollector): port = None listeners = [] - def init(self, config): - if config["input"].has_key("host"): - self.host = config["input"]["host"] - else: - self.host = self.fqdn - self.port = config["input"]["port"] - self.https = config["input"]["https"] - self.component = config["input"]["component"] + def init(self, config, comp, host, port, https, topic): + self.host = host + self.port = port + self.https = https + self.component = comp + self.topic = topic self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')] self.listeners = [] @@ -351,4 +356,4 @@ class JmxMetricListener: pass def on_metric(self, metric): - pass \ No newline at end of file + pass
