AMBARI-21667. Create a topic to send alert_definitions (aonishuk)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6578b5a2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6578b5a2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6578b5a2

Branch: refs/heads/branch-3.0-perf
Commit: 6578b5a28516e8afa66c2f302cfe7abfdbc31472
Parents: 44c1cb5
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Tue Aug 8 12:11:28 2017 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Tue Aug 8 12:11:28 2017 +0300

----------------------------------------------------------------------
 .../ambari_agent/AlertSchedulerHandler.py       |   69 +-
 .../main/python/ambari_agent/AmbariConfig.py    |    2 +-
 .../ClusterAlertDefinitionsCache.py             |   62 +
 .../src/main/python/ambari_agent/Constants.py   |    4 +-
 .../main/python/ambari_agent/HeartbeatThread.py |    7 +-
 .../python/ambari_agent/InitializerModule.py    |   12 +-
 .../python/ambari_agent/alerts/base_alert.py    |    2 +-
 .../python/ambari_agent/alerts/collector.py     |    2 +-
 .../python/ambari_agent/alerts/metric_alert.py  |    2 +-
 .../python/ambari_agent/alerts/port_alert.py    |    2 +-
 .../ambari_agent/alerts/recovery_alert.py       |    2 +-
 .../python/ambari_agent/alerts/script_alert.py  |    2 +-
 .../ambari_agent/apscheduler/threadpool.py      |    2 +-
 .../listeners/AlertDefinitionsEventListener.py  |   55 +
 .../listeners/HostLevelParamsEventListener.py   |    2 +-
 .../src/main/python/ambari_agent/main.py        |   14 +-
 .../ambari_agent/BaseStompServerTestCase.py     |    6 +
 .../ambari_agent/TestAgentStompResponses.py     |   23 +-
 .../dummy_files/stomp/alert_definitions.json    | 2700 ++++++++++++++++++
 19 files changed, 2902 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py 
b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 6c1d29c..0dfc977 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -36,11 +36,12 @@ from alerts.script_alert import ScriptAlert
 from alerts.web_alert import WebAlert
 from alerts.recovery_alert import RecoveryAlert
 from ambari_agent.ExitHelper import ExitHelper
+from ambari_agent.FileCache import FileCache
+from ambari_agent.Utils import Utils
 
 logger = logging.getLogger(__name__)
 
 class AlertSchedulerHandler():
-  FILENAME = 'definitions.json'
   TYPE_PORT = 'PORT'
   TYPE_METRIC = 'METRIC'
   TYPE_AMS = 'AMS'
@@ -48,29 +49,24 @@ class AlertSchedulerHandler():
   TYPE_WEB = 'WEB'
   TYPE_RECOVERY = 'RECOVERY'
 
-  def __init__(self, cachedir, stacks_dir, common_services_dir, extensions_dir,
-      host_scripts_dir, cluster_configuration, config, recovery_manager,
-      in_minutes=True):
+  def __init__(self, initializer_module, in_minutes=True):
 
-    self.cachedir = cachedir
-    self.stacks_dir = stacks_dir
-    self.common_services_dir = common_services_dir
-    self.extensions_dir = extensions_dir
-    self.host_scripts_dir = host_scripts_dir
+    self.cachedir = initializer_module.alerts_cachedir
+    self.stacks_dir = initializer_module.stacks_dir
+    self.common_services_dir = initializer_module.common_services_dir
+    self.extensions_dir = initializer_module.extensions_dir
+    self.host_scripts_dir = initializer_module.host_scripts_dir
 
-    self._cluster_configuration = cluster_configuration
+    self._cluster_configuration = initializer_module.configurations_cache
+    self.alert_definitions_cache = initializer_module.alert_definitions_cache
+
+    self.config = initializer_module.config
 
     # a mapping between a cluster name and a unique hash for all definitions
     self._cluster_hashes = {}
 
     # the amount of time, in seconds, that an alert can run after it's 
scheduled time
-    alert_grace_period = int(config.get('agent', 'alert_grace_period', 5))
-
-    if not os.path.exists(cachedir):
-      try:
-        os.makedirs(cachedir)
-      except:
-        logger.critical("[AlertScheduler] Could not create the cache directory 
{0}".format(cachedir))
+    alert_grace_period = int(self.config.get('agent', 'alert_grace_period', 5))
 
     apscheduler_standalone = False
 
@@ -80,14 +76,13 @@ class AlertSchedulerHandler():
       'apscheduler.standalone': apscheduler_standalone,
       'apscheduler.misfire_grace_time': alert_grace_period,
       'apscheduler.threadpool.context_injector': self._job_context_injector if 
not apscheduler_standalone else None,
-      'apscheduler.threadpool.agent_config': config
+      'apscheduler.threadpool.agent_config': self.config
     }
 
     self._collector = AlertCollector()
     self.__scheduler = Scheduler(self.APS_CONFIG)
     self.__in_minutes = in_minutes
-    self.config = config
-    self.recovery_manger = recovery_manager
+    self.recovery_manger = initializer_module.recovery_manager
 
     # register python exit handler
     ExitHelper().register(self.exit_handler)
@@ -113,31 +108,17 @@ class AlertSchedulerHandler():
     self.stop()
 
 
-  def update_definitions(self, heartbeat):
+  def update_definitions(self):
     """
     Updates the persisted alert definitions JSON.
-    :param heartbeat:
     :return:
     """
-    if 'alertDefinitionCommands' not in heartbeat:
-      logger.warning("There are no alert definition commands in the heartbeat; 
unable to update definitions")
-      return
-
     # prune out things we don't want to store
     alert_definitions = []
-    for command in heartbeat['alertDefinitionCommands']:
-      command_copy = command.copy()
-
-      # no need to store these since we always use the in-memory cached values
-      if 'configurations' in command_copy:
-        del command_copy['configurations']
-
+    for cluster_id, command in self.alert_definitions_cache.iteritems():
+      command_copy = Utils.get_mutable_copy(command)
       alert_definitions.append(command_copy)
 
-    # write out the new definitions
-    with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f:
-      json.dump(alert_definitions, f, indent=2)
-
     # determine how to reschedule the jobs
     reschedule_all = False
     if "clusterName" in command_copy and command_copy["clusterName"] not in 
self._cluster_hashes:
@@ -271,16 +252,7 @@ class AlertSchedulerHandler():
     :return:
     """
     definitions = []
-
-    alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME)
-    try:
-      with open(alerts_definitions_path) as fp:
-        all_commands = json.load(fp)
-    except:
-      logger.warning('[AlertScheduler] {0} not found or invalid. No alerts 
will be scheduled until registration occurs.'.format(alerts_definitions_path))
-      return definitions
-
-    for command_json in all_commands:
+    for cluster_id, command_json in self.alert_definitions_cache.iteritems():
       clusterName = '' if not 'clusterName' in command_json else 
command_json['clusterName']
       hostName = '' if not 'hostName' in command_json else 
command_json['hostName']
       clusterHash = None if not 'hash' in command_json else 
command_json['hash']
@@ -289,9 +261,8 @@ class AlertSchedulerHandler():
       if clusterName != '' and clusterHash is not None:
         logger.info('[AlertScheduler] Caching cluster {0} with alert hash 
{1}'.format(clusterName, clusterHash))
         self._cluster_hashes[clusterName] = clusterHash
-
       for definition in command_json['alertDefinitions']:
-        alert = self.__json_to_callable(clusterName, hostName, definition)
+        alert = self.__json_to_callable(clusterName, hostName, 
Utils.get_mutable_copy(definition))
 
         if alert is None:
           continue

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py 
b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 9507c9d..e1c40c8 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -241,7 +241,7 @@ class AmbariConfig:
     :return: Alerts log file path.
     """
     if 'AMBARI_AGENT_LOG_DIR' in os.environ:
-      return os.path.join(os.environ['AMBARI_AGENT_LOG_DIR'], 
"ambari-agent.log")
+      return os.path.join(os.environ['AMBARI_AGENT_LOG_DIR'], 
"ambari-alerts.log")
     else:
       return os.path.join(os.sep, home_dir, "var", "log", "ambari-agent", 
"ambari-alerts.log")
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py
new file mode 100644
index 0000000..39b88e6
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from ambari_agent.ClusterCache import ClusterCache
+
+class ClusterAlertDefinitionsCache(ClusterCache):
+  """
+  Maintains an in-memory cache and disk cache of the alert definitions sent 
from server for
+  every cluster. This is useful for having quick access to any of the
+  alert definitions.
+
+  Alert definitions describe the alerts which the agent schedules and runs; 
they are cached
+  separately for every cluster.
+  """
+
+  def __init__(self, cluster_cache_dir):
+    """
+    Initializes the alert definitions cache.
+    :param cluster_cache_dir:
+    :return:
+    """
+    super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir)
+
+  def get_cache_name(self):
+    return 'alert_definitions'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py 
b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 17ed2be..37b5df2 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -22,17 +22,19 @@ limitations under the License.
 COMMANDS_TOPIC = '/user/commands'
 CONFIGURATIONS_TOPIC = '/user/configs'
 HOST_LEVEL_PARAMS_TOPIC = '/user/host_level_params'
+ALERTS_DEFENITIONS_TOPIC = '/user/alert_defenitions'
 METADATA_TOPIC = '/events/metadata'
 TOPOLOGIES_TOPIC = '/events/topologies'
 SERVER_RESPONSES_TOPIC = '/user/'
 
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
-POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, 
METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, 
METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, 
ALERTS_DEFENITIONS_TOPIC]
 
 TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
 HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params'
+ALERTS_DEFENITIONS_REQUEST_ENDPOINT = '/agents/alert_defenitions'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 85840d0..e2fe4af 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -34,6 +34,7 @@ from ambari_agent.listeners.ConfigurationEventListener import 
ConfigurationEvent
 from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
 from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
 from ambari_agent.listeners.HostLevelParamsEventListener import 
HostLevelParamsEventListener
+from ambari_agent.listeners.AlertDefinitionsEventListener import 
AlertDefinitionsEventListener
 
 HEARTBEAT_INTERVAL = 10
 REQUEST_RESPONSE_TIMEOUT = 10
@@ -60,13 +61,15 @@ class HeartbeatThread(threading.Thread):
     self.topology_events_listener = 
TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = 
ConfigurationEventListener(initializer_module.configurations_cache)
     self.host_level_params_events_listener = 
HostLevelParamsEventListener(initializer_module.host_level_params_cache, 
initializer_module.recovery_manager)
-    self.listeners = [self.server_responses_listener, 
self.commands_events_listener, self.metadata_events_listener, 
self.topology_events_listener, self.configuration_events_listener, 
self.host_level_params_events_listener]
+    self.alert_definitions_events_listener = 
AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, 
initializer_module.alert_scheduler_handler)
+    self.listeners = [self.server_responses_listener, 
self.commands_events_listener, self.metadata_events_listener, 
self.topology_events_listener, self.configuration_events_listener, 
self.host_level_params_events_listener, self.alert_definitions_events_listener]
 
     self.post_registration_requests = [
     (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, 
self.topology_events_listener),
     (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, 
self.metadata_events_listener),
     (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, 
initializer_module.configurations_cache, self.configuration_events_listener),
-    (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, 
initializer_module.host_level_params_cache, 
self.host_level_params_events_listener)
+    (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, 
initializer_module.host_level_params_cache, 
self.host_level_params_events_listener),
+    (Constants.ALERTS_DEFENITIONS_REQUEST_ENDPOINT, 
initializer_module.alert_definitions_cache, 
self.alert_definitions_events_listener)
     ]
     self.responseId = 0
     self.file_cache = initializer_module.file_cache

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py 
b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 6ef4a04..9b031f7 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -29,14 +29,16 @@ from ambari_agent.ClusterConfigurationCache import 
ClusterConfigurationCache
 from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
 from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
 from ambari_agent.ClusterHostLevelParamsCache import 
ClusterHostLevelParamsCache
+from ambari_agent.ClusterAlertDefinitionsCache import 
ClusterAlertDefinitionsCache
 from ambari_agent.Utils import lazy_property
 from ambari_agent.security import AmbariStompConnection
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.RecoveryManager import RecoveryManager
+from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 class InitializerModule:
   """
@@ -60,8 +62,14 @@ class InitializerModule:
 
     self.cache_dir = self.config.get('agent', 'cache_dir', 
default='/var/lib/ambari-agent/cache')
     self.command_reports_interval = int(self.config.get('agent', 
'command_reports_interval', default='5'))
+
     self.cluster_cache_dir = os.path.join(self.cache_dir, 
FileCache.CLUSTER_CACHE_DIRECTORY)
     self.recovery_cache_dir = os.path.join(self.cache_dir, 
FileCache.RECOVERY_CACHE_DIRECTORY)
+    self.alerts_cachedir = os.path.join(self.cache_dir, 
FileCache.ALERTS_CACHE_DIRECTORY)
+    self.stacks_dir = os.path.join(self.cache_dir, 
FileCache.STACKS_CACHE_DIRECTORY)
+    self.common_services_dir = os.path.join(self.cache_dir, 
FileCache.COMMON_SERVICES_DIRECTORY)
+    self.extensions_dir = os.path.join(self.cache_dir, 
FileCache.EXTENSIONS_CACHE_DIRECTORY)
+    self.host_scripts_dir = os.path.join(self.cache_dir, 
FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
 
     self.host_status_report_interval = int(self.config.get('heartbeat', 
'state_interval_seconds', '60'))
 
@@ -77,6 +85,7 @@ class InitializerModule:
     self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, 
self.config)
     self.configurations_cache = 
ClusterConfigurationCache(self.cluster_cache_dir)
     self.host_level_params_cache = 
ClusterHostLevelParamsCache(self.cluster_cache_dir)
+    self.alert_definitions_cache = 
ClusterAlertDefinitionsCache(self.cluster_cache_dir)
 
     self.file_cache = FileCache(self.config)
 
@@ -85,6 +94,7 @@ class InitializerModule:
     self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
     self.commandStatuses = CommandStatusDict(self)
     self.action_queue = ActionQueue(self)
+    self.alert_scheduler_handler = AlertSchedulerHandler(self)
 
   @lazy_property
   def connection(self):

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index b75b540..ce1583f 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -23,7 +23,7 @@ import re
 import time
 from collections import namedtuple
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 # create a named tuple to return both the concrete URI and SSL flag
 AlertUri = namedtuple('AlertUri', 'uri is_ssl_enabled')

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
index adc7f47..089301f 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
@@ -21,7 +21,7 @@ limitations under the License.
 import logging
 import threading
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 class AlertCollector():
   """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
index 803bdc6..da49d2a 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py
@@ -32,7 +32,7 @@ from 
resource_management.libraries.functions.get_port_from_url import get_port_f
 from resource_management.libraries.functions.curl_krb_request import 
curl_krb_request
 from ambari_agent import Constants
 
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
index 1e32718..b99b964 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -25,7 +25,7 @@ from alerts.base_alert import BaseAlert
 from resource_management.libraries.functions.get_port_from_url import 
get_port_from_url
 from ambari_commons import OSCheck
 from ambari_commons.inet_utils import resolve_address
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 # default timeouts
 DEFAULT_WARNING_TIMEOUT = 1.5

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
index 3092a39..de221ae 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py
@@ -21,7 +21,7 @@ limitations under the License.
 import logging
 import datetime
 from alerts.base_alert import BaseAlert
-logger = logging.getLogger()
+logger = logging.getLogger(__name__)
 
 # default recoveries counts
 DEFAULT_WARNING_RECOVERIES_COUNT = 2

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
index 945a222..301e440 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py
@@ -27,7 +27,7 @@ from resource_management.core.environment import Environment
 from resource_management.libraries.functions.curl_krb_request import 
KERBEROS_KINIT_TIMER_PARAMETER
 from ambari_agent import Constants
 
-logger = logging.getLogger("ambari_alerts")
+logger = logging.getLogger(__name__)
 
 class ScriptAlert(BaseAlert):
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py 
b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
index d6624cc..cb5f7ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
@@ -118,7 +118,7 @@ class ThreadPool(object):
         if self._shutdown:
             return
 
-        logging.info('Shutting down thread pool')
+        logger.info('Shutting down thread pool')
         self._shutdown = True
         _threadpools.remove(ref(self))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
 
b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
new file mode 100644
index 0000000..0829c31
--- /dev/null
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class AlertDefinitionsEventListener(EventListener):
+  """
+  Listener of Constants.ALERTS_DEFENITIONS_TOPIC events from server.
+  """
+  def __init__(self, alert_definitions_cache, alert_scheduler_handler):
+    self.alert_definitions_cache = alert_definitions_cache
+    self.alert_scheduler_handler = alert_scheduler_handler
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.ALERTS_DEFENITIONS_TOPIC topic is 
received from server.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    # an empty message is received if the hash was identical, meaning there is 
nothing to update
+    if message == {}:
+      return
+
+    self.alert_definitions_cache.rewrite_cache(message['clusters'])
+    print message
+    self.alert_definitions_cache.hash = message['hash']
+
+    self.alert_scheduler_handler.update_definitions()
+
+  def get_handled_path(self):
+    return Constants.ALERTS_DEFENITIONS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
 
b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
index 5aee634..aee2992 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -36,7 +36,7 @@ class HostLevelParamsEventListener(EventListener):
 
   def on_event(self, headers, message):
     """
-    Is triggered when an event to Constants.CONFIGURATIONS_TOPIC topic is 
received from server.
+    Is triggered when an event to Constants.HOST_LEVEL_PARAMS_TOPIC topic is 
received from server.
 
     @param headers: headers dictionary
     @param message: message payload dictionary

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py 
b/ambari-agent/src/main/python/ambari_agent/main.py
index 56962d3..c255e8b 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -109,8 +109,13 @@ from ambari_agent.ComponentStatusExecutor import 
ComponentStatusExecutor
 from ambari_agent.CommandStatusReporter import CommandStatusReporter
 from ambari_agent.HostStatusReporter import HostStatusReporter
 
+#logging.getLogger('ambari_agent').propagate = False
+
 logger = logging.getLogger()
-alerts_logger = logging.getLogger('ambari_alerts')
+alerts_logger = logging.getLogger('alerts')
+alerts_logger_global = logging.getLogger('ambari_agent.alerts')
+apscheduler_logger = logging.getLogger('apscheduler')
+apscheduler_logger_global = logging.getLogger('ambari_agent.apscheduler')
 
 formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s"
 agentPid = os.getpid()
@@ -131,6 +136,7 @@ SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING)
 _file_logging_handlers ={}
 
 def setup_logging(logger, filename, logging_level):
+  logger.propagate = False
   formatter = logging.Formatter(formatstr)
 
   if filename in _file_logging_handlers:
@@ -139,6 +145,7 @@ def setup_logging(logger, filename, logging_level):
     rotateLog = logging.handlers.RotatingFileHandler(filename, "a", 10000000, 
25)
     rotateLog.setFormatter(formatter)
     _file_logging_handlers[filename] = rotateLog
+  logger.handlers = []
   logger.addHandler(rotateLog)
 
   logging.basicConfig(format=formatstr, level=logging_level, filename=filename)
@@ -345,6 +352,8 @@ def reset_agent(options):
 MAX_RETRIES = 10
 
 def run_threads(initializer_module):
+  initializer_module.alert_scheduler_handler.start()
+
   heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
   heartbeat_thread.start()
 
@@ -387,6 +396,9 @@ def main(initializer_module, heartbeat_stop_callback=None):
   global is_logger_setup
   is_logger_setup = True
   setup_logging(alerts_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), 
logging_level)
+  setup_logging(alerts_logger_global, 
AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
+  setup_logging(apscheduler_logger, 
AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
+  setup_logging(apscheduler_logger_global, 
AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level)
   Logger.initialize_logger('resource_management', logging_level=logging_level)
 
   if home_dir != "":

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py 
b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 5db53c8..b7529a8 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -150,6 +150,12 @@ class BaseStompServerTestCase(unittest.TestCase):
 
     logging.getLogger('stomp.py').setLevel(logging.WARN)
     logging.getLogger('coilmq').setLevel(logging.INFO)
+    logging.getLogger('ambari_agent.apscheduler').setLevel(logging.WARN)
+    logging.getLogger('apscheduler').setLevel(logging.WARN)
+    logging.getLogger('ambari_agent.alerts').setLevel(logging.WARN)
+    logging.getLogger('alerts').setLevel(logging.WARN)
+    logging.getLogger('ambari_agent.AlertSchedulerHandler').setLevel(logging.WARN)
+
 
   def remove_files(self, filepathes):
     for filepath in filepathes:

http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 88f1e69..38d0e9e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -40,7 +40,7 @@ from mock.mock import MagicMock, patch
 @patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org"))
 class TestAgentStompResponses(BaseStompServerTestCase):
   def setUp(self):
-    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json'])
+    self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json', '/tmp/cluster_cache/alert_definitions.json'])
 
     if not os.path.exists("/tmp/ambari-agent"):
       os.mkdir("/tmp/ambari-agent")
@@ -60,6 +60,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     action_queue = initializer_module.action_queue
     action_queue.start()
+    initializer_module.alert_scheduler_handler.start()
 
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
@@ -86,10 +87,14 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=self.get_json("host_level_params.json"))
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=self.get_json("alert_definitions.json"))
+    self.server.topic_manager.send(f)
+
     initial_topology_request = self.server.frames_queue.get()
     initial_metadata_request = self.server.frames_queue.get()
     initial_configs_request = self.server.frames_queue.get()
     initial_host_level_params_request = self.server.frames_queue.get()
+    initial_alert_definitions_request = self.server.frames_queue.get()
 
     while not initializer_module.is_registered:
       time.sleep(0.1)
@@ -108,6 +113,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     metadata_subscribe_frame = self.server.frames_queue.get()
     topologies_subscribe_frame = self.server.frames_queue.get()
     host_level_params_subscribe_frame = self.server.frames_queue.get()
+    alert_definitions_subscribe_frame = self.server.frames_queue.get()
     heartbeat_frame = self.server.frames_queue.get()
     dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
     dn_install_failed_frame = json.loads(self.server.frames_queue.get().body)
@@ -121,7 +127,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     command_status_reporter.join()
@@ -130,7 +136,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     host_status_reporter.join()
     action_queue.join()
 
-
     self.assertTrue('mounts' in host_status_report)
     self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org')
     self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
@@ -156,6 +161,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     action_queue = initializer_module.action_queue
     action_queue.start()
+    initializer_module.alert_scheduler_handler.start()
 
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
@@ -186,6 +192,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body='{}')
+    self.server.topic_manager.send(f)
+
     commands_subscribe_frame = self.server.frames_queue.get()
     configurations_subscribe_frame = self.server.frames_queue.get()
     metadata_subscribe_frame = self.server.frames_queue.get()
@@ -195,7 +204,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
@@ -232,10 +241,14 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}')
     self.server.topic_manager.send(f)
 
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body='{}')
+    self.server.topic_manager.send(f)
+
     initial_topology_request = self.server.frames_queue.get()
     initial_metadata_request = self.server.frames_queue.get()
     initial_configs_request = self.server.frames_queue.get()
     initial_host_level_params_request = self.server.frames_queue.get()
+    initial_alert_definitions_request = self.server.frames_queue.get()
 
     while not initializer_module.is_registered:
       time.sleep(0.1)
@@ -273,7 +286,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     initializer_module.stop_event.set()
 
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'}))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()

Reply via email to