Repository: ambari
Updated Branches:
  refs/heads/trunk 3b30de0a8 -> 875f1efb6


AMBARI-17457 : Modify the AMS stack scripts to support distributed collector 
(dsen via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/875f1efb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/875f1efb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/875f1efb

Branch: refs/heads/trunk
Commit: 875f1efb63cbbe81259a79c0b1530eba7ef2a3ec
Parents: 3b30de0
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Tue Sep 20 15:54:32 2016 -0700
Committer: Aravindan Vijayan <avija...@hortonworks.com>
Committed: Tue Sep 20 15:54:32 2016 -0700

----------------------------------------------------------------------
 .../ambari_commons/ambari_metrics_helper.py     |   5 +-
 .../ambari_commons/parallel_processing.py       |  95 ++++++++
 .../package/scripts/metrics_grafana_util.py     |  50 ++++-
 .../0.1.0/package/scripts/params.py             |   3 +
 .../0.1.0/package/scripts/service_check.py      | 225 ++++++++++---------
 .../metrics_grafana_datasource.json.j2          |   2 +-
 6 files changed, 266 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
index 7b4e8f5..2eb0b6d 100644
--- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
+++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
@@ -33,6 +33,9 @@ def select_metric_collector_for_sink(sink_name):
     return select_metric_collector_hosts_from_hostnames(all_collectors_list)
 
 def select_metric_collector_hosts_from_hostnames(hosts):
+    return get_random_host(hosts)
+
+def get_random_host(hosts):
     return random.choice(hosts)
 
 def get_metric_collectors_from_properties_file(sink_name):
@@ -53,4 +56,4 @@ def load_properties_from_file(filepath, sep='=', comment_char='#'):
                 key = key_value[0].strip()
                 value = sep.join(key_value[1:]).strip('" \t')
                 props[key] = value
-    return props
\ No newline at end of file
+    return props

http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-common/src/main/python/ambari_commons/parallel_processing.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/parallel_processing.py b/ambari-common/src/main/python/ambari_commons/parallel_processing.py
new file mode 100644
index 0000000..c5a95de
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/parallel_processing.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+from multiprocessing import Process, Queue
+
+logger = logging.getLogger()
+
+SUCCESS = "SUCCESS"
+FAILED = "FAILED"
+
+class PrallelProcessResult(object):
+    def __init__(self, element, status, result):
+        self.result = result
+        self.status = status
+        self.element = element
+
+class ParallelProcess(Process):
+
+
+    def __init__(self, function, element, params, queue):
+        self.function = function
+        self.element = element
+        self.params = params
+        self.queue = queue
+        super(ParallelProcess, self).__init__()
+
+    def return_name(self):
+        ## NOTE: self.name is an attribute of multiprocessing.Process
+        return "Process running function '%s' for element '%s'" % (self.function, self.element)
+
+    def run(self):
+        try:
+            result = self.function(self.element, self.params)
+            self.queue.put(PrallelProcessResult(self.element, SUCCESS, result))
+        except Exception as e:
+            self.queue.put(PrallelProcessResult(self.element, FAILED,
+                            "Exception while running function '%s' for '%s'. Reason : %s" % (self.function, self.element, str(e))))
+        return
+
+def execute_in_parallel(function, array, params, wait_for_all = False):
+    logger.info("Started running %s for %s" % (function, array))
+    processs = []
+    q = Queue()
+    counter = len(array)
+    results = {}
+
+    for element in array:
+        process = ParallelProcess(function, element, params, q)
+        process.start()
+        processs.append(process)
+
+    while counter > 0:
+        tmp = q.get()
+        counter-=1
+        results[tmp.element] = tmp
+        if tmp.status == SUCCESS and not wait_for_all:
+            counter = 0
+
+    for process in processs:
+        process.terminate()
+
+    logger.info("Finished running %s for %s" % (function, array))
+
+    return results
+
+def func (elem, params):
+    if elem == 'S':
+        return "lalala"
+    else :
+        raise Exception('Exception')
+
+if __name__ == "__main__":
+    results = execute_in_parallel(func, ['F', 'BF', 'S'], None)
+    for result in results:
+        print results[result].element
+        print results[result].status
+        print results[result].result
\ No newline at end of file
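
A minimal usage sketch for the new execute_in_parallel() helper added above (the ping_collector function and the host names below are hypothetical, not part of this patch): one process is spawned per element, and unless wait_for_all=True the call returns as soon as the first element reports SUCCESS, terminating the remaining processes.

    from ambari_commons.parallel_processing import execute_in_parallel, SUCCESS

    # Hypothetical reachability check; any callable taking (element, params) works.
    def ping_collector(host, params):
        if host.endswith(".good"):  # stand-in for a real HTTP/socket probe
            return "collector on %s is reachable" % host
        raise Exception("collector on %s did not respond" % host)

    if __name__ == "__main__":
        hosts = ["c1.bad", "c2.good", "c3.good"]  # assumed collector host list
        results = execute_in_parallel(ping_collector, hosts, None)

        # Results are keyed by element; with wait_for_all=False only elements
        # processed before the first success are guaranteed to be present.
        for host in hosts:
            if host in results and results[host].status == SUCCESS:
                print "first working collector: %s (%s)" % (host, results[host].result)
                break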

http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
index e8c12ed..b98dc1d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py
@@ -18,16 +18,21 @@ limitations under the License.
 
 """
 import httplib
+
+from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS
+from service_check import post_metrics_to_collector
 from resource_management.core.logger import Logger
 from resource_management.core.base import Fail
 from resource_management import Template
 from collections import namedtuple
 from urlparse import urlparse
 from base64 import b64encode
+import random
 import time
 import socket
 import ambari_simplejson as json
 import network
+import os
 
 GRAFANA_CONNECT_TRIES = 5
 GRAFANA_CONNECT_TIMEOUT = 10
@@ -171,20 +176,32 @@ def perform_grafana_delete_call(url, server):
 
   return response
 
-def is_unchanged_datasource_url(datasource_url):
+def is_unchanged_datasource_url(grafana_datasource_url, new_datasource_host):
   import params
-  parsed_url = urlparse(datasource_url)
+  parsed_url = urlparse(grafana_datasource_url)
   Logger.debug("parsed url: scheme = %s, host = %s, port = %s" % (
     parsed_url.scheme, parsed_url.hostname, parsed_url.port))
   Logger.debug("collector: scheme = %s, host = %s, port = %s" %
-              (params.metric_collector_protocol, params.metric_collector_host,
+              (params.metric_collector_protocol, new_datasource_host,
                params.metric_collector_port))
 
  return parsed_url.scheme.strip() == params.metric_collector_protocol.strip() and \
-         parsed_url.hostname.strip() == params.metric_collector_host.strip() and \
+         parsed_url.hostname.strip() == new_datasource_host.strip() and \
          str(parsed_url.port) == params.metric_collector_port
 
+def do_ams_collector_post(metric_collector_host, params):
+    ams_metrics_post_url = "/ws/v1/timeline/metrics/"
+    random_value1 = random.random()
+    headers = {"Content-type": "application/json"}
+    ca_certs = os.path.join(params.ams_collector_conf_dir,
+                            params.metric_truststore_ca_certs)
+
+    current_time = int(time.time()) * 1000
+    metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
+                           current_time=current_time).get_content()
 
+    post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
+                                metric_json, headers, ca_certs)
 def create_ams_datasource():
   import params
   server = Server(protocol = params.ams_grafana_protocol.strip(),
@@ -196,11 +213,28 @@ def create_ams_datasource():
   """
  Create AMS datasource in Grafana, if exsists make sure the collector url is accurate
   """
-  ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
-                                 ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME).get_content()
+  Logger.info("Trying to find working metric collector")
+  results = execute_in_parallel(do_ams_collector_post, params.ams_collector_hosts, params)
+  new_datasource_host = ""
+
+  for host in params.ams_collector_hosts:
+    if host in results:
+      if results[host].status == SUCCESS:
+        new_datasource_host = host
+        Logger.info("Found working collector on host %s" % new_datasource_host)
+        break
+      else:
+        Logger.warning(results[host].result)
 
-  Logger.info("Checking if AMS Grafana datasource already exists")
+  if new_datasource_host == "":
+    Logger.warning("All metric collectors are unavailable. Will use random collector as datasource host.")
+    new_datasource_host = params.random_metric_collector_host
+
+  Logger.info("New datasource host will be %s" % new_datasource_host)
 
+  ams_datasource_json = Template('metrics_grafana_datasource.json.j2',
+                            ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME, ams_datasource_host=new_datasource_host).get_content()
+  Logger.info("Checking if AMS Grafana datasource already exists")
 
   response = perform_grafana_get_call(GRAFANA_DATASOURCE_URL, server)
   create_datasource = True
@@ -215,7 +249,7 @@ def create_ams_datasource():
        Logger.info("Ambari Metrics Grafana datasource already present. Checking Metrics Collector URL")
         datasource_url = datasources_json[i]["url"]
 
-        if is_unchanged_datasource_url(datasource_url):
+        if is_unchanged_datasource_url(datasource_url, new_datasource_host):
           Logger.info("Metrics Collector URL validation succeeded.")
           return
         else: # Metrics datasource present, but collector host is wrong.

http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
index 22024bb..6934924 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py
@@ -120,6 +120,9 @@ if 'cluster-env' in config['configurations'] and \
  metric_collector_host = config['configurations']['cluster-env']['metrics_collector_vip_host']
 else:
  metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts)
+
+random_metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts)
+
 if 'cluster-env' in config['configurations'] and \
     'metrics_collector_vip_port' in config['configurations']['cluster-env']:
  metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port']

http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
index ddd3e42..56ca4a1 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py
@@ -25,6 +25,7 @@ from resource_management import Template
 
 from ambari_commons import OSConst
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
+from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS
 
 import httplib
 import network
@@ -39,10 +40,10 @@ import socket
 class AMSServiceCheck(Script):
   AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
   AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
-  AMS_CONNECT_TRIES = 30
-  AMS_CONNECT_TIMEOUT = 15
-  AMS_READ_TRIES = 10
-  AMS_READ_TIMEOUT = 5
+  AMS_CONNECT_TRIES = 10
+  AMS_CONNECT_TIMEOUT = 10
+  AMS_READ_TRIES = 5
+  AMS_READ_TIMEOUT = 10
 
   @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
   def service_check(self, env):
@@ -62,124 +63,140 @@ class AMSServiceCheck(Script):
      if not check_windows_service_exists(params.ams_collector_win_service_name):
        raise Fail("Metrics Collector service was not properly installed. Check the logs and retry the installation.")
 
-  @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-  def service_check(self, env):
-    import params
-
-    Logger.info("Ambari Metrics service check was started.")
-    env.set_params(params)
-
+  def service_check_for_single_host(self, metric_collector_host, params):
     random_value1 = random.random()
     headers = {"Content-type": "application/json"}
     ca_certs = os.path.join(params.ams_collector_conf_dir,
                             params.metric_truststore_ca_certs)
 
-    for i in xrange(0, self.AMS_CONNECT_TRIES):
-      try:
-        current_time = int(time.time()) * 1000
-        metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
+    current_time = int(time.time()) * 1000
+    metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1,
                            current_time=current_time).get_content()
-        Logger.info("Generated metrics:\n%s" % metric_json)
-
-        Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host,
-                                                      params.metric_collector_port,
-                                                      self.AMS_METRICS_POST_URL))
-        conn = network.get_http_connection(params.metric_collector_host,
+    try:
+      post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled,
+                                metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT)
+
+      get_metrics_parameters = {
+        "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
+        "appId": "amssmoketestfake",
+        "hostname": params.hostname,
+        "startTime": current_time - 60000,
+        "endTime": current_time + 61000,
+        "precision": "seconds",
+        "grouped": "false",
+      }
+      encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
+
+      Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host,
+                                                   params.metric_collector_port,
+                                                   self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
+      for i in xrange(0, self.AMS_READ_TRIES):
+        conn = network.get_http_connection(metric_collector_host,
                                           int(params.metric_collector_port),
                                           params.metric_collector_https_enabled,
                                            ca_certs)
-        conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers)
-
+        conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
         response = conn.getresponse()
-        Logger.info("Http response: %s %s" % (response.status, response.reason))
-      except (httplib.HTTPException, socket.error) as ex:
-        if i < self.AMS_CONNECT_TRIES - 1:  #range/xrange returns items from start to end-1
-          time.sleep(self.AMS_CONNECT_TIMEOUT)
-          Logger.info("Connection failed. Next retry in %s seconds."
-                      % (self.AMS_CONNECT_TIMEOUT))
-          continue
-        else:
-          raise Fail("Metrics were not saved. Service check has failed. "
-               "\nConnection failed.")
+        Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason))
 
-      data = response.read()
-      Logger.info("Http data: %s" % data)
-      conn.close()
+        data = response.read()
+        Logger.info("Http data: %s" % data)
+        conn.close()
 
-      if response.status == 200:
-        Logger.info("Metrics were saved.")
-        break
-      else:
-        Logger.info("Metrics were not saved. Service check has failed.")
-        if i < self.AMS_CONNECT_TRIES - 1:  #range/xrange returns items from start to end-1
-          time.sleep(self.AMS_CONNECT_TIMEOUT)
-          Logger.info("Next retry in %s seconds."
-                      % (self.AMS_CONNECT_TIMEOUT))
+        if response.status == 200:
+          Logger.info("Metrics were retrieved from host %s" % metric_collector_host)
+        else:
+          raise Fail("Metrics were not retrieved from host %s. GET request status: %s %s \n%s" %
+                     (metric_collector_host, response.status, response.reason, data))
+        data_json = json.loads(data)
+
+        def floats_eq(f1, f2, delta):
+          return abs(f1-f2) < delta
+
+        values_are_present = False
+        for metrics_data in data_json["metrics"]:
+          if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
+              and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
+              and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
+            Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time))
+            values_are_present = True
+            break
+            pass
+
+        if not values_are_present:
+          if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
+            Logger.info("Values weren't stored yet. Retrying in %s seconds."
+                        % (self.AMS_READ_TIMEOUT))
+            time.sleep(self.AMS_READ_TIMEOUT)
+          else:
+            raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
         else:
-          raise Fail("Metrics were not saved. Service check has failed. POST request status: %s %s \n%s" %
-                     (response.status, response.reason, data))
-
-    get_metrics_parameters = {
-      "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric",
-      "appId": "amssmoketestfake",
-      "hostname": params.hostname,
-      "startTime": current_time - 60000,
-      "endTime": current_time + 61000,
-      "precision": "seconds",
-      "grouped": "false",
-    }
-    encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
-
-    Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host,
-                                                 params.metric_collector_port,
-                                              self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters))
-    for i in xrange(0, self.AMS_READ_TRIES):
-      conn = network.get_http_connection(params.metric_collector_host,
-                                         int(params.metric_collector_port),
-                                         params.metric_collector_https_enabled,
-                                         ca_certs)
-      conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
-      response = conn.getresponse()
-      Logger.info("Http response: %s %s" % (response.status, response.reason))
-
-      data = response.read()
-      Logger.info("Http data: %s" % data)
-      conn.close()
-
-      if response.status == 200:
-        Logger.info("Metrics were retrieved.")
-      else:
-        Logger.info("Metrics were not retrieved. Service check has failed.")
-        raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" %
-                   (response.status, response.reason, data))
-      data_json = json.loads(data)
-
-      def floats_eq(f1, f2, delta):
-        return abs(f1-f2) < delta
-
-      values_are_present = False
-      for metrics_data in data_json["metrics"]:
-        if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"]
-            and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001)
-            and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)):
-          Logger.info("Values %s and %s were found in the response." % (random_value1, current_time))
-          values_are_present = True
           break
           pass
+    except Fail as ex:
+      Logger.warning("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
+      raise Fail("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex)))
+
+  @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
+  def service_check(self, env):
+    import params
+
+    Logger.info("Ambari Metrics service check was started.")
+    env.set_params(params)
+
+    results = execute_in_parallel(self.service_check_for_single_host, params.ams_collector_hosts, params)
 
-      if not values_are_present:
-      if i < self.AMS_READ_TRIES - 1:  #range/xrange returns items from start to end-1
-          Logger.info("Values weren't stored yet. Retrying in %s seconds."
-                    % (self.AMS_READ_TIMEOUT))
-          time.sleep(self.AMS_READ_TIMEOUT)
+    for host in params.ams_collector_hosts:
+      if host in results:
+        if results[host].status == SUCCESS:
+          Logger.info("Ambari Metrics service check passed on host " + host)
+          return
         else:
-          Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time))
-          raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time))
-      else:
-        break
-        pass
-    Logger.info("Ambari Metrics service check is finished.")
+          Logger.warning(results[host].result)
+    raise Fail("All metrics collectors are unavailable.")
+
+def post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, metric_collector_port, metric_collector_https_enabled,
+                              metric_json, headers, ca_certs, tries = 1, connect_timeout = 10):
+  for i in xrange(0, tries):
+    try:
+      Logger.info("Generated metrics for host %s :\n%s" % (metric_collector_host, metric_json))
+
+      Logger.info("Connecting (POST) to %s:%s%s" % (metric_collector_host,
+                                                    metric_collector_port,
+                                                    ams_metrics_post_url))
+      conn = network.get_http_connection(metric_collector_host,
+                                         int(metric_collector_port),
+                                         metric_collector_https_enabled,
+                                         ca_certs)
+      conn.request("POST", ams_metrics_post_url, metric_json, headers)
 
+      response = conn.getresponse()
+      Logger.info("Http response for host %s: %s %s" % (metric_collector_host, response.status, response.reason))
+    except (httplib.HTTPException, socket.error) as ex:
+      if i < tries - 1:  #range/xrange returns items from start to end-1
+        time.sleep(connect_timeout)
+        Logger.info("Connection failed for host %s. Next retry in %s seconds."
+                    % (metric_collector_host, connect_timeout))
+        continue
+      else:
+        raise Fail("Metrics were not saved. Connection failed.")
+
+    data = response.read()
+    Logger.info("Http data: %s" % data)
+    conn.close()
+
+    if response.status == 200:
+      Logger.info("Metrics were saved.")
+      break
+    else:
+      Logger.info("Metrics were not saved.")
+      if i < tries - 1:  #range/xrange returns items from start to end-1
+        time.sleep(tries)
+        Logger.info("Next retry in %s seconds."
+                    % (tries))
+      else:
+        raise Fail("Metrics were not saved. POST request status: %s %s \n%s" %
+                   (response.status, response.reason, data))
 if __name__ == "__main__":
   AMSServiceCheck().execute()
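
The POST half of the old service check now lives in the module-level post_metrics_to_collector() helper above, so both AMSServiceCheck and the Grafana datasource setup (do_ams_collector_post in metrics_grafana_util.py) can reuse it. A minimal call sketch; the collector hostname and the random1/current_time literals are placeholders, not part of this patch:

    import os
    from resource_management import Template
    from service_check import post_metrics_to_collector
    import params

    headers = {"Content-type": "application/json"}
    ca_certs = os.path.join(params.ams_collector_conf_dir,
                            params.metric_truststore_ca_certs)
    metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname,
                           random1=0.42, current_time=1474406072000).get_content()

    # Retries on connection errors or non-200 responses and raises Fail
    # once 'tries' attempts are exhausted.
    post_metrics_to_collector("/ws/v1/timeline/metrics/", "collector-1.example.com",
                              params.metric_collector_port,
                              params.metric_collector_https_enabled,
                              metric_json, headers, ca_certs,
                              tries=3, connect_timeout=10)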
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
index 678d769..30870c4 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2
@@ -20,7 +20,7 @@
   "name": "{{ams_datasource_name}}",
   "type": "ambarimetrics",
   "access": "proxy",
-  "url": "{{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}",
+  "url": "{{metric_collector_protocol}}://{{ams_datasource_host}}:{{metric_collector_port}}",
   "password": "",
   "user": "",
   "database": "",
