Repository: eagle Updated Branches: refs/heads/master b4695801f -> 3af9ac480
[EAGLE-849] Refactor system metric collector scripts Refactor System metric collector python script following similar framework as existing jmx metric collector. https://issues.apache.org/jira/browse/EAGLE-849 Author: Hao Chen <[email protected]> Closes #763 from haoch/SystemMetricCollector. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3af9ac48 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3af9ac48 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3af9ac48 Branch: refs/heads/master Commit: 3af9ac4802a9dec2f2160aa98cd3b90287e14d48 Parents: b469580 Author: Hao Chen <[email protected]> Authored: Thu Jan 12 11:26:10 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Thu Jan 12 11:26:10 2017 +0800 ---------------------------------------------------------------------- .../src/main/resources/ALERT_LIGHT_TEMPLATE.vm | 2 +- .../system_metric_collector.py | 336 ++++++++++++++++ .../system_metric_config-sample.json | 20 + .../hadoop_jmx_collector/system_metric_kafka.py | 393 ------------------- 4 files changed, 357 insertions(+), 394 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm index 0eb5efc..f273917 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm @@ -482,7 +482,7 @@ <div class="footer"> <table width="100%"> <tr> - <td class="aligncenter content-block">Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $alert["version"])</td> + <td class="aligncenter content-block">Powered by <a href="http://eagle.apache.org">Apache Eagle</a> (version: $alert["version"])</td> </tr> </table> </div> http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py new file mode 100644 index 0000000..e0ffecc --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py @@ -0,0 +1,336 @@ +# !/usr/bin/python +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from metric_collector import MetricCollector, Runner +import logging, socket, string, os, re, time + + +class SystemMetricCollector(MetricCollector): + METRIC_PREFIX = "system" + METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]") + + def run(self): + if self.config["env"].has_key("cpu_stat_file"): + self.cpu_stat_file = self.config["env"]["cpu_stat_file"] + logging.info("Overrode env.cpu_stat_file: %s", self.cpu_stat_file) + else: + self.cpu_stat_file = "/tmp/eagle_cpu_usage_state" + logging.info("Using default env.cpu_stat_file: %s", self.cpu_stat_file) + + self.try_exec_func( + self.collect_cpu_metric, + self.collect_uptime_metric, + self.collect_memory_metric, + self.collect_loadavg_metric, + self.collect_cpu_temp_metric, + self.collect_nic_metric, + self.collect_smartdisk_metric, + self.collect_diskstat_metric + ) + + def try_exec_func(self, *funcs): + result = dict() + succeed_num = 0 + failed_num = 0 + for func in funcs: + try: + logging.info("Executing: %s", func.__name__) + func() + result[func.__name__] = "success" + succeed_num = succeed_num + 1 + except Exception as e: + logging.warn("Failed to execute: %s", func.__name__) + logging.exception(e) + result[func.__name__] = "error: %s: %s" % (type(e), e) + failed_num = failed_num + 1 + result_desc = "" + for key in result: + result_desc = result_desc + "%-30s: %-30s\n" % (key, result[key]) + logging.info("Execution result (total: %s, succeed: %s, failed: %s): \n\n%s", len(funcs), succeed_num, + failed_num, result_desc) + + # ==================================== + # CPU Usage + # ==================================== + + def collect_cpu_metric(self): + """ + CPU Usage Percentage Metrics: + + system.cpu.usage: (user + nice + system + wait + irq + softirq + steal + guest) / (user + nice + system + idle + wait + irq + softirq + steal + guest) + + Example: + + {'timestamp': 1483594861458, 'metric': 'system.cpu.usage', 'site': u'sandbox', 'value': 0.048, 'host': 'localhost', 'device': 'cpuN'} + + system.cpu.totalusage: Sum(Each CPU Usage) / Sum (CPU Total) + + Example: + + {'timestamp': 1483594861484, 'metric': 'system.cpu.totalusage', 'site': u'sandbox', 'value': 0.17, 'host': 'sandbox.hortonworks.com', 'device': 'cpu'} + + """ + + cpu_metric = self.new_metric() + cpu_info = os.popen('cat /proc/stat').readlines() + dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"] + + total_cpu = 0 + total_cpu_usage = 0 + cpu_stat_pre = None + + data_dir = self.cpu_stat_file + if os.path.exists(data_dir): + fd = open(data_dir, "r") + cpu_stat_pre = fd.read() + fd.close() + + for item in cpu_info: + if re.match(r'^cpu\d+', item) is None: + continue + + items = re.split("\s+", item.strip()) + demens = min(len(dimensions), len(items)) + metric_event = dict() + for i in range(1, demens): + metric_event[dimensions[i]] = int(items[i]) + cpu_metric['timestamp'] = int(round(time.time() * 1000)) + cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i] + cpu_metric['device'] = items[0] + cpu_metric['value'] = items[i] + self.collect(cpu_metric) + + per_cpu_usage = metric_event["user"] + metric_event["nice"] + metric_event["system"] + metric_event[ + "wait"] + metric_event["irq"] + metric_event["softirq"] + metric_event["steal"] + metric_event["guest"] + per_cpu_total = metric_event["user"] + metric_event["nice"] + metric_event["system"] + metric_event[ + "idle"] + metric_event["wait"] + metric_event["irq"] + metric_event["softirq"] + metric_event["steal"] + metric_event["guest"] + total_cpu += per_cpu_total + total_cpu_usage += per_cpu_usage + + # system.cpu.usage + cpu_metric['timestamp'] = int(round(time.time() * 1000)) + cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage" + cpu_metric['device'] = items[0] + cpu_metric['value'] = per_cpu_usage * 1.0 /per_cpu_total + self.collect(cpu_metric) + + cup_stat_current = str(total_cpu_usage) + " " + str(total_cpu) + logging.info("Current cpu stat: %s", cup_stat_current) + fd = open(data_dir, "w") + fd.write(cup_stat_current) + fd.close() + + pre_total_cpu_usage = 0 + pre_total_cpu = 0 + if cpu_stat_pre != None: + result = re.split("\s+", cpu_stat_pre.rstrip()) + pre_total_cpu_usage = int(result[0]) + pre_total_cpu = int(result[1]) + cpu_metric['timestamp'] = int(round(time.time() * 1000)) + cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "totalusage" + cpu_metric['device'] = "cpu" + cpu_metric['value'] = (total_cpu_usage - pre_total_cpu_usage) * 1.0 / (total_cpu - pre_total_cpu) + + self.collect(cpu_metric) + + # ==================================== + # OS Up Time + # ==================================== + + def collect_uptime_metric(self): + metric = self.new_metric() + demension = ["uptime.day", "idletime.day"] + output = os.popen('cat /proc/uptime').readlines() + + for item in output: + items = re.split("\s+", item.rstrip()) + for i in range(len(demension)): + metric["timestamp"] = int(round(time.time() * 1000)) + metric["metric"] = self.METRIC_PREFIX + "." + 'uptime' + '.' + demension[i] + metric["value"] = str(round(float(items[i]) / 86400, 2)) + self.collect(metric) + + # ==================================== + # Memory + # ==================================== + + def collect_memory_metric(self): + event = self.new_metric() + event["host"] = self.fqdn + output = os.popen('cat /proc/meminfo').readlines() + mem_info = dict() + for item in output: + items = re.split(":?\s+", item.rstrip()) + # print items + mem_info[items[0]] = int(items[1]) + itemNum = len(items) + metric = 'memory' + '.' + items[0] + if (len(items) > 2): + metric = metric + '.' + items[2] + event["timestamp"] = int(round(time.time() * 1000)) + event["metric"] = self.METRIC_NAME_EXCLUDE.sub("", self.METRIC_PREFIX + "." + metric.lower()) + event["value"] = items[1] + event["device"] = 'memory' + self.collect(event) + + usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers'] - mem_info['Cached']) * 100.0 / \ + mem_info[ + 'MemTotal'] + usage = round(usage, 2) + self.emit_metric(event, self.METRIC_PREFIX, "memory.usage", usage, "memory") + + # ==================================== + # Load AVG + # ==================================== + + def collect_loadavg_metric(self): + """ + Collect Load Avg Metrics + """ + demension = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min'] + output = os.popen('cat /proc/loadavg').readlines() + for item in output: + items = re.split("\s+", item.rstrip()) + demens = min(len(demension), len(items)) + for i in range(demens): + event = self.new_metric() + event["timestamp"] = int(round(time.time() * 1000)) + event["metric"] = self.METRIC_PREFIX + "." + demension[i] + event["value"] = items[i] + event["device"] = 'cpu' + self.collect(event) + + # ==================================== + # IPMI CPU Temp + # ==================================== + + def collect_cpu_temp_metric(self): + output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines() + for item in output: + items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip()) + event = self.new_metric() + event["timestamp"] = int(round(time.time() * 1000)) + event["metric"] = DATA_TYPE + "." + 'cpu.temp' + event["value"] = items[2] + event["device"] = item[1] + self.collect(event) + + # ==================================== + # NIC Metrics + # ==================================== + + def collect_nic_metric(self): + demension = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', + 'transmitpackets', + 'transmiterrs', 'transmitdrop'] + output = os.popen("cat /proc/net/dev").readlines() + + for item in output: + if re.match(r'^\s+eth\d+:', item) is None: + continue + items = re.split("[:\s]+", item.strip()) + filtered_items = items[1:5] + items[9:13] + + for i in range(len(demension)): + kafka_dict = self.new_metric() + kafka_dict["timestamp"] = int(round(time.time() * 1000)) + kafka_dict['metric'] = self.METRIC_PREFIX + "." + 'nic.' + demension[i] + kafka_dict["value"] = filtered_items[i] + kafka_dict["device"] = items[0] + self.collect(kafka_dict) + + # ==================================== + # Smart Disk Metrics + # ==================================== + + def collect_smartdisk_metric(self): + harddisks = os.popen("lsscsi").readlines() + for item in harddisks: + items = re.split('\/', item.strip()) + # print items + smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines() + for line in smartctl: + line = line.strip() + if re.match(r'^[\d]+\s', line) is None: + continue + lineitems = re.split("\s+", line) + metric = 'smartdisk.' + lineitems[1] + kafka_dict = self.new_metric() + kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() + kafka_dict["timestamp"] = int(round(time.time() * 1000)) + kafka_dict["value"] = lineitems[-1] + kafka_dict["device"] = 'smartdisk' + self.collect(kafka_dict) + + # ==================================== + # Disk Stat Metrics + # ==================================== + + def collect_diskstat_metric(self): + """ + FIXME: IndexError: list index out of range + """ + demension = ['readrate', 'writerate', 'avgwaittime', 'utilization', 'disktotal', 'diskused', 'usage'] + iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines() + # remove the first set of elements + iostat_output = iostat_output[len(iostat_output) / 2:] + iostat_dict = {} + for item in iostat_output: + items = re.split('\s+', item.strip()) + filtered_items = [items[5], items[6], items[9], items[11]] + iostat_dict[items[0]] = filtered_items + + disk_output = os.popen("df -k | grep ^/dev").readlines() + for item in disk_output: + items = re.split('\s+', item.strip()) + disks = re.split('^\/dev\/(\w+)\d+$', items[0]) + logging.info(len(disks)) + disk = disks[1] + iostat_dict[disk].append(items[1]) + iostat_dict[disk].append(items[2]) + iostat_dict[disk].append(items[4].rstrip('%')) + + for key, metrics in iostat_dict.iteritems(): + for i in range(len(metrics)): + metric = 'disk.' + demension[i] + kafka_dict = self.new_metric() + kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() + kafka_dict["timestamp"] = int(round(time.time() * 1000)) + kafka_dict["value"] = metrics[i] + kafka_dict["device"] = key + self.collect(kafka_dict) + + # ==================================== + # Helper Methods + # ==================================== + + def emit_metric(self, event, prefix, metric, value, device): + event["timestamp"] = int(round(time.time() * 1000)) + event["metric"] = prefix + "." + metric.lower() + event["value"] = str(value) + event["device"] = device + self.collect(event) + + def new_metric(self): + metric = dict() + metric["host"] = self.fqdn + return metric + + +if __name__ == '__main__': + Runner.run(SystemMetricCollector()) http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json new file mode 100644 index 0000000..6fcd43b --- /dev/null +++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json @@ -0,0 +1,20 @@ +{ + "env": { + "site": "sandbox", + "log_file": "/tmp/hadoop-jmx-collector.log", + "cpu_stat_file": "/tmp/eagle_cpu_usage_state" + }, + "input": [ + ], + "filter": { + }, + "output": { + "kafka": { + "debug": false, + "default_topic": "system_metric_sandbox", + "broker_list": [ + "sandbox.hortonworks.com:6667" + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py b/eagle-external/hadoop_jmx_collector/system_metric_kafka.py deleted file mode 100644 index 7d805d7..0000000 --- a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py +++ /dev/null @@ -1,393 +0,0 @@ -#!/usr/bin/python - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import re -import time -import json -import sys -import socket -import types -import re -import errno - -# load kafka-python -sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six')) -import six - -# load kafka-python -sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python')) -from kafka import KafkaClient, SimpleProducer, SimpleConsumer - -from util_func import * - -TOPIC = "cronus_sys_metrics" -DATA_TYPE = "system" - -METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]") - -DEBUG_KAFKA_HOST = [] -PROD_KAFKA_HOST = [] - -PORT_MAP = { - "60030": "regionserver", - "50075": "datanode", - "50070": "namenode", - "60010": "master", - "50030": "resourcemanager", - "50060": "nodemanager", - "8480": "journalnode" -} - -def readFile(filename): - f = open(filename, 'r') - s = f.read() - f.close() - return s - -def kafka_connect(host): - print "Connecting to kafka " + str(host) - # To send messages synchronously - kafka = KafkaClient(host, timeout=58) - producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=500, batch_send_every_t=30) - return kafka, producer - - -def kafka_close(kafka, producer): - if producer is not None: - producer.stop() - if kafka is not None: - kafka.close() - - -def kafka_produce(producer, topic, kafka_json): - # Note that the application is responsible for encoding messages to type str - if producer != None : - producer.send_messages(topic, kafka_json) - else: - print kafka_json - - -def addExtraMetric(producer, kafka_dict, metric, value, device, topic): - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["metric"] = DATA_TYPE + "." + metric.lower() - kafka_dict["value"] = str(value) - kafka_dict["device"] = device - kafka_json = json.dumps(kafka_dict) - print(kafka_json) - kafka_produce(producer, topic, kafka_json) - - -def getCPU(producer, kafka_dict, topic): - cpu_info = os.popen('cat /proc/stat').readlines() - demension = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"] - - total_cpu = 0 - total_cpu_usage = 0 - cpu_stat_pre = None - - data_dir = "/tmp/eagle_cpu_stat_previous" - if os.path.exists(data_dir): - fd = open(data_dir, "r") - cpu_stat_pre = fd.read() - fd.close() - - for item in cpu_info: - if re.match(r'^cpu\d+', item) is None: - continue - - items = re.split("\s+", item.strip()) - demens = min(len(demension), len(items)) - # print items - tuple = dict() - for i in range(1, demens): - # if not isNumber(items[i]): - # continue - - tuple[demension[i]] = int(items[i]) - - kafka_dict['timestamp'] = int(round(time.time() * 1000)) - kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + demension[i] - kafka_dict['device'] = items[0] - kafka_dict['value'] = items[i] - kafka_json = json.dumps(kafka_dict) - #print kafka_json - kafka_produce(producer, topic, kafka_json) - - per_cpu_usage = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["wait"] + tuple["irq"] + tuple[ - "softirq"] + tuple["steal"] + tuple["guest"] - per_cpu_total = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["idle"] + tuple["wait"] + tuple["irq"] + \ - tuple["softirq"] + tuple["steal"] + tuple["guest"] - total_cpu += per_cpu_total - total_cpu_usage += per_cpu_usage - - # system.cpu.usage - kafka_dict['timestamp'] = int(round(time.time() * 1000)) - kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "perusage" - kafka_dict['device'] = items[0] - kafka_dict['value'] = str(round(per_cpu_usage * 100.0 / per_cpu_total, 2)) - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - cup_stat_current = str(total_cpu_usage) + " " + str(total_cpu) - print cup_stat_current - fd = open(data_dir, "w") - fd.write(cup_stat_current) - fd.close() - - pre_total_cpu_usage = 0 - pre_total_cpu = 0 - if cpu_stat_pre != None: - result = re.split("\s+", cpu_stat_pre.rstrip()) - pre_total_cpu_usage = int(result[0]) - pre_total_cpu = int(result[1]) - kafka_dict['timestamp'] = int(round(time.time() * 1000)) - kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "totalusage" - kafka_dict['device'] = "cpu" - kafka_dict['value'] = str(round((total_cpu_usage-pre_total_cpu_usage) * 100.0 / (total_cpu-pre_total_cpu), 2)) - kafka_json = json.dumps(kafka_dict) - - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getUptime(producer, kafka_dict, topic): - demension = ["uptime.day", "idletime.day"] - output = os.popen('cat /proc/uptime').readlines() - - for item in output: - items = re.split("\s+", item.rstrip()) - for i in range(len(demension)): - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["metric"] = DATA_TYPE + "." + 'uptime' + '.' + demension[i] - kafka_dict["value"] = str(round(float(items[i]) / 86400, 2)) - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getMemInfo(producer, kafka_dict, topic): - output = os.popen('cat /proc/meminfo').readlines() - mem_info = dict() - for item in output: - items = re.split(":?\s+", item.rstrip()) - # print items - mem_info[items[0]] = int(items[1]) - itemNum = len(items) - metric = 'memory' + '.' + items[0] - if (len(items) > 2 ): - metric = metric + '.' + items[2] - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["metric"] = METRIC_NAME_EXCLUDE.sub("", DATA_TYPE + "." + metric.lower()) - kafka_dict["value"] = items[1] - kafka_dict["device"] = 'memory' - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers'] - mem_info['Cached']) * 100.0 / mem_info[ - 'MemTotal'] - usage = round(usage, 2) - addExtraMetric(producer, kafka_dict, "memory.usage", usage, "memory", topic) - - -def getLoadAvg(producer, kafka_dict, topic): - demension = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min'] - output = os.popen('cat /proc/loadavg').readlines() - for item in output: - items = re.split("\s+", item.rstrip()) - - demens = min(len(demension), len(items)) - for i in range(demens): - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["metric"] = DATA_TYPE + "." + demension[i] - kafka_dict["value"] = items[i] - kafka_dict["device"] = 'cpu' - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getIpmiCPUTemp(producer, kafka_dict, topic): - output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines() - for item in output: - items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip()) - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["metric"] = DATA_TYPE + "." + 'cpu.temp' - kafka_dict["value"] = items[2] - kafka_dict["device"] = item[1] - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getInterface(producer, kafka_dict, topic): - demension = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets', - 'transmiterrs', 'transmitdrop'] - output = os.popen("cat /proc/net/dev").readlines() - - for item in output: - if re.match(r'^\s+eth\d+:', item) is None: - continue - items = re.split("[:\s]+", item.strip()) - filtered_items = items[1:5] + items[9:13] - - for i in range(len(demension)): - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict['metric'] = DATA_TYPE + "." + 'nic.' + demension[i] - kafka_dict["value"] = filtered_items[i] - kafka_dict["device"] = items[0] - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getSmartDisk(producer, kafka_dict, topic): - harddisks = os.popen("lsscsi").readlines() - for item in harddisks: - items = re.split('\/', item.strip()) - # print items - smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines() - for line in smartctl: - line = line.strip() - if re.match(r'^[\d]+\s', line) is None: - continue - lineitems = re.split("\s+", line) - metric = 'smartdisk.' + lineitems[1] - kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["value"] = lineitems[-1] - kafka_dict["device"] = 'smartdisk' - kafka_json = json.dumps(kafka_dict) - print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def getDiskStat(producer, kafka_dict, topic): - demension = ['readrate', 'writerate', 'avgwaittime', 'utilization', 'disktotal', 'diskused', 'usage'] - iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines() - # remove the first set of elements - iostat_output = iostat_output[len(iostat_output) / 2:] - # print iostat_output - iostat_dict = {} - for item in iostat_output: - items = re.split('\s+', item.strip()) - # print items - filtered_items = [items[5], items[6], items[9], items[11]] - iostat_dict[items[0]] = filtered_items - # print iostat_dict - - disk_output = os.popen("df -k | grep ^/dev").readlines() - for item in disk_output: - items = re.split('\s+', item.strip()) - fs = re.split('^\/dev\/(\w+)\d+$', items[0]) - disk = fs[1] - iostat_dict[disk].append(items[1]) - iostat_dict[disk].append(items[2]) - iostat_dict[disk].append(items[4].rstrip('%')) - #print iostat_dict - - for key, metrics in iostat_dict.iteritems(): - for i in range(len(metrics)): - metric = 'disk.' + demension[i] - kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() - kafka_dict["timestamp"] = int(round(time.time() * 1000)) - kafka_dict["value"] = metrics[i] - kafka_dict["device"] = key - kafka_json = json.dumps(kafka_dict) - # print kafka_json - kafka_produce(producer, topic, kafka_json) - - -def get_services(host): - service_list = list() - socket.setdefaulttimeout(1) - for (key, value) in PORT_MAP.items(): - try: - handle = None - port = int(key) - handle = socket.socket().connect((host, port)) - service_list.append(value) - except socket.error as err: - # if err.errno != errno.ECONNREFUSED: - # service_list.append(value) - pass - finally: - if handle != None: - handle.close() - - return service_list - -def tryGetSystemMetric(type, func, *args): - try: - func(*args) - except: - print type + " does not work, ignore" - -DEVICE_CONF = { - "cpustat": getCPU, - "uptime": getUptime, - "meminfo": getMemInfo, - "loadavg": getLoadAvg, - "ipmicputemp": getIpmiCPUTemp, - "network": getInterface, - "smartdisk": getSmartDisk, - "diskstat": getDiskStat -} - -def main(argv): - kafka = None - producer = None - topic = None - try: - # read the kafka.ini - config = load_config('config.json') - print config - - site = config[u'env'].get('site').encode('utf-8') - component = config[u'input'].get('component').encode('utf-8') - host = socket.getfqdn() - print host - - outputs = [s.encode('utf-8') for s in config[u'output']] - - if('kafka' in outputs): - kafkaConfig = config[u'output'].get(u'kafka') - brokerList = kafkaConfig.get('brokerList') - topic = kafkaConfig.get('topic') - kafka, producer = kafka_connect(brokerList) - - kafka_dict = {"host": host, "value": 0, "device": ''} - services = get_services(host) - print services - for service in services: - kafka_dict[service] = 'true' - - for type, func in DEVICE_CONF.items(): - print type + ":" + str(func) - tryGetSystemMetric(type, func, kafka, kafka_dict, topic) - - except Exception, e: - print 'main except: ', e - - finally: - kafka_close(kafka, producer) - return 0 - - -if __name__ == "__main__": - sys.exit(main(sys.argv))
