Repository: ambari Updated Branches: refs/heads/trunk 16c139808 -> 369109092
AMBARI-15766 Create a new alert type that is based on timeseries metrics (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/36910909 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/36910909 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/36910909 Branch: refs/heads/trunk Commit: 3691090923d141ea57a8d13931692d0e4b441f37 Parents: 16c1398 Author: Dmytro Sen <[email protected]> Authored: Mon Apr 11 20:45:29 2016 +0300 Committer: Dmytro Sen <[email protected]> Committed: Mon Apr 11 20:45:29 2016 +0300 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 6 +- .../python/ambari_agent/alerts/ams_alert.py | 237 ++++++++++++++++++ .../python/ambari_agent/alerts/metric_alert.py | 10 +- .../ambari_agent/TestAlertSchedulerHandler.py | 18 +- .../src/test/python/ambari_agent/TestAlerts.py | 89 +++++++ .../test/python/ambari_agent/TestAmsAlert.py | 243 +++++++++++++++++++ .../ambari_commons/aggregate_functions.py | 46 ++++ .../src/main/python/ambari_commons/firewall.py | 2 - .../python/ambari_commons/urllib_handlers.py | 4 +- .../state/alert/AlertDefinitionFactory.java | 4 + .../ambari/server/state/alert/AmsSource.java | 179 ++++++++++++++ .../ambari/server/state/alert/MetricSource.java | 4 - .../ambari/server/state/alert/SourceType.java | 5 + .../common-services/HDFS/2.1.0.2.0/alerts.json | 93 ++++++- .../package/alerts/alert_metrics_deviation.py | 20 +- .../server/api/services/AmbariMetaInfoTest.java | 2 +- 16 files changed, 926 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py 
b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index b84832d..65cc8b0 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -30,6 +30,7 @@ import time from apscheduler.scheduler import Scheduler from alerts.collector import AlertCollector from alerts.metric_alert import MetricAlert +from alerts.ams_alert import AmsAlert from alerts.port_alert import PortAlert from alerts.script_alert import ScriptAlert from alerts.web_alert import WebAlert @@ -42,6 +43,7 @@ class AlertSchedulerHandler(): FILENAME = 'definitions.json' TYPE_PORT = 'PORT' TYPE_METRIC = 'METRIC' + TYPE_AMS = 'AMS' TYPE_SCRIPT = 'SCRIPT' TYPE_WEB = 'WEB' TYPE_RECOVERY = 'RECOVERY' @@ -299,6 +301,8 @@ class AlertSchedulerHandler(): if source_type == AlertSchedulerHandler.TYPE_METRIC: alert = MetricAlert(json_definition, source, self.config) + elif source_type == AlertSchedulerHandler.TYPE_AMS: + alert = AmsAlert(json_definition, source, self.config) elif source_type == AlertSchedulerHandler.TYPE_PORT: alert = PortAlert(json_definition, source, self.config) elif source_type == AlertSchedulerHandler.TYPE_SCRIPT: @@ -314,7 +318,7 @@ class AlertSchedulerHandler(): if alert is not None: alert.set_cluster(clusterName, hostName) - except Exception,exception: + except Exception, exception: logger.exception("[AlertScheduler] Unable to load an invalid alert definition. 
It will be skipped.") return alert http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/main/python/ambari_agent/alerts/ams_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/ams_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/ams_alert.py new file mode 100644 index 0000000..00ecc93 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/alerts/ams_alert.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import httplib + +import imp +import time +import urllib +from alerts.base_alert import BaseAlert +from alerts.metric_alert import MetricAlert +import ambari_simplejson as json +import logging +import re +import uuid + +from resource_management.libraries.functions.get_port_from_url import get_port_from_url + +logger = logging.getLogger() + +AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s" + +class AmsAlert(MetricAlert): + """ + Allow alerts to fire based on AMS metrics. + Alert is triggered if the aggregated function of the specified metric has + grown beyond the specified threshold within a given time interval.
+ """ + def __init__(self, alert_meta, alert_source_meta, config): + super(AmsAlert, self).__init__(alert_meta, alert_source_meta, config) + + self.metric_info = None + if 'ams' in alert_source_meta: + self.metric_info = AmsMetric(alert_source_meta['ams']) + + def _collect(self): + """ + Low level function to collect alert data. The result is a tuple as: + res[0] = the result code + res[1] = the list of arguments supplied to the reporting text for the result code + """ + + if self.metric_info is None: + raise Exception("Could not determine result. Specific metric collector is not defined.") + + if self.uri_property_keys is None: + raise Exception("Could not determine result. URL(s) were not defined.") + + # use the URI lookup keys to get a final URI value to query + alert_uri = self._get_uri_from_structure(self.uri_property_keys) + + logger.debug("[Alert][{0}] Calculated metric URI to be {1} (ssl={2})".format( + self.get_name(), alert_uri.uri, str(alert_uri.is_ssl_enabled))) + + host = BaseAlert.get_host_from_url(alert_uri.uri) + if host is None: + host = self.host_name + + try: + port = int(get_port_from_url(alert_uri.uri)) + except: + port = 6188 + + collect_result = None + value_list = [] + + if isinstance(self.metric_info, AmsMetric): + raw_data_points, http_code = self._load_metric(alert_uri.is_ssl_enabled, host, port, self.metric_info) + if not raw_data_points and http_code in [200, 307]: + collect_result = self.RESULT_UNKNOWN + value_list.append('HTTP {0} response (metrics unavailable)'.format(str(http_code))) + elif not raw_data_points and http_code not in [200, 307]: + raise Exception("[Alert][{0}] Unable to extract JSON from HTTP response".format(self.get_name())) + else: + + data_points = self.metric_info.calculate_value(raw_data_points) + compute_result = self.metric_info.calculate_compute(data_points) + value_list.append(compute_result) + + collect_result = self._get_result(value_list[0] if compute_result is None else compute_result) + + 
logger.debug("[Alert][{0}] Computed result = {1}".format(self.get_name(), str(value_list))) + + return (collect_result, value_list) + + def _load_metric(self, ssl, host, port, ams_metric): + """ requests the configured metrics from the AMS collector and returns a tuple of (metric data points, HTTP status code) """ + + if "0.0.0.0" in str(host): + host = self.host_name + + + current_time = int(time.time()) * 1000 + interval = ams_metric.interval + get_metrics_parameters = { + "metricNames": ",".join(ams_metric.metric_list), + "appId": ams_metric.app_id, + "hostname": self.host_name, + "startTime": current_time - 60 * 1000 * interval, + "endTime": current_time, + "precision": "seconds", + "grouped": "true", + } + encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters) + + url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters + + try: + # TODO Implement HTTPS support + conn = httplib.HTTPConnection(host, port, + timeout=self.connection_timeout) + conn.request("GET", url) + response = conn.getresponse() + data = response.read() + except Exception, exception: + if logger.isEnabledFor(logging.DEBUG): + logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.get_name(), str(exception))) + finally: + if logger.isEnabledFor(logging.DEBUG): + logger.debug(""" + AMS request parameters - {0} + AMS response - {1} + """.format(encoded_get_metrics_parameters, data)) + # explicitly close the connection as we've seen python hold onto these + if conn is not None: + try: + conn.close() + except: + logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format + (self.get_name(), url)) + json_is_valid = True + try: + data_json = json.loads(data) + except Exception, exception: + json_is_valid = False + if logger.isEnabledFor(logging.DEBUG): + logger.exception("[Alert][{0}] Convert response to json failed or json doesn't contain needed data: {1}".
+ format(self.get_name(), str(exception))) + + metrics = [] + + if json_is_valid: + metric_dict = {} + for metrics_data in data_json["metrics"]: + metric_dict[metrics_data["metricname"]] = metrics_data["metrics"] + + for metric_name in self.metric_info.metric_list: + if metric_name in metric_dict: + # TODO sorted data points by timestamp + # OrderedDict was implemented in Python2.7 + sorted_data_points = metric_dict[metric_name] + metrics.append(sorted_data_points) + pass + + return (metrics, response.status) + + +class AmsMetric: + DYNAMIC_CODE_VALUE_TEMPLATE = """ +# ensure that division yields a float, use // for integer division +from __future__ import division + +def f(args): + l = [] + for k in args[0]: + try: + data_point = {0} + l.append(data_point) + except: + continue + + return l +""" + + DYNAMIC_CODE_COMPUTE_TEMPLATE = """ +# ensure that division yields a float, use // for integer division +from __future__ import division +from ambari_commons.aggregate_functions import sample_standard_deviation_percentage +from ambari_commons.aggregate_functions import sample_standard_deviation +from ambari_commons.aggregate_functions import mean +from ambari_commons.aggregate_functions import count + +def f(args): + func = {0} + return func(args) +""" + + def __init__(self, metric_info): + self.custom_value_module = None + self.custom_compute_module = None + self.metric_list = metric_info['metric_list'] + self.interval = metric_info['interval'] # in minutes + self.app_id = metric_info['app_id'] + self.minimum_value = metric_info['minimum_value'] + + if 'value' in metric_info: + realcode = re.sub('(\{(\d+)\})', 'args[\g<2>][k]', metric_info['value']) + + self.custom_value_module = imp.new_module(str(uuid.uuid4())) + code = self.DYNAMIC_CODE_VALUE_TEMPLATE.format(realcode) + exec code in self.custom_value_module.__dict__ + + if 'compute' in metric_info: + realcode = metric_info['compute'] + self.custom_compute_module = imp.new_module(str(uuid.uuid4())) + code = 
self.DYNAMIC_CODE_COMPUTE_TEMPLATE.format(realcode) + exec code in self.custom_compute_module.__dict__ + + + def calculate_value(self, args): + data_points = None + if self.custom_value_module is not None: + data_points = self.custom_value_module.f(args) + if self.minimum_value: + data_points = [data_point for data_point in data_points if data_point > self.minimum_value] + return data_points + + def calculate_compute(self, args): + compute_result = None + if self.custom_compute_module is not None: + compute_result = self.custom_compute_module.f(args) + return compute_result http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py index d177bd4..803bdc6 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py @@ -105,14 +105,14 @@ class MetricAlert(BaseAlert): check_value = self.metric_info.calculate(value_list) value_list.append(check_value) - collect_result = self.__get_result(value_list[0] if check_value is None else check_value) + collect_result = self._get_result(value_list[0] if check_value is None else check_value) logger.debug("[Alert][{0}] Resolved values = {1}".format(self.get_name(), str(value_list))) return (collect_result, value_list) - def __get_result(self, value): + def _get_result(self, value): ok_value = self.__find_threshold('ok') warn_value = self.__find_threshold('warning') crit_value = self.__find_threshold('critical') @@ -149,8 +149,6 @@ class MetricAlert(BaseAlert): else: return self.RESULT_OK - return None - def __find_threshold(self, reporting_type): """ find the defined thresholds for alert values """ @@ -166,7 +164,7 @@ class MetricAlert(BaseAlert): return 
self.alert_source_meta['reporting'][reporting_type]['value'] - + def _load_jmx(self, ssl, host, port, jmx_metric): """ creates a JmxMetric object that holds info about jmx-based metrics """ value_list = [] @@ -286,7 +284,7 @@ def f(args): self.custom_module = None self.property_list = jmx_info['property_list'] self.property_map = {} - + if 'value' in jmx_info: realcode = re.sub('(\{(\d+)\})', 'args[\g<2>]', jmx_info['value']) http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py index f4e7ba1..1202c81 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py @@ -23,12 +23,13 @@ import os from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler from ambari_agent.alerts.metric_alert import MetricAlert +from ambari_agent.alerts.ams_alert import AmsAlert from ambari_agent.alerts.port_alert import PortAlert from ambari_agent.alerts.web_alert import WebAlert from AmbariConfig import AmbariConfig -from mock.mock import patch, Mock, MagicMock +from mock.mock import Mock, MagicMock from unittest import TestCase TEST_PATH = os.path.join('ambari_agent', 'dummy_files') @@ -60,6 +61,21 @@ class TestAlertSchedulerHandler(TestCase): self.assertEquals(callable_result.alert_meta, json_definition) self.assertEquals(callable_result.alert_source_meta, json_definition['source']) + def test_json_to_callable_ams(self): + scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) + json_definition = { + 'source': { + 'type': 'AMS' + } + } + + callable_result = scheduler._AlertSchedulerHandler__json_to_callable('cluster', 'host', 
copy.deepcopy(json_definition)) + + self.assertTrue(callable_result is not None) + self.assertTrue(isinstance(callable_result, AmsAlert)) + self.assertEquals(callable_result.alert_meta, json_definition) + self.assertEquals(callable_result.alert_source_meta, json_definition['source']) + def test_json_to_callable_port(self): json_definition = { 'source': { http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/test/python/ambari_agent/TestAlerts.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py index 9caee8a..c13b472 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py @@ -23,6 +23,7 @@ import socket import sys import urllib2 import tempfile +from alerts.ams_alert import AmsAlert from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler from ambari_agent.RecoveryManager import RecoveryManager @@ -405,6 +406,48 @@ class TestAlerts(TestCase): self.assertEquals('(Unit Tests) OK: 1 25 None', alerts[0]['text']) + @patch.object(AmsAlert, "_load_metric") + def test_ams_alert(self, ma_load_metric_mock): + definition_json = self._get_ams_alert_definition() + configuration = {'ams-site': + {'timeline.metrics.service.webapp.address': 'c6401.ambari.apache.org:6188'} + } + + collector = AlertCollector() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = AmsAlert(definition_json, definition_json['source'], self.config) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + + # trip an OK + ma_load_metric_mock.return_value = ([{1:100,2:100,3:200,4:200}], None) + + alert.collect() + alerts = collector.alerts() + self.assertEquals(0, len(collector.alerts())) + 
self.assertEquals('OK', alerts[0]['state']) + self.assertEquals('(Unit Tests) OK: the mean used heap size is 150 MB.', alerts[0]['text']) + + # trip a warning + ma_load_metric_mock.return_value = ([{1:800,2:800,3:900,4:900}], None) + + alert.collect() + alerts = collector.alerts() + self.assertEquals(0, len(collector.alerts())) + self.assertEquals('WARNING', alerts[0]['state']) + self.assertEquals('(Unit Tests) Warning: the mean used heap size is 850 MB.', alerts[0]['text']) + + # trip a critical now + ma_load_metric_mock.return_value = ([{1:1000,2:1000,3:2000,4:2000}], None) + + alert.collect() + alerts = collector.alerts() + self.assertEquals(0, len(collector.alerts())) + self.assertEquals('CRITICAL', alerts[0]['state']) + self.assertEquals('(Unit Tests) Critical: the mean used heap size is 1500 MB.', alerts[0]['text']) + @patch.object(MetricAlert, "_load_jmx") def test_alert_uri_structure(self, ma_load_jmx_mock): definition_json = self._get_metric_alert_definition() @@ -1392,6 +1435,52 @@ class TestAlerts(TestCase): } } + def _get_ams_alert_definition(self): + return { + "ignore_host": False, + "name": "namenode_mean_heapsize_used", + "componentName": "NAMENODE", + "interval": 1, + "clusterId": 2, + "uuid": "8a857295-ad11-4985-896e-d866dc27b963", + "label": "NameNode Mean Used Heap Size (Hourly)", + "definitionId": 28, + "source": { + "ams": { + "compute": "mean", + "interval": 30, + "app_id": "NAMENODE", + "value": "{0}", + "metric_list": [ + "jvm.JvmMetrics.MemHeapUsedM" + ], + "minimum_value": -1 + }, + "reporting": { + "units": "#", + "warning": { + "text": "(Unit Tests) Warning: the mean used heap size is {0} MB.", + "value": 768 + }, + "ok": { + "text": "(Unit Tests) OK: the mean used heap size is {0} MB." 
+ }, + "critical": { + "text": "(Unit Tests) Critical: the mean used heap size is {0} MB.", + "value": 1024 + } + }, + "type": "AMS", + "uri": { + "http": "{{ams-site/timeline.metrics.service.webapp.address}}", + "https_property_value": "HTTPS_ONLY", + "https_property": "{{ams-site/timeline.metrics.service.http.policy}}", + "https": "{{ams-site/timeline.metrics.service.webapp.address}}", + "connection_timeout": 5.0 + } + }, + } + def _get_metric_alert_definition_with_float_division(self): return { "name": "DataNode CPU Check", http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-agent/src/test/python/ambari_agent/TestAmsAlert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAmsAlert.py b/ambari-agent/src/test/python/ambari_agent/TestAmsAlert.py new file mode 100644 index 0000000..f1506f0 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/TestAmsAlert.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+''' + +from unittest import TestCase +from alerts.ams_alert import AmsAlert +from mock.mock import Mock, MagicMock, patch +from AmbariConfig import AmbariConfig + +class TestAmsAlert(TestCase): + + def setUp(self): + self.config = AmbariConfig() + + @patch("httplib.HTTPConnection") + def test_collect_ok(self, conn_mock): + alert_meta = { + 'name': 'alert1', + 'label': 'label1', + 'serviceName': 'service1', + 'componentName': 'component1', + 'uuid': '123', + 'enabled': 'true' + } + alert_source_meta = { + 'ams': { + 'metric_list': [ + 'metric1' + ], + "app_id": "APP_ID", + "interval": 60, + "minimum_value": -1, + "compute": "mean", + "value": "{0}" + }, + 'uri': { + 'http': '192.168.0.10:8080', + 'https_property': '{{ams-site/timeline.metrics.service.http.policy}}', + 'https_property_value': 'HTTPS_ONLY' + }, + "reporting": { + "ok": { + "text": "OK: {0}" + }, + "warning": { + "text": "Warn: {0}", + "value": 3 + }, + "critical": { + "text": "Crit: {0}", + "value": 5 + } + } + } + cluster = 'c1' + host = 'host1' + expected_text = 'OK: 2' + + def collector_side_effect(clus, data): + self.assertEquals(data['name'], alert_meta['name']) + self.assertEquals(data['label'], alert_meta['label']) + self.assertEquals(data['text'], expected_text) + self.assertEquals(data['service'], alert_meta['serviceName']) + self.assertEquals(data['component'], alert_meta['componentName']) + self.assertEquals(data['uuid'], alert_meta['uuid']) + self.assertEquals(data['enabled'], alert_meta['enabled']) + self.assertEquals(data['cluster'], cluster) + self.assertEquals(clus, cluster) + + ca_connection = MagicMock() + response = MagicMock() + response.status = 200 + ca_connection.getresponse.return_value = response + conn_mock.return_value = ca_connection + response.read.return_value = '{"metrics":[{"metricname":"metric1","metrics":{"1459966360838":1,"1459966370838":3}}]}' + + mock_collector = MagicMock() + mock_collector.put = Mock(side_effect=collector_side_effect) + + alert = 
AmsAlert(alert_meta, alert_source_meta, self.config) + alert.set_helpers(mock_collector, {'foo-site/bar': 12, 'foo-site/baz': 'asd'}) + alert.set_cluster(cluster, host) + + alert.collect() + + @patch("httplib.HTTPConnection") + def test_collect_warn(self, conn_mock): + alert_meta = { + 'name': 'alert1', + 'label': 'label1', + 'serviceName': 'service1', + 'componentName': 'component1', + 'uuid': '123', + 'enabled': 'true' + } + alert_source_meta = { + 'ams': { + 'metric_list': [ + 'metric1' + ], + "app_id": "APP_ID", + "interval": 60, + "minimum_value": -1, + "compute": "mean", + "value": "{0}" + }, + 'uri': { + 'http': '192.168.0.10:8080', + 'https_property': '{{ams-site/timeline.metrics.service.http.policy}}', + 'https_property_value': 'HTTPS_ONLY' + }, + "reporting": { + "ok": { + "text": "OK: {0}" + }, + "warning": { + "text": "Warn: {0}", + "value": 3 + }, + "critical": { + "text": "Crit: {0}", + "value": 5 + } + } + } + cluster = 'c1' + host = 'host1' + expected_text = 'Warn: 4' + + def collector_side_effect(clus, data): + self.assertEquals(data['name'], alert_meta['name']) + self.assertEquals(data['label'], alert_meta['label']) + self.assertEquals(data['text'], expected_text) + self.assertEquals(data['service'], alert_meta['serviceName']) + self.assertEquals(data['component'], alert_meta['componentName']) + self.assertEquals(data['uuid'], alert_meta['uuid']) + self.assertEquals(data['enabled'], alert_meta['enabled']) + self.assertEquals(data['cluster'], cluster) + self.assertEquals(clus, cluster) + + ca_connection = MagicMock() + response = MagicMock() + response.status = 200 + ca_connection.getresponse.return_value = response + conn_mock.return_value = ca_connection + response.read.return_value = '{"metrics":[{"metricname":"metric1","metrics":{"1459966360838":3,"1459966370838":5}}]}' + + mock_collector = MagicMock() + mock_collector.put = Mock(side_effect=collector_side_effect) + + alert = AmsAlert(alert_meta, alert_source_meta, self.config) + 
alert.set_helpers(mock_collector, {'foo-site/bar': 12, 'foo-site/baz': 'asd'}) + alert.set_cluster(cluster, host) + + alert.collect() + + @patch("httplib.HTTPConnection") + def test_collect_ok(self, conn_mock): + alert_meta = { + 'name': 'alert1', + 'label': 'label1', + 'serviceName': 'service1', + 'componentName': 'component1', + 'uuid': '123', + 'enabled': 'true' + } + alert_source_meta = { + 'ams': { + 'metric_list': [ + 'metric1' + ], + "app_id": "APP_ID", + "interval": 60, + "minimum_value": -1, + "compute": "mean", + "value": "{0}" + }, + 'uri': { + 'http': '192.168.0.10:8080', + 'https_property': '{{ams-site/timeline.metrics.service.http.policy}}', + 'https_property_value': 'HTTPS_ONLY' + }, + "reporting": { + "ok": { + "text": "OK: {0}" + }, + "warning": { + "text": "Warn: {0}", + "value": 3 + }, + "critical": { + "text": "Crit: {0}", + "value": 5 + } + } + } + cluster = 'c1' + host = 'host1' + expected_text = 'Crit: 10' + + def collector_side_effect(clus, data): + self.assertEquals(data['name'], alert_meta['name']) + self.assertEquals(data['label'], alert_meta['label']) + self.assertEquals(data['text'], expected_text) + self.assertEquals(data['service'], alert_meta['serviceName']) + self.assertEquals(data['component'], alert_meta['componentName']) + self.assertEquals(data['uuid'], alert_meta['uuid']) + self.assertEquals(data['enabled'], alert_meta['enabled']) + self.assertEquals(data['cluster'], cluster) + self.assertEquals(clus, cluster) + + ca_connection = MagicMock() + response = MagicMock() + response.status = 200 + ca_connection.getresponse.return_value = response + conn_mock.return_value = ca_connection + response.read.return_value = '{"metrics":[{"metricname":"metric1","metrics":{"1459966360838":10,"1459966370838":10}}]}' + + mock_collector = MagicMock() + mock_collector.put = Mock(side_effect=collector_side_effect) + + alert = AmsAlert(alert_meta, alert_source_meta, self.config) + alert.set_helpers(mock_collector, {'foo-site/bar': 12, 
'foo-site/baz': 'asd'}) + alert.set_cluster(cluster, host) + + alert.collect() + http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-common/src/main/python/ambari_commons/aggregate_functions.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/aggregate_functions.py b/ambari-common/src/main/python/ambari_commons/aggregate_functions.py new file mode 100644 index 0000000..a0d8cee --- /dev/null +++ b/ambari-common/src/main/python/ambari_commons/aggregate_functions.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +from math import sqrt + +def sample_standard_deviation(lst): + """calculates standard deviation""" + if len(lst) < 2: + return 0 + variance = sum([(element-mean(lst))**2 for element in lst]) / (len(lst) - 1) + return sqrt(variance) + +def mean(lst): + """calculates mean""" + if len(lst) < 1: + return 0 + return sum(lst) / len(lst) + +def sample_standard_deviation_percentage(lst): + """calculates sample standard deviation percentage""" + try: + return sample_standard_deviation(lst) / mean(lst) * 100 + except ZeroDivisionError: + # should not be a case for this alert + return 0 + +def count(lst): + """calculates number of data points""" + return len(lst) http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-common/src/main/python/ambari_commons/firewall.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/firewall.py b/ambari-common/src/main/python/ambari_commons/firewall.py index 64d396b..e077799 100644 --- a/ambari-common/src/main/python/ambari_commons/firewall.py +++ b/ambari-common/src/main/python/ambari_commons/firewall.py @@ -18,8 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. 
''' -import subprocess -import shlex from ambari_commons import OSCheck, OSConst from ambari_commons.logging_utils import print_warning_msg from ambari_commons.os_family_impl import OsFamilyImpl http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-common/src/main/python/ambari_commons/urllib_handlers.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/urllib_handlers.py b/ambari-common/src/main/python/ambari_commons/urllib_handlers.py index aa2e3f6..0406971 100644 --- a/ambari-common/src/main/python/ambari_commons/urllib_handlers.py +++ b/ambari-common/src/main/python/ambari_commons/urllib_handlers.py @@ -48,7 +48,7 @@ class RefreshHeaderProcessor(BaseHandler): def http_response(self, request, response): """ - Inspect the http response from urllib2 and see if thers is a refresh + Inspect the http response from urllib2 and see if there is a refresh response header. If there is, then attempt to follow it and re-execute the query using the new host. 
:param request: @@ -114,4 +114,4 @@ class RefreshHeaderProcessor(BaseHandler): refresh_header, str(exception))) # return the original response - return response \ No newline at end of file + return response http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java index 1676b53..acbb881 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java @@ -310,6 +310,10 @@ public class AlertDefinitionFactory { clazz = MetricSource.class; break; } + case AMS:{ + clazz = AmsSource.class; + break; + } case PORT:{ clazz = PortSource.class; break; http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AmsSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AmsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AmsSource.java new file mode 100644 index 0000000..eda6a5a --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AmsSource.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.state.alert; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +/** + * Alert when the source type is defined as {@link org.apache.ambari.server.state.alert.SourceType#AMS} + * <p/> + * Equality checking for instances of this class should be executed on every + * member to ensure that reconciling stack differences is correct. + */ +public class AmsSource extends Source { + + @SerializedName("uri") + private AlertUri uri = null; + + @SerializedName("ams") + private AmsInfo amsInfo = null; + + /** + * @return the ams info, if this metric is ams-based + */ + public AmsInfo getAmsInfo() { + return amsInfo; + } + + /** + * @return the uri info, which may include port information + */ + public AlertUri getUri() { + return uri; + } + + /** + * + */ + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((uri == null) ? 0 : uri.hashCode()); + result = prime * result + ((amsInfo == null) ?
0 : amsInfo.hashCode()); + + return result; + } + + /** + * + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!super.equals(obj)) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + AmsSource other = (AmsSource) obj; + + if (uri == null) { + if (other.uri != null) { + return false; + } + } else if (!uri.equals(other.uri)) { + return false; + } + + if (amsInfo == null) { + if (other.amsInfo != null) { + return false; + } + } else if (!amsInfo.equals(other.amsInfo)) { + return false; + } + + return true; + } + + /** + * Represents the {@code ams} element in an AMS alert. + */ + public static class AmsInfo { + + @SerializedName("metric_list") + private List<String> metricList; + + private String value; + + private int interval = 60; + + private String compute; + + @SerializedName("app_id") + private String appId; + + @SerializedName("minimum_value") + private int minimumValue; + + public String getAppId() { + return appId; + } + + public int getInterval() { + return interval; + } + + public String getCompute() { + return compute; + } + + public List<String> getMetricList() { + return metricList; + } + + public String getValue() { + return value; + } + + public int getMinimumValue() { + return minimumValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + AmsInfo amsInfo = (AmsInfo) o; + + if (interval != amsInfo.interval) return false; + if (minimumValue != amsInfo.minimumValue) return false; + if (appId != null ? !appId.equals(amsInfo.appId) : amsInfo.appId != null) + return false; + if (compute != null ? !compute.equals(amsInfo.compute) : amsInfo.compute != null) + return false; + if (metricList != null ? !metricList.equals(amsInfo.metricList) : amsInfo.metricList != null) + return false; + if (value != null ?
!value.equals(amsInfo.value) : amsInfo.value != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = metricList != null ? metricList.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + result = 31 * result + interval; + result = 31 * result + (compute != null ? compute.hashCode() : 0); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + minimumValue; + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java index 2ff438d..32bceca 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java @@ -148,10 +148,6 @@ public class MetricSource extends Source { List<String> list1 = new ArrayList<String>(propertyList); List<String> list2 = new ArrayList<String>(other.propertyList); - if ((null == list1 && null != list2) || (null != list1 && null == list2)) { - return false; - } - // !!! 
even if out of order, this is enough to fail return list1.equals(list2); http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java index 357baf9..ce971d2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/SourceType.java @@ -22,6 +22,11 @@ package org.apache.ambari.server.state.alert; */ public enum SourceType { /** + * Source is from ams metric data. + */ + AMS, + + /** * Source is from metric data. */ METRIC, http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json index 3612de2..f5fcc76 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json @@ -435,9 +435,9 @@ "connection_timeout": 5.0, "high_availability": { "nameservice": "{{hdfs-site/dfs.nameservices}}", - "alias_key" : "{{hdfs-site/dfs.ha.namenodes.{{ha-nameservice}}}}", - "http_pattern" : "{{hdfs-site/dfs.namenode.http-address.{{ha-nameservice}}.{{alias}}}}", - "https_pattern" : "{{hdfs-site/dfs.namenode.https-address.{{ha-nameservice}}.{{alias}}}}" + "alias_key": "{{hdfs-site/dfs.ha.namenodes.{{ha-nameservice}}}}", + "http_pattern": "{{hdfs-site/dfs.namenode.http-address.{{ha-nameservice}}.{{alias}}}}", + "https_pattern": 
"{{hdfs-site/dfs.namenode.https-address.{{ha-nameservice}}.{{alias}}}}" } }, "reporting": { @@ -452,7 +452,7 @@ "text": "DataNode Health: [Live={2}, Stale={1}, Dead={0}]", "value": 1 }, - "units" : "DNs" + "units": "DNs" }, "jmx": { "property_list": [ @@ -465,6 +465,91 @@ } }, { + "name": "namenode_free_heap_size_deviation_percentage", + "label": "NameNode Heap Usage (Hourly)", + "description": "This service-level alert is triggered if the NN heap usage deviation has grown beyond the specified threshold within a given time interval.", + "interval": 1, + "scope": "SERVICE", + "enabled": false, + "source": { + "type": "AMS", + "uri": { + "http": "{{ams-site/timeline.metrics.service.webapp.address}}", + "https": "{{ams-site/timeline.metrics.service.webapp.address}}", + "https_property": "{{ams-site/timeline.metrics.service.http.policy}}", + "https_property_value": "HTTPS_ONLY", + "connection_timeout": 5.0 + }, + "reporting": { + "ok": { + "text": "The sample standard deviation percentage is {0}%" + }, + "warning": { + "text": "The sample standard deviation percentage is {0}%", + "value": 20 + }, + "critical": { + "text": "The sample standard deviation percentage is {0}%", + "value": 50 + }, + "units" : "%" + }, + "ams": { + "app_id": "NAMENODE", + "interval": 60, + "metric_list": [ + "jvm.JvmMetrics.MemHeapUsedM", + "jvm.JvmMetrics.MemHeapMaxM" + ], + "minimum_value": 1, + "value": "{1} - {0}", + "compute": "sample_standard_deviation_percentage" + } + } + }, + { + "name": "namenode_mean_heapsize_used", + "label": "NameNode Mean Used Heap Size (Hourly)", + "description": "This service-level alert is triggered if the mean NN heap size is beyond the specified threshold within a given time interval.", + "interval": 1, + "scope": "SERVICE", + "enabled": false, + "source": { + "type": "AMS", + "uri": { + "http": "{{ams-site/timeline.metrics.service.webapp.address}}", + "https": "{{ams-site/timeline.metrics.service.webapp.address}}", + "https_property": 
"{{ams-site/timeline.metrics.service.http.policy}}", + "https_property_value": "HTTPS_ONLY", + "connection_timeout": 5.0 + }, + "reporting": { + "ok": { + "text": "OK: the mean used heap size is {0} MB." + }, + "warning": { + "text": "Warning: the mean used heap size is {0} MB.", + "value": 768 + }, + "critical": { + "text": "Critical: the mean used heap size is {0} MB.", + "value": 1024 + }, + "units" : "MB" + }, + "ams": { + "app_id": "NAMENODE", + "interval": 30, + "metric_list": [ + "jvm.JvmMetrics.MemHeapUsedM" + ], + "minimum_value": -1, + "value": "{0}", + "compute": "mean" + } + } + }, + { "name": "namenode_last_checkpoint", "label": "NameNode Last Checkpoint", "description": "This service-level alert will trigger if the last time that the NameNode performed a checkpoint was too long ago. It will also trigger if the number of uncommitted transactions is beyond a certain threshold.", http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py index 85d9d7e..95be7ba 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/alerts/alert_metrics_deviation.py @@ -21,11 +21,11 @@ import httplib import json import logging -from math import sqrt import urllib import time import urllib2 from resource_management import Environment +from ambari_commons.aggregate_functions import sample_standard_deviation, mean from resource_management.libraries.functions.curl_krb_request import curl_krb_request from 
resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS @@ -318,12 +318,12 @@ def execute(configurations={}, parameters={}, host_name=None): if len(metrics) < 2: return (RESULT_STATE_OK, ['No datapoints found above the minimum threshold of {0} seconds'.format(minimum_value_threshold)]) - mean = calculate_mean(metrics) - stddev = calulate_sample_std_deviation(metrics) + mean_value = mean(metrics) + stddev = sample_standard_deviation(metrics) max_value = max(metrics) / 1000 try: - deviation_percent = stddev / mean * 100 + deviation_percent = stddev / mean_value * 100 except ZeroDivisionError: # should not be a case for this alert return (RESULT_STATE_SKIPPED, ["Unable to calculate the standard deviation percentage. The mean value is 0"]) @@ -334,7 +334,7 @@ def execute(configurations={}, parameters={}, host_name=None): Mean - {2} Standard deviation - {3} Percentage standard deviation - {4} - """.format(encoded_get_metrics_parameters, data_json, mean, stddev, deviation_percent)) + """.format(encoded_get_metrics_parameters, data_json, mean_value, stddev, deviation_percent)) if deviation_percent > critical_threshold: return (RESULT_STATE_CRITICAL,['CRITICAL. Percentage standard deviation value {0}% is beyond the critical threshold of {1}% (growing {2} seconds to {3} seconds)'.format("%.2f" % deviation_percent, "%.2f" % critical_threshold, minimum_value_threshold, "%.2f" % max_value)]) @@ -342,16 +342,6 @@ def execute(configurations={}, parameters={}, host_name=None): return (RESULT_STATE_WARNING,['WARNING. Percentage standard deviation value {0}% is beyond the warning threshold of {1}% (growing {2} seconds to {3} seconds)'.format("%.2f" % deviation_percent, "%.2f" % warning_threshold, minimum_value_threshold, "%.2f" % max_value)]) return (RESULT_STATE_OK,['OK. 
Percentage standard deviation value is {0}%'.format("%.2f" % deviation_percent)]) -def calulate_sample_std_deviation(lst): - """calculates standard deviation""" - mean = calculate_mean(lst) - variance = sum([(element-mean)**2 for element in lst]) / (len(lst) - 1) - return sqrt(variance) - -def calculate_mean(lst): - """calculates mean""" - return sum(lst) / len(lst) - def valid_collector_webapp_address(webapp_address): if len(webapp_address) == 2 \ and webapp_address[0] != '127.0.0.1' \ http://git-wip-us.apache.org/repos/asf/ambari/blob/36910909/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java index 5dc7f3b..8c844eb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java @@ -1818,7 +1818,7 @@ public class AmbariMetaInfoTest { assertNotNull( metricSource.getUri().getHttpsUri() ); assertNotNull( metricSource.getUri().getHttpUri() ); assertEquals(12345, metricSource.getUri().getDefaultPort()); - +// // ignore host assertTrue(ignoreHost.isHostIgnored()); }
