This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 193dc1d8af65b8dff152a70f20b6033ea55ff8ed
Author: Andrew Onishuk <[email protected]>
AuthorDate: Tue Mar 27 15:30:27 2018 +0300

    AMBARI-23376. Create topic handler for small agent actions. Like 
restart_agent, clean_caches etc. (aonishuk)
---
 .../src/main/python/ambari_agent/Constants.py      |  3 +-
 .../main/python/ambari_agent/HeartbeatThread.py    |  8 +--
 .../ambari_agent/listeners/AgentActionsListener.py | 66 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py 
b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 345f725..93c2a49 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -26,9 +26,10 @@ ALERTS_DEFINITIONS_TOPIC = '/user/alert_definitions'
 METADATA_TOPIC = '/events/metadata'
 TOPOLOGIES_TOPIC = '/events/topologies'
 SERVER_RESPONSES_TOPIC = '/user/'
+AGENT_ACTIONS_TOPIC = '/user/agent_actions'
 
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC]
-POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, 
METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, 
ALERTS_DEFINITIONS_TOPIC]
+POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, 
METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, 
ALERTS_DEFINITIONS_TOPIC, AGENT_ACTIONS_TOPIC]
 
 TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 5f91a77..6a7fbb7 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -30,6 +30,7 @@ from ambari_agent.Utils import Utils
 from ambari_agent.listeners.ServerResponsesListener import 
ServerResponsesListener
 from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import 
ConfigurationEventListener
+from ambari_agent.listeners.AgentActionsListener import AgentActionsListener
 from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
 from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
 from ambari_agent.listeners.HostLevelParamsEventListener import 
HostLevelParamsEventListener
@@ -64,7 +65,8 @@ class HeartbeatThread(threading.Thread):
     self.configuration_events_listener = 
ConfigurationEventListener(initializer_module.configurations_cache)
     self.host_level_params_events_listener = 
HostLevelParamsEventListener(initializer_module.host_level_params_cache, 
initializer_module.recovery_manager)
     self.alert_definitions_events_listener = 
AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, 
initializer_module.alert_scheduler_handler)
-    self.listeners = [self.server_responses_listener, 
self.commands_events_listener, self.metadata_events_listener, 
self.topology_events_listener, self.configuration_events_listener, 
self.host_level_params_events_listener, self.alert_definitions_events_listener]
+    self.agent_actions_events_listener = 
AgentActionsListener(initializer_module)
+    self.listeners = [self.server_responses_listener, 
self.commands_events_listener, self.metadata_events_listener, 
self.topology_events_listener, self.configuration_events_listener, 
self.host_level_params_events_listener, self.alert_definitions_events_listener, 
self.agent_actions_events_listener]
 
     self.post_registration_requests = [
     (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, 
self.topology_events_listener),
@@ -189,10 +191,6 @@ class HeartbeatThread(threading.Thread):
     else:
       self.responseId = serverId
 
-    if 'restartAgent' in response and response['restartAgent'].lower() == 
"true":
-      logger.warn("Restarting the agent by the request from server")
-      Utils.restartAgent(self.stop_event)
-
   def get_heartbeat_body(self):
     """
     Heartbeat body to be send to server
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py
new file mode 100644
index 0000000..a914954
--- /dev/null
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent.Utils import Utils
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class AgentActionsListener(EventListener):
+  """
+  Listener of Constants.AGENT_ACTIONS_TOPIC events from server.
+  """
+  ACTION_NAME = 'actionName'
+  RESTART_AGENT_ACTION = 'RESTART_AGENT'
+  
+  def __init(self, initializer_module):
+    self.initializer_module = initializer_module
+    self.stop_event = initializer_module.stop_event
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.AGENT_ACTIONS_TOPIC topic is 
received from server.
+    It contains some small actions which server can ask agent to do.
+
+    For bigger actions containing a lot of info and special workflow and a new 
topic would be
+    required. Small actions like restart_agent/clean_cache make sense to be in 
a general event
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    action_name = message[self.ACTION_NAME]
+
+    if action_name == self.RESTART_AGENT_ACTION:
+      self.restart_agent()
+    else:
+      logger.warn("Unknown action '{0}' requested by server. Ignoring 
it".format(action_name))
+
+  def restart_agent(self):
+    logger.warn("Restarting the agent by the request from server")
+    Utils.restartAgent(self.stop_event)
+
+  def get_handled_path(self):
+    return Constants.AGENT_ACTIONS_TOPIC
+
+

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to