Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 9b9402e93 -> a9774d664


AMBARI-21732. Report alerts status to server (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a9774d66
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a9774d66
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a9774d66

Branch: refs/heads/branch-3.0-perf
Commit: a9774d6648474911716d9abc278272437ba56d11
Parents: 9b9402e
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Wed Aug 16 13:36:34 2017 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Wed Aug 16 13:36:34 2017 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 +
 .../ambari_agent/AlertSchedulerHandler.py       |  2 +-
 .../python/ambari_agent/AlertStatusReporter.py  | 56 ++++++++++++++++++++
 .../ambari_agent/ClusterConfigurationCache.py   |  2 +-
 .../src/main/python/ambari_agent/Constants.py   |  1 +
 .../python/ambari_agent/InitializerModule.py    |  1 +
 .../python/ambari_agent/alerts/base_alert.py    |  6 ++-
 .../src/main/python/ambari_agent/main.py        | 10 +++-
 8 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 609f0fa..e1df1d2 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -36,6 +36,7 @@ run_as_user=root
 parallel_execution=0
 alert_grace_period=5
 status_command_timeout=5
+alert_reports_interval=5
 ; 0 - don't report commands output periodically. Reduces bandwidth on big cluster
 command_reports_interval=5
 alert_kinit_timeout=14400000

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 0dfc977..3d7c30c 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -307,7 +307,7 @@ class AlertSchedulerHandler():
         alert = RecoveryAlert(json_definition, source, self.config, self.recovery_manger)
 
       if alert is not None:
-        alert.set_cluster(clusterName, hostName)
+        alert.set_cluster(clusterName, json_definition['clusterId'], hostName)
 
     except Exception, exception:
       logger.exception("[AlertScheduler] Unable to load an invalid alert definition. It will be skipped.")

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
new file mode 100644
index 0000000..20cb717
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import threading
+from ambari_agent import security
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class AlertStatusReporter(threading.Thread):
+  def __init__(self, initializer_module):
+    self.initializer_module = initializer_module
+    self.collector = initializer_module.alert_scheduler_handler.collector()
+    self.stop_event = initializer_module.stop_event
+    self.alert_reports_interval = initializer_module.alert_reports_interval
+    threading.Thread.__init__(self)
+
+  def run(self):
+    """
+    Run an endless loop which reports all the alert statuses got from collector
+    """
+    if self.alert_reports_interval == 0:
+      return
+
+    while not self.stop_event.is_set():
+      try:
+        if self.initializer_module.is_registered:
+          alerts = self.collector.alerts()
+          if alerts:
+            self.initializer_module.connection.send(message=alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT)
+      except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue
+        pass
+      except:
+        logger.exception("Exception in AlertStatusReporter. Re-running it")
+
+      self.stop_event.wait(self.alert_reports_interval)
+
+    logger.info("AlertStatusReporter has successfully finished")

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
index f0c3945..77ca4c1 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -51,7 +51,7 @@ class ClusterConfigurationCache(ClusterCache):
     """
     self._cache_lock.acquire()
     try:
-      dictionary = self[cluster_id]
+      dictionary = self[str(cluster_id)]['configurations']
       for layer_key in key.split('/'):
         dictionary = dictionary[layer_key]
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 37b5df2..e36eda5 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -38,6 +38,7 @@ ALERTS_DEFENITIONS_REQUEST_ENDPOINT = '/agents/alert_defenitions'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'
+ALERTS_STATUS_REPORTS_ENDPOINT = '/reports/alerts_status'
 
 HEARTBEAT_ENDPOINT = '/heartbeat'
 REGISTRATION_ENDPOINT = '/register'

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index cbf0780..8208b32 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -60,6 +60,7 @@ class InitializerModule:
 
     self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
     self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5'))
+    self.alert_reports_interval = int(self.config.get('agent', 'alert_reports_interval', default='5'))
 
     self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
     self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index ce1583f..76d8390 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -45,6 +45,7 @@ class BaseAlert(object):
     self.alert_meta = alert_meta
     self.alert_source_meta = alert_source_meta
     self.cluster_name = ''
+    self.cluster_id = None
     self.host_name = ''
     self.config = config
     
@@ -86,9 +87,10 @@ class BaseAlert(object):
     self.cluster_configuration_cache = cluster_configuration_cache
 
 
-  def set_cluster(self, cluster_name, host_name):
+  def set_cluster(self, cluster_name, cluster_id, host_name):
     """ sets cluster information for the alert """
     self.cluster_name = cluster_name
+    self.cluster_id = cluster_id
     self.host_name = host_name
 
 
@@ -216,7 +218,7 @@ class BaseAlert(object):
     resolved_key = key
     for placeholder_key in placeholder_keys:
       value = self.cluster_configuration_cache.get_configuration_value(
-        self.cluster_name, placeholder_key)
+        self.cluster_id, placeholder_key)
 
       # if any of the placeholder keys is missing from the configuration, then
       # return None as per the contract of this function

http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index c255e8b..ece9b7a 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -108,6 +108,7 @@ from ambari_agent.InitializerModule import InitializerModule
 from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
 from ambari_agent.CommandStatusReporter import CommandStatusReporter
 from ambari_agent.HostStatusReporter import HostStatusReporter
+from ambari_agent.AlertStatusReporter import AlertStatusReporter
 
 #logging.getLogger('ambari_agent').propagate = False
 
@@ -366,13 +367,20 @@ def run_threads(initializer_module):
   host_status_reporter = HostStatusReporter(initializer_module)
   host_status_reporter.start()
 
+  alert_status_reporter = AlertStatusReporter(initializer_module)
+  alert_status_reporter.start()
+
   initializer_module.action_queue.start()
 
   while not initializer_module.stop_event.is_set():
     time.sleep(0.1)
 
-  heartbeat_thread.join()
+  command_status_reporter.join()
   component_status_executor.join()
+  host_status_reporter.join()
+  alert_status_reporter.join()
+  heartbeat_thread.join()
+  initializer_module.action_queue.join()
 
 # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process
 # we need this for windows os, where no sigterm available

Reply via email to