This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27d3aba AMBARI-24727 - Autoscaling based on metric alerts (#2416)
27d3aba is described below
commit 27d3abace07c8cdd209b918a8981a4e08618d7dd
Author: kasakrisz <[email protected]>
AuthorDate: Thu Oct 4 06:47:58 2018 +0200
AMBARI-24727 - Autoscaling based on metric alerts (#2416)
---
.../python/ambari_commons/ambari_metrics_helper.py | 163 ++++++++++++++++++++-
1 file changed, 162 insertions(+), 1 deletion(-)
diff --git
a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
index 6444dfd..07e4831 100644
--- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
+++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
@@ -18,13 +18,33 @@ See the License for the specific language governing
permissions and
limitations under the License.
'''
+import ambari_commons.network as network
+import ambari_simplejson as json
+import logging
import os
import random
+import urllib
+from ambari_agent.AmbariConfig import AmbariConfig
from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.is_empty import is_empty
+
+logger = logging.getLogger(__name__)
+
DEFAULT_COLLECTOR_SUFFIX = '.sink.timeline.collector.hosts'
DEFAULT_METRICS2_PROPERTIES_FILE_NAME = 'hadoop-metrics2.properties'
# URL template for the AMS timeline metrics GET endpoint; '%s' is replaced
# with the urlencoded query parameters.
AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"

# Alert-framework placeholder tokens ('{{config-type/property}}'); the framework
# substitutes these with live cluster configuration values before execution.
METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}'
METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_external_hosts}}'
METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_external_port}}'
AMS_METRICS_COLLECTOR_USE_SSL_KEY = '{{ams-site/timeline.metrics.service.http.policy}}'
# Script parameter key for the HTTP connection timeout, and its fallback (seconds).
CONNECTION_TIMEOUT_KEY = 'http.connection.timeout'
CONNECTION_TIMEOUT_DEFAULT = 5.0
+
+
def select_metric_collector_for_sink(sink_name):
# TODO check '*' sink_name
@@ -71,4 +91,145 @@ def load_properties_from_file(filepath, sep='=',
comment_char='#'):
key = key_value[0].strip()
value = sep.join(key_value[1:]).strip('" \t')
props[key] = value
- return props
\ No newline at end of file
+ return props
+
+
def get_ams_tokens():
  """Return the configuration placeholder tokens this module needs resolved.

  The alert framework uses this list to know which '{{...}}' tokens to
  substitute before invoking the script.
  """
  return (
    METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY,
    AMS_METRICS_COLLECTOR_USE_SSL_KEY,
    METRICS_COLLECTOR_VIP_HOST_KEY,
    METRICS_COLLECTOR_VIP_PORT_KEY,
  )
+
+
def create_ams_client(alert_id, ams_app_id, configurations, parameters):
  """Build an AmsClient for the given alert from resolved configurations.

  Prefers the external (VIP) collector host list and port when both are
  configured; otherwise falls back to ams-site's webapp address combined
  with the cluster's metrics collector host list.

  :param alert_id: identifier of the alert, used only for log messages
  :param ams_app_id: AMS appId the requested metrics were reported under
  :param configurations: dict of resolved '{{...}}' configuration tokens
  :param parameters: dict of script parameters (may override the timeout)
  :return: a configured AmsClient instance
  :raise Exception: when no usable collector address can be determined
  """
  if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations:
    ams_collector_hosts = configurations[METRICS_COLLECTOR_VIP_HOST_KEY].split(',')
    ams_collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
  else:
    # ams-site/timeline.metrics.service.webapp.address is required
    # (fix: use the idiomatic 'not in' membership test)
    if METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY not in configurations:
      raise Exception('{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY))

    collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":")
    if not _valid_collector_webapp_address(collector_webapp_address):
      raise Exception('{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
        METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY]))

    # the webapp address only supplies the port; hosts come from cluster topology
    ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
    if not ams_collector_hosts:
      raise Exception("Ambari metrics is not available: ams_collector_hosts is None")
    ams_collector_port = int(collector_webapp_address[1])

  use_ssl = False
  if AMS_METRICS_COLLECTOR_USE_SSL_KEY in configurations:
    use_ssl = configurations[AMS_METRICS_COLLECTOR_USE_SSL_KEY] == 'HTTPS_ONLY'

  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
  if CONNECTION_TIMEOUT_KEY in parameters:
    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])
  return AmsClient(alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id)
+
+def _valid_collector_webapp_address(webapp_address):
+ if len(webapp_address) == 2 \
+ and webapp_address[0] != '127.0.0.1' \
+ and webapp_address[1].isdigit():
+ return True
+
+ return False
+
class AmsClient:
  """Thin HTTP client that fetches timeline metrics from an AMS collector.

  Tries each known collector host in turn and returns the first successful
  metric payload.
  """

  def __init__(self, alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id):
    """
    :param alert_id: identifier of the alert, used only in log messages
    :param ams_collector_hosts: iterable of collector hostnames to try
    :param ams_collector_port: TCP port of the collector web application
    :param use_ssl: True to connect over HTTPS
    :param connection_timeout: timeout in seconds (stored but not currently
        passed to the connection — TODO confirm intended use)
    :param ams_app_id: AMS appId to query metrics for
    """
    self.alert_id = alert_id
    self.ams_collector_hosts = ams_collector_hosts
    self.ams_collector_port = ams_collector_port
    self.use_ssl = use_ssl
    self.connection_timeout = connection_timeout
    self.ams_app_id = ams_app_id

  def load_metric(self, ams_metric, host_filter):
    """Fetch ams_metric for the given host filter from any available collector.

    Iterates over the configured collector hosts and stops at the first one
    that answers HTTP 200 with a non-empty metric payload.

    :param ams_metric: metric name(s) to request
    :param host_filter: hostname filter passed to AMS
    :return: dict mapping metric name -> metric data
    :raise Exception: when no collector produced usable metrics
    """
    metric_dict = None
    http_code = None
    for ams_collector_host in self.ams_collector_hosts:
      try:
        metric_dict, http_code = self._load_metric(ams_collector_host, ams_metric, host_filter)
        if http_code == 200 and metric_dict:
          break
      # fix: 'except E, e' is Python-2-only syntax; 'as' works on 2.6+ and 3.x
      except Exception as exception:
        # best effort: log and fall through to the next collector host
        if logger.isEnabledFor(logging.DEBUG):
          logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS ({1}:{2}): {3}".format(self.alert_id, ams_collector_host, self.ams_collector_port, str(exception)))

    if not http_code:
      raise Exception("Ambari metrics is not available: no response")
    # NOTE(review): this accepts only the exact codes 200 and 299; if the
    # intent was "any 2xx response", the check should be a range — confirm.
    if http_code not in [200, 299]:
      raise Exception("Ambari metrics is not available: http status code = " + str(http_code))
    if not metric_dict:
      raise Exception("Ambari metrics is not available: no metrics were found.")

    return metric_dict


  def _load_metric(self, ams_collector_host, ams_metric, host_filter):
    """Issue a single metrics GET request to one collector host.

    :return: tuple (metric_dict, http_status); metric_dict is None on any
        failure (connection error, non-JSON body, missing "metrics" key)
    """
    get_metrics_parameters = {
      "metricNames": ams_metric,
      "appId": self.ams_app_id,
      "hostname": host_filter,
      "precision": "seconds",
      "grouped": "true",
    }
    encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
    url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters

    _ssl_version = AmbariConfig.get_resolved_config().get_force_https_protocol_value()

    # assumes the metrics monitor truststore CA bundle lives at this fixed
    # path on every agent host — TODO confirm
    ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
    metric_truststore_ca_certs = 'ca.pem'
    ca_certs = os.path.join(ams_monitor_conf_dir, metric_truststore_ca_certs)

    conn = None
    response = None
    data = None
    try:
      conn = network.get_http_connection(
        ams_collector_host,
        int(self.ams_collector_port),
        self.use_ssl,
        ca_certs,
        ssl_version=_ssl_version
      )
      conn.request("GET", url)
      response = conn.getresponse()
      data = response.read()
    except Exception as exception:
      if logger.isEnabledFor(logging.DEBUG):
        logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.alert_id, str(exception)))
      # response may be None when the failure happened before getresponse()
      status = response.status if response else None
      return None, status
    finally:
      if logger.isEnabledFor(logging.DEBUG):
        logger.debug("""
        AMS request parameters - {0}
        AMS response - {1}
        """.format(encoded_get_metrics_parameters, data))
      # explicitly close the connection as we've seen python hold onto these
      if conn is not None:
        try:
          conn.close()
        # fix: was a bare 'except:'; narrowed so SystemExit/KeyboardInterrupt
        # are not swallowed during cleanup
        except Exception:
          logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format(self.alert_id, url))

    data_json = None
    try:
      data_json = json.loads(data)
    except Exception as exception:
      if logger.isEnabledFor(logging.DEBUG):
        logger.exception("[Alert][{0}] Convert response to json failed or json doesn't contain needed data: {1}".
                         format(self.alert_id, str(exception)))

    if not data_json:
      return None, response.status

    metric_dict = {}
    if "metrics" not in data_json:
      return None, response.status
    # flatten the AMS payload into {metricname: metrics-series}
    for metrics_data in data_json["metrics"]:
      metric_dict[metrics_data["metricname"]] = metrics_data["metrics"]

    return metric_dict, response.status