This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27d3aba  AMBARI-24727 - Autoscaling based on metric alerts (#2416)
27d3aba is described below

commit 27d3abace07c8cdd209b918a8981a4e08618d7dd
Author: kasakrisz <33458261+kasakr...@users.noreply.github.com>
AuthorDate: Thu Oct 4 06:47:58 2018 +0200

    AMBARI-24727 - Autoscaling based on metric alerts (#2416)
---
 .../python/ambari_commons/ambari_metrics_helper.py | 163 ++++++++++++++++++++-
 1 file changed, 162 insertions(+), 1 deletion(-)

diff --git a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
index 6444dfd..07e4831 100644
--- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
+++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
@@ -18,13 +18,33 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
+import ambari_commons.network as network
+import ambari_simplejson as json
+import logging
 import os
 import random
+import urllib
+from ambari_agent.AmbariConfig import AmbariConfig
 from resource_management.libraries.functions import conf_select
+from resource_management.libraries.functions.default import default
+from resource_management.libraries.functions.is_empty import is_empty
+
+logger = logging.getLogger(__name__)
+
 
 DEFAULT_COLLECTOR_SUFFIX = '.sink.timeline.collector.hosts'
 DEFAULT_METRICS2_PROPERTIES_FILE_NAME = 'hadoop-metrics2.properties'
 
+AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
+
+METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}'
+METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_external_hosts}}'
+METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_external_port}}'
+AMS_METRICS_COLLECTOR_USE_SSL_KEY = '{{ams-site/timeline.metrics.service.http.policy}}'
+CONNECTION_TIMEOUT_KEY = 'http.connection.timeout'
+CONNECTION_TIMEOUT_DEFAULT = 5.0
+
+
 def select_metric_collector_for_sink(sink_name):
   # TODO check '*' sink_name
 
@@ -71,4 +91,145 @@ def load_properties_from_file(filepath, sep='=', comment_char='#'):
         key = key_value[0].strip()
         value = sep.join(key_value[1:]).strip('" \t')
         props[key] = value
-  return props
\ No newline at end of file
+  return props
+
+
+def get_ams_tokens():
+  return (METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, AMS_METRICS_COLLECTOR_USE_SSL_KEY, METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY)
+
+
+def create_ams_client(alert_id, ams_app_id, configurations, parameters):
+  if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations:
+    ams_collector_hosts = configurations[METRICS_COLLECTOR_VIP_HOST_KEY].split(',')
+    ams_collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
+  else:
+    # ams-site/timeline.metrics.service.webapp.address is required
+    if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations:
+      raise Exception('{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY))
+
+    collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":")
+    if not _valid_collector_webapp_address(collector_webapp_address):
+      raise Exception('{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
+        METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY]))
+
+    ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
+    if not ams_collector_hosts:
+      raise Exception("Ambari metrics is not available: ams_collector_hosts is None")
+    ams_collector_port = int(collector_webapp_address[1])
+
+  use_ssl = False
+  if AMS_METRICS_COLLECTOR_USE_SSL_KEY in configurations:
+    use_ssl = configurations[AMS_METRICS_COLLECTOR_USE_SSL_KEY] == 'HTTPS_ONLY'
+
+  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
+  if CONNECTION_TIMEOUT_KEY in parameters:
+    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])
+  return AmsClient(alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id)
+
+def _valid_collector_webapp_address(webapp_address):
+  if len(webapp_address) == 2 \
+          and webapp_address[0] != '127.0.0.1' \
+          and webapp_address[1].isdigit():
+    return True
+
+  return False
+
+class AmsClient:
+
+  def __init__(self, alert_id, ams_collector_hosts, ams_collector_port, use_ssl, connection_timeout, ams_app_id):
+    self.alert_id = alert_id
+    self.ams_collector_hosts = ams_collector_hosts
+    self.ams_collector_port = ams_collector_port
+    self.use_ssl = use_ssl
+    self.connection_timeout = connection_timeout
+    self.ams_app_id = ams_app_id
+
+  def load_metric(self, ams_metric, host_filter):
+    metric_dict = None
+    http_code = None
+    for ams_collector_host in self.ams_collector_hosts:
+      try:
+        metric_dict, http_code = self._load_metric(ams_collector_host, ams_metric, host_filter)
+        if http_code == 200 and metric_dict:
+          break
+      except Exception, exception:
+        if logger.isEnabledFor(logging.DEBUG):
+          logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS ({1}:{2}): {3}".format(self.alert_id, ams_collector_host, self.ams_collector_port, str(exception)))
+
+    if not http_code:
+      raise Exception("Ambari metrics is not available: no response")
+    if http_code not in [200, 299]:
+      raise Exception("Ambari metrics is not available: http status code = " + str(http_code))
+    if not metric_dict:
+      raise Exception("Ambari metrics is not available: no metrics were found.")
+
+    return metric_dict
+
+
+  def _load_metric(self, ams_collector_host, ams_metric, host_filter):
+    get_metrics_parameters = {
+      "metricNames": ams_metric,
+      "appId": self.ams_app_id,
+      "hostname": host_filter,
+      "precision": "seconds",
+      "grouped": "true",
+    }
+    encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
+    url = AMS_METRICS_GET_URL % encoded_get_metrics_parameters
+
+    _ssl_version = AmbariConfig.get_resolved_config().get_force_https_protocol_value()
+
+    ams_monitor_conf_dir = "/etc/ambari-metrics-monitor/conf"
+    metric_truststore_ca_certs='ca.pem'
+    ca_certs = os.path.join(ams_monitor_conf_dir, metric_truststore_ca_certs)
+
+    conn = None
+    response = None
+    data = None
+    try:
+      conn = network.get_http_connection(
+        ams_collector_host,
+        int(self.ams_collector_port),
+        self.use_ssl,
+        ca_certs,
+        ssl_version=_ssl_version
+      )
+      conn.request("GET", url)
+      response = conn.getresponse()
+      data = response.read()
+    except Exception, exception:
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.exception("[Alert][{0}] Unable to retrieve metrics from AMS: {1}".format(self.alert_id, str(exception)))
+      status = response.status if response else None
+      return None, status
+    finally:
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.debug("""
+        AMS request parameters - {0}
+        AMS response - {1}
+        """.format(encoded_get_metrics_parameters, data))
+      # explicitly close the connection as we've seen python hold onto these
+      if conn is not None:
+        try:
+          conn.close()
+        except:
+          logger.debug("[Alert][{0}] Unable to close URL connection to {1}".format(self.alert_id, url))
+
+    data_json = None
+    try:
+      data_json = json.loads(data)
+    except Exception, exception:
+      if logger.isEnabledFor(logging.DEBUG):
+        logger.exception("[Alert][{0}] Convert response to json failed or json doesn't contain needed data: {1}".
+                         format(self.alert_id, str(exception)))
+
+    if not data_json:
+      return None, response.status
+
+    metric_dict = {}
+    if "metrics" not in data_json:
+      return None, response.status
+    for metrics_data in data_json["metrics"]:
+      metric_dict[metrics_data["metricname"]] = metrics_data["metrics"]
+
+    return metric_dict, response.status

Reply via email to