Repository: eagle
Updated Branches:
  refs/heads/master 43bd197d6 -> e8a58b66f
[EAGLE-1023] Update JMX metric collector scripts

https://issues.apache.org/jira/browse/EAGLE-1023

* Fix a multi-threading bug in fnmatch-based metric filtering
* Add an HBase HA check script
* Increase the URL connection timeout from 30s to 60s
* Add necessary exception handling
* Add two new metrics: `hadoop.namenode.dfs.checkpointtimelag` and
  `hadoop.namenode.fsnamesystemstate.numrevisedlivedatanodes`
* Update the metric filter configuration

Author: Zhao, Qingwen <[email protected]>

Closes #935 from qingwen220/EAGLE-1023.

Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e8a58b66
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e8a58b66
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e8a58b66

Branch: refs/heads/master
Commit: e8a58b66f1fce57f1f73f9d97e1aaf2944e69a74
Parents: 43bd197
Author: Zhao, Qingwen <[email protected]>
Authored: Wed May 24 16:24:11 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Wed May 24 16:24:11 2017 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/hadoop_ha_checker.py   | 149 ++++++++++++++++---
 .../hadoop_jmx_config-sample.json               |  97 +++++++++++-
 .../hadoop_jmx_collector/hadoop_jmx_config.json |  62 --------
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  19 ++-
 .../hbase_jmx_config-sample.json                |  96 ------------
 .../hadoop_jmx_collector/metric_collector.py    |  21 ++-
 .../mr/history/MRHistoryJobDailyReporter.java   |  36 +++--
 7 files changed, 268 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
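Background on the first bullet: on Python 2, fnmatch.fnmatch() lazily populates a shared, unlocked module-level cache of compiled patterns, so concurrent collector threads can race on it. The patch (see metric_collector.py below) sidesteps fnmatch by calling re.match directly; here is a minimal sketch of an alternative that keeps glob semantics by precompiling with fnmatch.translate (MetricNameMatcher is a hypothetical helper, not part of the patch):

    # Sketch: thread-safe metric-name filtering with glob semantics kept.
    # fnmatch.fnmatch lazily fills a shared pattern cache, which is not
    # thread-safe on Python 2; precompiling via fnmatch.translate avoids
    # touching the shared cache at match time. The patch itself switches
    # to re.match, treating each configured filter as a regular expression.
    import fnmatch
    import re

    class MetricNameMatcher(object):  # hypothetical helper, not in the patch
        def __init__(self, name_filters):
            # Compile once, on a single thread, before workers start.
            self.patterns = [re.compile(fnmatch.translate(f)) for f in name_filters]

        def matches(self, metric_name):
            return any(p.match(metric_name) for p in self.patterns)

    matcher = MetricNameMatcher(["hadoop.namenode.dfs.*"])
    assert matcher.matches("hadoop.namenode.dfs.checkpointtimelag")

Note that the updated filter configuration lists plain metric names, which re.match also accepts (with '.' matching any character), so the semantic change from globs to regexes is benign for the shipped config.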
bean.has_key("tag.HAState"): @@ -67,6 +64,19 @@ class HadoopNNHAChecker(MetricCollector): except Exception as e: logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"]) failed_count += 1 + failed_host_list.append(host["host"]) + + + total_count = len(hosts) + all_hosts_name = string.join(host_name_list,",") + + self.collect({ + "host": all_hosts_name, + "component": "namenode", + "metric": "hadoop.namenode.hastate.total.count", + "value": total_count + }) + self.collect({ "host": all_hosts_name, "component": "namenode", @@ -81,6 +91,9 @@ class HadoopNNHAChecker(MetricCollector): "value": standby_count }) + if len(failed_host_list) > 0: + all_hosts_name = string.join(failed_host_list,",") + self.collect({ "host": all_hosts_name, "component": "namenode", @@ -88,35 +101,109 @@ class HadoopNNHAChecker(MetricCollector): "value": failed_count }) -class HadoopRMHAChecker(MetricCollector): +class HadoopHBaseHAChecker(MetricCollector): def run(self): hosts = [] - all_hosts = [] + for input in self.config["input"]: if not input.has_key("host"): input["host"] = socket.getfqdn() - if input.has_key("component") and input["component"] == "resourcemanager": + if input.has_key("component") and input["component"] == "hbasemaster": hosts.append(input) - all_hosts.append(input["host"]) + if not bool(hosts): - logging.warn("Non hosts are configured as 'resourcemanager' in 'input' config, exit") + logging.warn("non hosts are configured as 'hbasemaster' in 'input' config, exit") return - logging.info("Checking resource manager HA: " + str(hosts)) + logging.info("Checking HBase HA: " + str(hosts)) + + active_count = 0 + standby_count = 0 + failed_count = 0 + + failed_host_list = [] + host_name_list = [] + + for host in hosts: + try: + if host.has_key("source_host"): + host["host"] = host["source_host"] + host_name_list.append(host["host"]) + bean = JmxReader(host["host"], host["port"], host["https"]) \ + .read_query("/jmx?qry=Hadoop:service=HBase,name=Master,sub=Server&anonymous=true") \ + .get_jmx_bean_by_name("Hadoop:service=HBase,name=Master,sub=Server") + if not bean: + logging.error("JMX Bean[Hadoop:service=HBase,name=Master,sub=Server] is null from " + host["host"]) + if bean.has_key("tag.isActiveMaster"): + logging.debug(str(host) + " is " + bean["tag.isActiveMaster"]) + if bean["tag.isActiveMaster"] == "true": + active_count += 1 + else: + standby_count += 1 + else: + logging.info("'tag.isActiveMaster' not found from jmx of " + host["host"] + ":" + host["port"]) + except Exception as e: + logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"]) + failed_count += 1 + failed_host_list.append(host["host"]) + total_count = len(hosts) - all_hosts_name = string.join(all_hosts,",") + all_hosts_name = string.join(host_name_list,",") self.collect({ "host": all_hosts_name, - "component": "resourcemanager", - "metric": "hadoop.resourcemanager.hastate.total.count", + "component": "hbasemaster", + "metric": "hadoop.hbasemaster.hastate.total.count", "value": total_count }) + self.collect({ + "host": all_hosts_name, + "component": "hbasemaster", + "metric": "hadoop.hbasemaster.hastate.active.count", + "value": active_count + }) + + self.collect({ + "host": all_hosts_name, + "component": "hbasemaster", + "metric": "hadoop.hbasemaster.hastate.standby.count", + "value": standby_count + }) + + if len(failed_host_list) > 0: + all_hosts_name = string.join(failed_host_list,",") + + self.collect({ + "host": all_hosts_name, + "component": "hbasemaster", + "metric": 
"hadoop.hbasemaster.hastate.failed.count", + "value": failed_count + }) + + +class HadoopRMHAChecker(MetricCollector): + def run(self): + hosts = [] + all_hosts = [] + for input in self.config["input"]: + if not input.has_key("host"): + input["host"] = socket.getfqdn() + if input.has_key("component") and input["component"] == "resourcemanager": + hosts.append(input) + all_hosts.append(input["host"]) + if not bool(hosts): + logging.warn("Non hosts are configured as 'resourcemanager' in 'input' config, exit") + return + + logging.info("Checking resource manager HA: " + str(hosts)) + active_count = 0 standby_count = 0 failed_count = 0 + failed_host_list = [] + for host in hosts: try: cluster_info = YarnWSReader(host["host"], host["port"], host["https"]).read_cluster_info() @@ -130,6 +217,17 @@ class HadoopRMHAChecker(MetricCollector): except Exception as e: logging.error("Failed to read yarn ws from " + str(host)) failed_count += 1 + failed_host_list.append(host["host"]) + + total_count = len(hosts) + all_hosts_name = string.join(all_hosts,",") + + self.collect({ + "host": all_hosts_name, + "component": "resourcemanager", + "metric": "hadoop.resourcemanager.hastate.total.count", + "value": total_count + }) self.collect({ "host": all_hosts_name, @@ -145,6 +243,9 @@ class HadoopRMHAChecker(MetricCollector): "value": standby_count }) + if len(failed_host_list) > 0: + all_hosts_name = string.join(failed_host_list,",") + self.collect({ "host": all_hosts_name, "component": "resourcemanager", @@ -153,4 +254,4 @@ class HadoopRMHAChecker(MetricCollector): }) if __name__ == '__main__': - Runner.run(HadoopNNHAChecker(), HadoopRMHAChecker()) \ No newline at end of file + Runner.run(HadoopNNHAChecker(), HadoopHBaseHAChecker(), HadoopRMHAChecker()) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json index a6ddf7d..786072b 100755 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json @@ -29,7 +29,76 @@ "metric_name_filter": [ "hadoop.memory.heapmemoryusage.used", "hadoop.memory.nonheapmemoryusage.used", + "hadoop.bufferpool.direct.memoryused", + + "hadoop.hbase.master.server.averageload", + "hadoop.hbase.master.server.numdeadregionservers", + "hadoop.hbase.master.assignmentmanger.ritcount", + "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold", + "hadoop.hbase.master.assignmentmanger.assign_num_ops", + "hadoop.hbase.master.assignmentmanger.assign_min", + "hadoop.hbase.master.assignmentmanger.assign_max", + "hadoop.hbase.master.assignmentmanger.assign_75th_percentile", + "hadoop.hbase.master.assignmentmanger.assign_95th_percentile", + "hadoop.hbase.master.assignmentmanger.assign_99th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops", + "hadoop.hbase.master.assignmentmanger.bulkassign_min", + "hadoop.hbase.master.assignmentmanger.bulkassign_max", + "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile", + "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile", + "hadoop.hbase.master.balancer.balancercluster_num_ops", + "hadoop.hbase.master.balancer.balancercluster_min", + 
"hadoop.hbase.master.balancer.balancercluster_max", + "hadoop.hbase.master.balancer.balancercluster_75th_percentile", + "hadoop.hbase.master.balancer.balancercluster_95th_percentile", + "hadoop.hbase.master.balancer.balancercluster_99th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_min", + "hadoop.hbase.master.filesystem.hlogsplittime_max", + "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile", + "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile", + "hadoop.hbase.master.filesystem.hlogsplitsize_min", + "hadoop.hbase.master.filesystem.hlogsplitsize_max", + "hadoop.hbase.master.filesystem.metahlogsplittime_min", + "hadoop.hbase.master.filesystem.metahlogsplittime_max", + "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile", + "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile", + "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile", + "hadoop.hbase.master.filesystem.metahlogsplitsize_min", + "hadoop.hbase.master.filesystem.metahlogsplitsize_max", + + "hadoop.hbase.jvm.gccount", + "hadoop.hbase.jvm.gctimemillis", + "hadoop.hbase.ipc.ipc.queuesize", + "hadoop.hbase.ipc.ipc.numcallsingeneralqueue", + "hadoop.hbase.ipc.ipc.numactivehandler", + "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile", + "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile", + "hadoop.hbase.ipc.ipc.queuecalltime_num_ops", + "hadoop.hbase.ipc.ipc.processcalltime_num_ops", + "hadoop.hbase.regionserver.server.regioncount", + "hadoop.hbase.regionserver.server.storecount", + "hadoop.hbase.regionserver.server.memstoresize", + "hadoop.hbase.regionserver.server.storefilesize", + "hadoop.hbase.regionserver.server.totalrequestcount", + "hadoop.hbase.regionserver.server.readrequestcount", + "hadoop.hbase.regionserver.server.writerequestcount", + "hadoop.hbase.regionserver.server.splitqueuelength", + "hadoop.hbase.regionserver.server.compactionqueuelength", + "hadoop.hbase.regionserver.server.flushqueuelength", + "hadoop.hbase.regionserver.server.blockcachesize", + "hadoop.hbase.regionserver.server.blockcachehitcount", + "hadoop.hbase.regionserver.server.blockcachecounthitpercent", + + "hadoop.memory.heapmemoryusage.used", + "hadoop.memory.nonheapmemoryusage.used", "hadoop.namenode.fsnamesystemstate.capacitytotal", + "hadoop.namenode.fsnamesystemstate.capacityusage", + "hadoop.namenode.fsnamesystemstate.topuseropcounts", + "hadoop.namenode.fsnamesystemstate.fsstate", + "hadoop.namenode.fsnamesystemstate.numlivedatanodes", + "hadoop.namenode.fsnamesystemstate.numrevisedlivedatanodes", "hadoop.namenode.dfs.capacityused", "hadoop.namenode.dfs.capacityremaining", "hadoop.namenode.dfs.blockstotal", @@ -38,6 +107,7 @@ "hadoop.namenode.dfs.missingblocks", "hadoop.namenode.dfs.corruptblocks", "hadoop.namenode.dfs.lastcheckpointtime", + "hadoop.namenode.dfs.checkpointtimelag", "hadoop.namenode.dfs.transactionssincelastcheckpoint", "hadoop.namenode.dfs.lastwrittentransactionid", "hadoop.namenode.dfs.snapshottabledirectories", @@ -46,6 +116,9 @@ "hadoop.namenode.rpc.rpcprocessingtimeavgtime", "hadoop.namenode.rpc.numopenconnections", "hadoop.namenode.rpc.callqueuelength", + "hadoop.namenode.rpc.hadoop.namenode.rpc.rpcqueuetimeavgtime", + "hadoop.namenode.rpc.hadoop.namenode.rpc.rpcprocessingtimeavgtime", + "hadoop.namenode.namenodeinfo.corruptfiles", "hadoop.datanode.fsdatasetstate.capacity", "hadoop.datanode.fsdatasetstate.dfsused", @@ -53,16 +126,32 @@ 
"hadoop.datanode.rpc.rpcqueuetimeavgtime", "hadoop.datanode.rpc.rpcprocessingtimeavgtime", "hadoop.datanode.rpc.numopenconnections", - "hadoop.datanode.rpc.callqueuelength" + "hadoop.datanode.rpc.callqueuelength", + + "hadoop.namenode.hastate.total.count", + "hadoop.namenode.hastate.active.count", + "hadoop.namenode.hastate.standby.count", + "hadoop.namenode.hastate.failed.count", + + "hadoop.resourcemanager.yarn.numunhealthynms", + "hadoop.resourcemanager.yarn.numlostnms", + "hadoop.resourcemanager.yarn.numrebootednms", + "hadoop.resourcemanager.yarn.numdecommissionednms", + "hadoop.resourcemanager.yarn.numactivenms", + + "hadoop.resourcemanager.hastate.total.count", + "hadoop.resourcemanager.hastate.active.count", + "hadoop.resourcemanager.hastate.standby.count", + "hadoop.resourcemanager.hastate.failed.count" ] }, "output": { "kafka": { "debug": false, "default_topic": "hadoop_jmx_metric_sandbox", - "component_topic_mapping": { - "namenode": "nn_jmx_metric_sandbox", - "resourcemanager": "rm_jmx_metric_sandbox" + "metric_topic_mapping": { + "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox", + "hadoop.namenode.fsnamesystemstate.topuseropcounts" : "hadoop_jmx_resource_sandbox" }, "broker_list": [ "sandbox.hortonworks.com:6667" http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json deleted file mode 100755 index 23c89b3..0000000 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "env": { - "site": "sandbox", - "metric_prefix": "hadoop.", - "log_file": "/tmp/hadoop-jmx-collector.log" - }, - "input": [ - { - "component": "namenode", - "host": "sandbox.hortonworks.com", - "port": "50070", - "https": true - } - ], - "filter": { - "bean_group_filter": ["hadoop","java.lang"], - "metric_name_filter": [ - "hadoop.memory.heapmemoryusage.used", - "hadoop.memory.nonheapmemoryusage.used", - "hadoop.namenode.fsnamesystemstate.capacitytotal", - "hadoop.namenode.fsnamesystemstate.topuseropcounts", - "hadoop.namenode.namenodeinfo.corruptfiles", - "hadoop.namenode.dfs.capacityused", - "hadoop.namenode.dfs.capacityremaining", - "hadoop.namenode.dfs.blockstotal", - "hadoop.namenode.dfs.filestotal", - "hadoop.namenode.dfs.underreplicatedblocks", - "hadoop.namenode.dfs.missingblocks", - "hadoop.namenode.dfs.corruptblocks", - "hadoop.namenode.dfs.lastcheckpointtime", - "hadoop.namenode.dfs.transactionssincelastcheckpoint", - "hadoop.namenode.dfs.lastwrittentransactionid", - "hadoop.namenode.dfs.snapshottabledirectories", - "hadoop.namenode.dfs.snapshots", - "hadoop.namenode.rpc.rpcqueuetimeavgtime", - "hadoop.namenode.rpc.rpcprocessingtimeavgtime", - "hadoop.namenode.rpc.numopenconnections", - "hadoop.namenode.rpc.callqueuelength", - - "hadoop.datanode.fsdatasetstate.capacity", - "hadoop.datanode.fsdatasetstate.dfsused", - "hadoop.datanode.datanodeinfo.xceivercount", - "hadoop.datanode.rpc.rpcqueuetimeavgtime", - "hadoop.datanode.rpc.rpcprocessingtimeavgtime", - "hadoop.datanode.rpc.numopenconnections", - "hadoop.datanode.rpc.callqueuelength" - ] - }, - "output": { - "kafka": { - "debug": false, - "default_topic": "hadoop_jmx_metric_sandbox", - "metric_topic_mapping": { - "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox", - 
"hadoop.namenode.fsnamesystemstate.topuseropcounts" : "hadoop_jmx_resource_sandbox" - }, - "broker_list": [ - "sandbox.hortonworks.com:6667" - ] - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index 60c6367..8b64177 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -17,7 +17,7 @@ # from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricNameConverter -import json, logging, fnmatch, sys +import json, logging, fnmatch, sys, time class NNSafeModeMetric(JmxMetricListener): def on_metric(self, metric): @@ -28,15 +28,13 @@ class NNSafeModeMetric(JmxMetricListener): metric["value"] = 0 self.collector.collect(metric) -class NNHAMetric(JmxMetricListener): - PREFIX = "hadoop.namenode.fsnamesystem" +class NNFileSystemMetric(JmxMetricListener): + PREFIX = "hadoop.namenode.dfs" def on_bean(self, component, bean): if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystem": - if bean[u"tag.HAState"] == "active": - self.collector.on_bean_kv(self.PREFIX, component, "hastate", 0) - else: - self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1) + checkpointtimelag = int(round(time.time() * 1000)) - bean['LastCheckpointTime'] + self.collector.on_bean_kv(self.PREFIX, component, "checkpointtimelag", checkpointtimelag) class corruptfilesMetric(JmxMetricListener): def on_metric(self, metric): @@ -48,7 +46,6 @@ class TopUserOpCountsMetric(JmxMetricListener): if metric["metric"] == "hadoop.namenode.fsnamesystemstate.topuseropcounts": self.collector.collect(metric, "string", MetricNameConverter()) - class MemoryUsageMetric(JmxMetricListener): PREFIX = "hadoop.namenode.jvm" @@ -72,6 +69,9 @@ class NNCapacityUsageMetric(JmxMetricListener): capacityusage = round(float(bean['CapacityUsed']) / float(bean['CapacityTotal']) * 100, 2) self.collector.on_bean_kv(self.PREFIX, component, "capacityusage", capacityusage) + numrevisedlivedatanodes = bean['NumLiveDataNodes'] + bean['NumDecomDeadDataNodes'] + self.collector.on_bean_kv(self.PREFIX, component, "numrevisedlivedatanodes", numrevisedlivedatanodes) + class JournalTransactionInfoMetric(JmxMetricListener): PREFIX = "hadoop.namenode.journaltransaction" @@ -107,12 +107,11 @@ if __name__ == '__main__': collector = JmxMetricCollector() collector.register( NNSafeModeMetric(), - NNHAMetric(), + NNFileSystemMetric(), MemoryUsageMetric(), NNCapacityUsageMetric(), JournalTransactionInfoMetric(), DatanodeFSDatasetState(), - HBaseRegionServerMetric(), corruptfilesMetric(), TopUserOpCountsMetric() ) http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json deleted file mode 100644 index c37a9ae..0000000 --- a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json +++ /dev/null @@ -1,96 +0,0 @@ -{ - "env": { - "site": "sandbox", - "metric_prefix": "hadoop.", - "log_file": "/tmp/hadoop-jmx-collector.log" - }, - "input": [ - { - "component": "hbasemaster", - "host": "sandbox.hortonworks.com", - "port": "60010", - 
"https": false - }, - { - "component": "regionserver", - "host": "sandbox.hortonworks.com", - "port": "60030", - "https": false - } - ], - "filter": { - "bean_group_filter": ["hadoop","java.lang","java.nio"], - "metric_name_filter": [ - "hadoop.memory.heapmemoryusage.used", - "hadoop.memory.nonheapmemoryusage.used", - "hadoop.bufferpool.direct.memoryused", - "hadoop.hbase.master.server.averageload", - "hadoop.hbase.master.assignmentmanger.ritcount", - "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold", - "hadoop.hbase.master.assignmentmanger.assign_num_ops", - "hadoop.hbase.master.assignmentmanger.assign_min", - "hadoop.hbase.master.assignmentmanger.assign_max", - "hadoop.hbase.master.assignmentmanger.assign_75th_percentile", - "hadoop.hbase.master.assignmentmanger.assign_95th_percentile", - "hadoop.hbase.master.assignmentmanger.assign_99th_percentile", - "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops", - "hadoop.hbase.master.assignmentmanger.bulkassign_min", - "hadoop.hbase.master.assignmentmanger.bulkassign_max", - "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile", - "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile", - "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile", - "hadoop.hbase.master.balancer.balancercluster_num_ops", - "hadoop.hbase.master.balancer.balancercluster_min", - "hadoop.hbase.master.balancer.balancercluster_max", - "hadoop.hbase.master.balancer.balancercluster_75th_percentile", - "hadoop.hbase.master.balancer.balancercluster_95th_percentile", - "hadoop.hbase.master.balancer.balancercluster_99th_percentile", - "hadoop.hbase.master.filesystem.hlogsplittime_min", - "hadoop.hbase.master.filesystem.hlogsplittime_max", - "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile", - "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile", - "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile", - "hadoop.hbase.master.filesystem.hlogsplitsize_min", - "hadoop.hbase.master.filesystem.hlogsplitsize_max", - "hadoop.hbase.master.filesystem.metahlogsplittime_min", - "hadoop.hbase.master.filesystem.metahlogsplittime_max", - "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile", - "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile", - "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile", - "hadoop.hbase.master.filesystem.metahlogsplitsize_min", - "hadoop.hbase.master.filesystem.metahlogsplitsize_max", - - "hadoop.hbase.jvm.gccount", - "hadoop.hbase.jvm.gctimemillis", - "hadoop.hbase.ipc.ipc.queuesize", - "hadoop.hbase.ipc.ipc.numcallsingeneralqueue", - "hadoop.hbase.ipc.ipc.numactivehandler", - "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile", - "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile", - "hadoop.hbase.ipc.ipc.queuecalltime_num_ops", - "hadoop.hbase.ipc.ipc.processcalltime_num_ops", - "hadoop.hbase.regionserver.server.regioncount", - "hadoop.hbase.regionserver.server.storecount", - "hadoop.hbase.regionserver.server.memstoresize", - "hadoop.hbase.regionserver.server.storefilesize", - "hadoop.hbase.regionserver.server.totalrequestcount", - "hadoop.hbase.regionserver.server.readrequestcount", - "hadoop.hbase.regionserver.server.writerequestcount", - "hadoop.hbase.regionserver.server.splitqueuelength", - "hadoop.hbase.regionserver.server.compactionqueuelength", - "hadoop.hbase.regionserver.server.flushqueuelength", - "hadoop.hbase.regionserver.server.blockcachesize", - "hadoop.hbase.regionserver.server.blockcachehitcount", - 
"hadoop.hbase.regionserver.server.blockcounthitpercent" - ] - }, - "output": { - "kafka": { - "debug": false, - "default_topic": "hadoop_jmx_metric_sandbox", - "broker_list": [ - "sandbox.hortonworks.com:6667" - ] - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index c83fe6b..efd998a 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -106,12 +106,12 @@ class Helper: try: if https: logging.info("Reading https://" + str(url) + path) - c = httplib.HTTPSConnection(url, timeout=30) + c = httplib.HTTPSConnection(url, timeout=60) c.request("GET", path) response = c.getresponse() else: logging.info("Reading http://" + str(url) + path) - response = urllib2.urlopen("http://" + str(url) + path, timeout=30) + response = urllib2.urlopen("http://" + str(url) + path, timeout=60) logging.debug("Got response") result = response.read() break @@ -158,6 +158,13 @@ class JmxReader(object): raise Exception("Response from " + url + " is None") return self + def read_query(self, qry): + self.jmx_raw = Helper.http_get(self.host, self.port, self.https, qry) + if self.jmx_raw is None: + raise Exception("Response from " + url + " is None") + self.set_raw(self.jmx_raw) + return self + def set_raw(self, text): self.jmx_json = json.loads(text) self.jmx_beans = self.jmx_json[u'beans'] @@ -496,7 +503,11 @@ class JmxMetricCollector(MetricCollector): self.on_bean_kv(metric_prefix_name, source, key, value) for listener in self.listeners: - listener.on_bean(source, bean.copy()) + try: + listener.on_bean(source, bean.copy()) + except Exception as e: + logging.error("Failed to parse bean: " + bean["name"]) + logging.exception(e) def on_bean_kv(self, prefix, source, key, value): # Skip Tags @@ -574,7 +585,9 @@ class MetricNameFilter(MetricFilter): return True else: for name_filter in self.metric_name_filter: - if fnmatch.fnmatch(metric["metric"], name_filter): + # multiple threads bug exists in fnmatch + #if fnmatch.fnmatch(metric["metric"], name_filter): + if re.match(name_filter, metric['metric']): return True return False http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java index 541d352..f01c93e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java @@ -144,14 +144,20 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService { LOG.warn("application MR_HISTORY_JOB_APP does not run on any sites!"); return; } + + int reportHour = currentHour / dailySentPeriod * dailySentPeriod; + calendar.set(Calendar.HOUR_OF_DAY, reportHour); + long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR; + long startTime = endTime - 
http://git-wip-us.apache.org/repos/asf/eagle/blob/e8a58b66/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
index 541d352..f01c93e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobDailyReporter.java
@@ -144,14 +144,20 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService {
                 LOG.warn("application MR_HISTORY_JOB_APP does not run on any sites!");
                 return;
             }
+
+            int reportHour = currentHour / dailySentPeriod * dailySentPeriod;
+            calendar.set(Calendar.HOUR_OF_DAY, reportHour);
+            long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR;
+            long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod;
+
             for (String site : sites) {
-                int reportHour = currentHour / dailySentPeriod * dailySentPeriod;
-                calendar.set(Calendar.HOUR_OF_DAY, reportHour);
-                long endTime = calendar.getTimeInMillis() / DateTimeUtil.ONEHOUR * DateTimeUtil.ONEHOUR;
-                long startTime = endTime - DateTimeUtil.ONEHOUR * dailySentPeriod;
-                String subject = buildAlertSubject(site, startTime, endTime);
-                Map<String, Object> alertData = buildAlertData(site, startTime, endTime);
-                sendByEmailWithSubject(alertData, subject);
+                try {
+                    String subject = buildAlertSubject(site, startTime, endTime);
+                    Map<String, Object> alertData = buildAlertData(site, startTime, endTime);
+                    sendByEmailWithSubject(alertData, subject);
+                } catch (Exception e) {
+                    LOG.error("Job report failed for {} due to {}", site, e.getMessage(), e);
+                }
             }
         } catch (Exception ex) {
             LOG.error("Fail to get job summery info due to {}", ex.getMessage(), ex);
@@ -215,12 +221,18 @@ public class MRHistoryJobDailyReporter extends AbstractScheduledService {
             return data;
         }
         Long totalJobs = jobSummery.values().stream().reduce((a, b) -> a + b).get();
-        String finishedJobQuery = String.format(FINISHED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
-        String failedJobQuery = String.format(FAILED_JOBS_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
-        String succeededJobQuery = String.format(SUCCEEDED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, jobOvertimeLimit * DateTimeUtil.ONEHOUR, endTime);
         data.put(SUMMARY_INFO_KEY, processResult(jobSummery, totalJobs));
-        data.put(FAILED_JOB_USERS_KEY, buildJobSummery(failedJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.FAILED.toString())));
-        data.put(SUCCEEDED_JOB_USERS_KEY, buildJobSummery(succeededJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.SUCCEEDED.toString())));
+
+        if (jobSummery.containsKey(Constants.JobState.FAILED.toString())) {
+            String failedJobQuery = String.format(FAILED_JOBS_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
+            data.put(FAILED_JOB_USERS_KEY, buildJobSummery(failedJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.FAILED.toString())));
+        }
+        if (jobSummery.containsKey(Constants.JobState.SUCCEEDED.toString())) {
+            String succeededJobQuery = String.format(SUCCEEDED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, jobOvertimeLimit * DateTimeUtil.ONEHOUR, endTime);
+            data.put(SUCCEEDED_JOB_USERS_KEY, buildJobSummery(succeededJobQuery, startTime, endTime, jobSummery.get(Constants.JobState.SUCCEEDED.toString())));
+        }
+
+        String finishedJobQuery = String.format(FINISHED_JOB_QUERY, Constants.MR_JOB_EXECUTION_SERVICE_NAME, site, endTime);
         data.put(FINISHED_JOB_USERS_KEY, buildJobSummery(finishedJobQuery, startTime, endTime, totalJobs));
 
         return data;
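The reporter change computes a single report window per run instead of recomputing it per site, and each site's report now goes out inside its own try/catch so one failing site no longer aborts the rest. The window arithmetic relies on integer division flooring the current hour to a dailySentPeriod boundary; a transcription to Python with a worked value (assumes the hour is taken in the calendar's local time, as Calendar.HOUR_OF_DAY is):

    # Sketch: window arithmetic from MRHistoryJobDailyReporter, in Python.
    # With dailySentPeriod = 6 and a run at 14:37, the report covers 06:00-12:00.
    ONEHOUR = 3600 * 1000  # DateTimeUtil.ONEHOUR, in milliseconds

    def report_window(now_millis, daily_sent_period):
        current_hour = now_millis // ONEHOUR % 24  # currentHour
        report_hour = current_hour // daily_sent_period * daily_sent_period
        # calendar.set(HOUR_OF_DAY, reportHour), then truncation to a whole hour:
        end_time = (now_millis // ONEHOUR - (current_hour - report_hour)) * ONEHOUR
        start_time = end_time - ONEHOUR * daily_sent_period
        return start_time, end_time

    start, end = report_window(14 * ONEHOUR + 37 * 60 * 1000, 6)
    assert (start // ONEHOUR, end // ONEHOUR) == (6, 12)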
