Repository: incubator-eagle Updated Branches: refs/heads/master 286034ff3 -> fb853dfd8
[EAGLE-807] Refactor JMX Metric Collector Script Refactor JMX Metric Collector Script * Support multiple kafka topics * Support HA checking logic. Author: Hao Chen <h...@apache.org> Closes #695 from haoch/fixJMXCollectorScript. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fb853dfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fb853dfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fb853dfd Branch: refs/heads/master Commit: fb853dfd8ce722a8ee4ee970f9416b2dbfc79d8c Parents: 286034f Author: Hao Chen <h...@apache.org> Authored: Tue Nov 29 16:32:22 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Tue Nov 29 16:32:22 2016 +0800 ---------------------------------------------------------------------- eagle-external/hadoop_jmx_collector/README.md | 94 ++++++++--------- .../hadoop_jmx_collector/config-sample.json | 57 +++++----- .../hadoop_jmx_collector/hadoop_ha_checker.py | 59 ++++++----- .../hadoop_jmx_collector/hadoop_jmx_kafka.py | 53 +++++----- .../hadoop_jmx_collector/metric_collector.py | 103 +++++++++++-------- 5 files changed, 191 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/README.md ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/README.md b/eagle-external/hadoop_jmx_collector/README.md index bd89600..b11c081 100644 --- a/eagle-external/hadoop_jmx_collector/README.md +++ b/eagle-external/hadoop_jmx_collector/README.md @@ -18,81 +18,75 @@ limitations under the License. --> -# Hadoop JMX Collector to Kafka +# Hadoop Jmx Collector -Python script to collect JMX metrics for any Hadoop component, and send it to Kafka topic +These scripts help to collect Hadoop jmx and evently sent the metrics to stdout or Kafka. Tested with Python 2.7. ### How to use it - 1. Edit the configuration file config.json. For example: - ``` + 1. Edit the configuration file (json file). For example: + { - "env": { - "site": "sandbox" - }, - "inputs": [ + "env": { + "site": "sandbox" + }, + "input": [ { "component": "namenode", - "host": "127.0.0.1", + "host": "sandbox.hortonworks.com", "port": "50070", - "https": false, - "kafka_topic": "nn_jmx_metric_sandbox" + "https": false }, { "component": "resourcemanager", - "host": "127.0.0.1", + "host": "sandbox.hortonworks.com", "port": "8088", - "https": false, - "kafka_topic": "rm_jmx_metric_sandbox" - }, - { - "component": "datanode", - "host": "127.0.0.1", - "port": "50075", - "https": false, - "kafka_topic": "dn_jmx_metric_sandbox" + "https": false + } + ], + "filter": { + "monitoring.group.selected": ["hadoop", "java.lang"] + }, + "output": { + "kafka": { + "default_topic": "nn_jmx_metric_sandbox", + "component_topic_mapping": { + "namenode": "nn_jmx_metric_sandbox", + "resourcemanager": "rm_jmx_metric_sandbox" + }, + "broker_list": [ + "sandbox.hortonworks.com:6667" + ] } - ], - "filter": { - "monitoring.group.selected": ["hadoop", "java.lang"] - }, - "output": { - } + } } - ``` - 2. Run the scripts - - ``` - python hadoop_jmx_kafka.py - ``` -### Editing config.json - -* inputs + 2. Run the scripts + + # for general use + python hadoop_jmx_kafka.py > 1.txt - "port" defines the hadoop service port, such as 50070 => "namenode", 16010 => "hbasemaster". - Like the example above, you can specify multiple hadoop components to collect +### Edit `eagle-collector.conf` - "https" is whether or not you want to use SSL protocol in your connection to the host+port +* input (monitored hosts) - "kafka_topic" is the kafka topic that you want to populate with the jmx data from the respective component + "port" defines the hadoop service port, such as 50070 => "namenode", 60010 => "hbase master". * filter "monitoring.group.selected" can filter out beans which we care about. -* output +* output + + if we left it empty, then the output is stdout by default. + + "output": {} + + It also supports Kafka as its output. - You can specify the kafka broker list - ``` "output": { "kafka": { - "brokerList": [ "localhost:9092"] + "topic": "test_topic", + "broker_list": [ "sandbox.hortonworks.com:6667"] } } - ``` - - To check that the a desired kafka topic is being populated: - ``` - kafka-console-consumer --zookeeper localhost:2181 --topic nn_jmx_metric_sandbox - ``` http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json index 9245f1a..a0670e1 100644 --- a/eagle-external/hadoop_jmx_collector/config-sample.json +++ b/eagle-external/hadoop_jmx_collector/config-sample.json @@ -1,55 +1,46 @@ { "env": { - "site": "sandbox", - "name_node": { - "hosts": [ - "sandbox.hortonworks.com" - ], - "port": 50070, + "site": "sandbox" + }, + "input": [ + { + "component": "namenode", + "host": "sandbox.hortonworks.com", + "port": "50070", "https": false }, - "resource_manager": { - "hosts": [ - "sandbox.hortonworks.com" - ], - "port": 50030, - "https": false - } - }, - "inputs": [ { "component": "namenode", - "host": "server.eagle.apache.org", + "host": "sandbox.hortonworks.com", "port": "50070", - "https": false, - "kafka_topic": "nn_jmx_metric_sandbox" + "https": false }, { "component": "resourcemanager", - "host": "server.eagle.apache.org", + "host": "sandbox.hortonworks.com", "port": "8088", - "https": false, - "kafka_topic": "rm_jmx_metric_sandbox" + "https": false }, { - "component": "datanode", - "host": "server.eagle.apache.org", - "port": "50075", - "https": false, - "kafka_topic": "dn_jmx_metric_sandbox" + "component": "resourcemanager", + "host": "sandbox.hortonworks.com", + "port": "8088", + "https": false } ], "filter": { - "monitoring.group.selected": [ - "hadoop", - "java.lang" - ] + "monitoring.group.selected": ["hadoop", "java.lang"] }, "output": { "kafka": { - "brokerList": [ - "localhost:9092" + "default_topic": "nn_jmx_metric_sandbox", + "component_topic_mapping": { + "namenode": "nn_jmx_metric_sandbox", + "resourcemanager": "rm_jmx_metric_sandbox" + }, + "broker_list": [ + "sandbox.hortonworks.com:6667" ] } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py index c33b327..e8bf4ae 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py @@ -17,18 +17,22 @@ # from metric_collector import MetricCollector, JmxReader, YarnWSReader, Runner -import logging +import logging,socket class HadoopNNHAChecker(MetricCollector): def run(self): - if not self.config["env"].has_key("name_node"): - logging.warn("Do nothing for HadoopNNHAChecker as config of env.name_node not found") + hosts = [] + + for input in self.config["input"]: + if not input.has_key("host"): + input["host"] = socket.getfqdn() + if input.has_key("component") and input["component"] == "namenode": + hosts.append(input) + if not bool(hosts): + logging.warn("non hosts are configured as 'namenode' in 'input' config, exit") return - name_node_config = self.config["env"]["name_node"] - hosts = name_node_config["hosts"] - port = name_node_config["port"] - https = name_node_config["https"] + logging.info("Checking namenode HA: " + str(hosts)) total_count = len(hosts) self.collect({ @@ -43,17 +47,21 @@ class HadoopNNHAChecker(MetricCollector): for host in hosts: try: - bean = JmxReader(host, port, https).open().get_jmx_bean_by_name( + bean = JmxReader(host["host"], host["port"], host["https"]).open().get_jmx_bean_by_name( "Hadoop:service=NameNode,name=FSNamesystem") - logging.debug(host + " is " + bean["tag.HAState"]) - if bean["tag.HAState"] == "active": - active_count += 1 + if not bean: + logging.error("JMX Bean[Hadoop:service=NameNode,name=FSNamesystem] is null from " + host["host"]) + if bean.has_key("tag.HAState"): + logging.debug(str(host) + " is " + bean["tag.HAState"]) + if bean["tag.HAState"] == "active": + active_count += 1 + else: + standby_count += 1 else: - standby_count += 1 + logging.info("'tag.HAState' not found from jmx of " + host["host"] + ":" + host["port"]) except Exception as e: - logging.exception("failed to read jmx from " + host) + logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"]) failed_count += 1 - self.collect({ "component": "namenode", "metric": "hadoop.namenode.hastate.active.count", @@ -72,17 +80,19 @@ class HadoopNNHAChecker(MetricCollector): "value": failed_count }) - class HadoopRMHAChecker(MetricCollector): def run(self): - if not self.config["env"].has_key("resource_manager"): - logging.warn("Do nothing for HadoopRMHAChecker as config of env.resource_manager not found") + hosts = [] + for input in self.config["input"]: + if not input.has_key("host"): + input["host"] = socket.getfqdn() + if input.has_key("component") and input["component"] == "resourcemanager": + hosts.append(input) + if not bool(hosts): + logging.warn("Non hosts are configured as 'resourcemanager' in 'input' config, exit") return - name_node_config = self.config["env"]["resource_manager"] - hosts = name_node_config["hosts"] - port = name_node_config["port"] - https = name_node_config["https"] + logging.info("Checking resource manager HA: " + str(hosts)) total_count = len(hosts) self.collect({ @@ -97,13 +107,16 @@ class HadoopRMHAChecker(MetricCollector): for host in hosts: try: - cluster_info = YarnWSReader(host, port, https).read_cluster_info() + cluster_info = YarnWSReader(host["host"], host["port"], host["https"]).read_cluster_info() + if not cluster_info: + logging.error("Cluster info is null from web service of " + host["host"]) + raise Exception("cluster info is null from " + host["host"]) if cluster_info["clusterInfo"]["haState"] == "ACTIVE": active_count += 1 else: standby_count += 1 except Exception as e: - logging.exception("Failed to read yarn ws from " + host) + logging.error("Failed to read yarn ws from " + str(host)) failed_count += 1 self.collect({ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index 6eca7d9..b42bc81 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -16,11 +16,8 @@ # limitations under the License. # -from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricCollector,Helper +from metric_collector import JmxMetricCollector,JmxMetricListener,Runner import json -import logging - -logging.basicConfig(level=logging.INFO,format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',datefmt='%m-%d %H:%M') class NNSafeModeMetric(JmxMetricListener): def on_metric(self, metric): @@ -35,53 +32,55 @@ class NNSafeModeMetric(JmxMetricListener): class NNHAMetric(JmxMetricListener): PREFIX = "hadoop.namenode.fsnamesystem" - def on_bean(self, bean): + def on_bean(self, component, bean): if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystem": if bean[u"tag.HAState"] == "active": - self.collector.on_bean_kv(self.PREFIX, "hastate", 0) + self.collector.on_bean_kv(self.PREFIX, component, "hastate", 0) else: - self.collector.on_bean_kv(self.PREFIX, "hastate", 1) + self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1) -class MemortUsageMetric(JmxMetricListener): +class MemoryUsageMetric(JmxMetricListener): PREFIX = "hadoop.namenode.jvm" - def on_bean(self, bean): + def on_bean(self, component, bean): if bean["name"] == "Hadoop:service=NameNode,name=JvmMetrics": memnonheapusedusage = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2) - self.collector.on_bean_kv(self.PREFIX, "memnonheapusedusage", memnonheapusedusage) - memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2) - self.collector.on_bean_kv(self.PREFIX, "memnonheapcommittedusage", memnonheapcommittedusage) + self.collector.on_bean_kv(self.PREFIX, component, "memnonheapusedusage", memnonheapusedusage) + memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, + 2) + self.collector.on_bean_kv(self.PREFIX, component, "memnonheapcommittedusage", memnonheapcommittedusage) memheapusedusage = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2) - self.collector.on_bean_kv(self.PREFIX, "memheapusedusage", memheapusedusage) + self.collector.on_bean_kv(self.PREFIX, component,"memheapusedusage", memheapusedusage) memheapcommittedusage = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2) - self.collector.on_bean_kv(self.PREFIX, "memheapcommittedusage", memheapcommittedusage) - + self.collector.on_bean_kv(self.PREFIX, component, "memheapcommittedusage", memheapcommittedusage) class NNCapacityUsageMetric(JmxMetricListener): PREFIX = "hadoop.namenode.fsnamesystemstate" - def on_bean(self, bean): + def on_bean(self, component, bean): if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystemState": capacityusage = round(float(bean['CapacityUsed']) / float(bean['CapacityTotal']) * 100, 2) - self.collector.on_bean_kv(self.PREFIX, "capacityusage", capacityusage) + self.collector.on_bean_kv(self.PREFIX, component, "capacityusage", capacityusage) class JournalTransactionInfoMetric(JmxMetricListener): PREFIX = "hadoop.namenode.journaltransaction" - def on_bean(self, bean): + def on_bean(self, component, bean): if bean.has_key("JournalTransactionInfo"): JournalTransactionInfo = json.loads(bean.get("JournalTransactionInfo")) LastAppliedOrWrittenTxId = float(JournalTransactionInfo.get("LastAppliedOrWrittenTxId")) MostRecentCheckpointTxId = float(JournalTransactionInfo.get("MostRecentCheckpointTxId")) - self.collector.on_bean_kv(self.PREFIX, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId) - self.collector.on_bean_kv(self.PREFIX, "MostRecentCheckpointTxId", MostRecentCheckpointTxId) - + self.collector.on_bean_kv(self.PREFIX, component, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId) + self.collector.on_bean_kv(self.PREFIX, component, "MostRecentCheckpointTxId", MostRecentCheckpointTxId) if __name__ == '__main__': - config = Helper.load_config() - - for ip in config['inputs']: - collector = JmxMetricCollector(ip['component'], ip['host'], ip['port'], ip['https'], ip['kafka_topic']) - collector.register(NNSafeModeMetric(), NNHAMetric(), MemortUsageMetric(), JournalTransactionInfoMetric(), NNCapacityUsageMetric()) - Runner.run(collector) + collector = JmxMetricCollector() + collector.register( + NNSafeModeMetric(), + NNHAMetric(), + MemoryUsageMetric(), + NNCapacityUsageMetric(), + JournalTransactionInfoMetric() + ) + Runner.run(collector) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index 79320cc..e165ea2 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -40,7 +40,6 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s', datefmt='%m-%d %H:%M') - class Helper: def __init__(self): pass @@ -186,19 +185,40 @@ class KafkaMetricSender(MetricSender): super(KafkaMetricSender, self).__init__(config) kafka_config = config["output"]["kafka"] # default topic - # self.topic = kafka_config["topic"].encode('utf-8') + self.default_topic = None + if kafka_config.has_key("default_topic"): + self.default_topic = kafka_config["default_topic"].encode('utf-8') + self.component_topic_mapping = {} + if kafka_config.has_key("component_topic_mapping"): + self.component_topic_mapping = kafka_config["component_topic_mapping"] + + if not self.default_topic and not bool(self.component_topic_mapping): + raise Exception("both kafka config 'topic' and 'component_topic_mapping' are empty") + # producer - self.broker_list = kafka_config["brokerList"] + self.broker_list = kafka_config["broker_list"] self.kafka_client = None self.kafka_producer = None + def get_topic_id(self, msg): + if msg.has_key("component"): + component = msg["component"] + if self.component_topic_mapping.has_key(component): + return self.component_topic_mapping[component] + else: + return self.default_topic + else: + if not self.default_topic: + raise Exception("no default topic found for unknown-component msg: " + str(msg)) + return self.default_topic + def open(self): self.kafka_client = KafkaClient(self.broker_list, timeout=59) self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500, batch_send_every_t=30) - def send(self, msg, topic): - self.kafka_producer.send_messages(topic, json.dumps(msg)) + def send(self, msg): + self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg)) def close(self): if self.kafka_producer is not None: @@ -206,23 +226,15 @@ class KafkaMetricSender(MetricSender): if self.kafka_client is not None: self.kafka_client.close() - class MetricCollector(threading.Thread): - def __init__(self, comp, host, port, https, topic): + def __init__(self): threading.Thread.__init__(self) - - self.comp = comp - self.host = host - self.port = port - self.https = https - self.topic = topic - self.config = Helper.load_config() self.sender = KafkaMetricSender(self.config) self.fqdn = socket.getfqdn() - self.init(self.config, self.comp, self.host, self.port, self.https, self.topic) + self.init(self.config) - def init(self, config, comp, host, port, https, topic): + def init(self, config): pass def start(self): @@ -242,7 +254,7 @@ class MetricCollector(threading.Thread): if not msg.has_key("site"): msg["site"] = self.config["env"]["site"] - self.sender.send(msg, self.topic) + self.sender.send(msg) def run(self): raise Exception("`run` method should be overrode by sub-class before being called") @@ -258,21 +270,28 @@ class Runner(object): :return: """ for thread in threads: - thread.start() + try: + thread.start() + except Exception as e: + logging.exception(e) class JmxMetricCollector(MetricCollector): selected_domain = None - component = None - https = False - port = None listeners = [] + input_components = [] + + def init(self, config): + self.input_components = config["input"] + for input in self.input_components: + if not input.has_key("host"): + input["host"] = self.fqdn + if not input.has_key("component"): + raise Exception("component not defined in " + str(input)) + if not input.has_key("port"): + raise Exception("port not defined in " + str(input)) + if not input.has_key("https"): + input["https"] = False - def init(self, config, comp, host, port, https, topic): - self.host = host - self.port = port - self.https = https - self.component = comp - self.topic = topic self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')] self.listeners = [] @@ -286,20 +305,21 @@ class JmxMetricCollector(MetricCollector): self.listeners.append(listener) def run(self): - try: - beans = JmxReader(self.host, self.port, self.https).open().get_jmx_beans() - self.on_beans(beans) - except Exception as e: - logging.exception("Failed to read jmx for " + self.host) + for input in self.input_components: + try: + beans = JmxReader(input["host"], input["port"], input["https"]).open().get_jmx_beans() + self.on_beans(input["component"], beans) + except Exception as e: + logging.exception("Failed to read jmx for " + str(input)) def filter_bean(self, bean, mbean_domain): return mbean_domain in self.selected_domain - def on_beans(self, beans): + def on_beans(self, component, beans): for bean in beans: - self.on_bean(bean) + self.on_bean(component,bean) - def on_bean(self, bean): + def on_bean(self, component, bean): # mbean is of the form "domain:key=value,...,foo=bar" mbean = bean[u'name'] mbean_domain, mbean_attribute = mbean.rstrip().split(":", 1) @@ -313,24 +333,23 @@ class JmxMetricCollector(MetricCollector): # print kafka_dict for key, value in bean.iteritems(): - self.on_bean_kv(metric_prefix_name, key, value) + self.on_bean_kv(metric_prefix_name, component,key, value) for listener in self.listeners: - listener.on_bean(bean.copy()) + listener.on_bean(component, bean.copy()) - def on_bean_kv(self, prefix, key, value): + def on_bean_kv(self, prefix,component, key, value): # Skip Tags if re.match(r'tag.*', key): return metric_name = (prefix + '.' + key).lower() self.on_metric({ + "component": component, "metric": metric_name, "value": value }) def on_metric(self, metric): - metric["component"] = self.component - if Helper.is_number(metric["value"]): self.collect(metric) @@ -352,8 +371,8 @@ class JmxMetricListener: def init(self, collector): self.collector = collector - def on_bean(self, bean): + def on_bean(self, component, bean): pass def on_metric(self, metric): - pass + pass \ No newline at end of file