This is an automated email from the ASF dual-hosted git repository. krisztiankasa pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 27d3aba AMBARI-24727 - Autoscaling based on metric alerts (#2416) 27d3aba is described below commit 27d3abace07c8cdd209b918a8981a4e08618d7dd Author: kasakrisz <33458261+kasakr...@users.noreply.github.com> AuthorDate: Thu Oct 4 06:47:58 2018 +0200 AMBARI-24727 - Autoscaling based on metric alerts (#2416) --- .../python/ambari_commons/ambari_metrics_helper.py | 163 ++++++++++++++++++++- 1 file changed, 162 insertions(+), 1 deletion(-) diff --git a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py index 6444dfd..07e4831 100644 --- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py +++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py @@ -18,13 +18,33 @@ See the License for the specific language governing permissions and limitations under the License. ''' +import ambari_commons.network as network +import ambari_simplejson as json +import logging import os import random +import urllib +from ambari_agent.AmbariConfig import AmbariConfig from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions.is_empty import is_empty + +logger = logging.getLogger(__name__) + DEFAULT_COLLECTOR_SUFFIX = '.sink.timeline.collector.hosts' DEFAULT_METRICS2_PROPERTIES_FILE_NAME = 'hadoop-metrics2.properties' +AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s" + +METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}' +METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_external_hosts}}' +METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_external_port}}' +AMS_METRICS_COLLECTOR_USE_SSL_KEY = '{{ams-site/timeline.metrics.service.http.policy}}' +CONNECTION_TIMEOUT_KEY = 'http.connection.timeout' 
+CONNECTION_TIMEOUT_DEFAULT = 5.0 + + def select_metric_collector_for_sink(sink_name): # TODO check '*' sink_name @@ -71,4 +91,145 @@ def load_properties_from_file(filepath, sep='=', comment_char='#'): key = key_value[0].strip() value = sep.join(key_value[1:]).strip('" \t') props[key] = value - return props \ No newline at end of file + return props + + +def get_ams_tokens(): + return (METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, AMS_METRICS_COLLECTOR_USE_SSL_KEY, METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY) + + +def create_ams_client(alert_id, ams_app_id, configurations, parameters): + if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations: + ams_collector_hosts = configurations[METRICS_COLLECTOR_VIP_HOST_KEY].split(',') + ams_collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY]) + else: + # ams-site/timeline.metrics.service.webapp.address is required + if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations: + raise Exception('{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)) + + collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":") + if not _valid_collector_webapp_address(collector_webapp_address): + raise Exception('{0} value should be set as "fqdn_hostname:port", but set to {1}'.format( + METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])) + + ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) + if not ams_collector_hosts: + raise Exception("Ambari metrics is not available: ams_collector_hosts is None") + ams_collector_port = int(collector_webapp_address[1]) + + use_ssl = False + if AMS_METRICS_COLLECTOR_USE_SSL_KEY in configurations: + use_ssl = configurations[AMS_METRICS_COLLECTOR_USE_SSL_KEY] == 'HTTPS_ONLY' + + connection_timeout = CONNECTION_TIMEOUT_DEFAULT + if CONNECTION_TIMEOUT_KEY in parameters: + connection_timeout = 
float(parameters[CONNECTION_TIMEOUT_KEY]) + return AmsClient(alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id) + +def _valid_collector_webapp_address(webapp_address): + if len(webapp_address) == 2 \ + and webapp_address[0] != '127.0.0.1' \ + and webapp_address[1].isdigit(): + return True + + return False + +class AmsClient: + + def __init__(self, alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id): + self.alert_id = alert_id + self.ams_collector_hosts = ams_collector_hosts + self.ams_collector_port = ams_collector_port + self.use_ssl = use_ssl + self.connection_timeout = connection_timeout + self.ams_app_id = ams_app_id + + def load_metric(self, ams_metric, host_filter): + metric_dict = None + http_code = None + for ams_collector_host in self.ams_collector_hosts: + try: + metric_dict, http_code = self._load_metric(ams_collector_host, ams_metric, host_filter) + if http_code == 200 and metric_dict: + break + except Exception, exception: + if logger.isEnabledFor(logging.DEBUG): + logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS ({1}:{2}): {3}".format(self.alert_id, ams_collector_host, self.ams_collector_port, str(exception))) + + if not http_code: + raise Exception("Ambari metrics is not available: no response") + if not 200 <= http_code <= 299: + raise Exception("Ambari metrics is not available: http status code = " + str(http_code)) + if not metric_dict: + raise Exception("Ambari metrics is not available: no metrics were found.") + + return metric_dict + + + def _load_metric(self, ams_collector_host, ams_metric, host_filter): + get_metrics_parameters = { + "metricNames": ams_metric, + "appId": self.ams_app_id, + "hostname": host_filter, + "precision": "seconds", + "grouped": "true", + } + encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters) + url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters + + _ssl_version = 
AmbariConfig.get_resolved_config().get_force_https_protocol_value() + + ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf" + metric_truststore_ca_certs='ca.pem' + ca_certs = os.path.join(ams_monitor_conf_dir, metric_truststore_ca_certs) + + conn = None + response = None + data = None + try: + conn = network.get_http_connection( + ams_collector_host, + int(self.ams_collector_port), + self.use_ssl, + ca_certs, + ssl_version=_ssl_version + ) + conn.request("GET", url) + response = conn.getresponse() + data = response.read() + except Exception, exception: + if logger.isEnabledFor(logging.DEBUG): + logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.alert_id, str(exception))) + status = response.status if response else None + return None, status + finally: + if logger.isEnabledFor(logging.DEBUG): + logger.debug(""" + AMS request parameters - {0} + AMS response - {1} + """.format(encoded_get_metrics_parameters, data)) + # explicitly close the connection as we've seen python hold onto these + if conn is not None: + try: + conn.close() + except Exception: + logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format(self.alert_id, url)) + + data_json = None + try: + data_json = json.loads(data) + except Exception, exception: + if logger.isEnabledFor(logging.DEBUG): + logger.exception("[Alert][{0}] Convert response to json failed or json doesn't contain needed data: {1}". + format(self.alert_id, str(exception))) + + if not data_json: + return None, response.status + + metric_dict = {} + if "metrics" not in data_json: + return None, response.status + for metrics_data in data_json["metrics"]: + metric_dict[metrics_data["metricname"]] = metrics_data["metrics"] + + return metric_dict, response.status