This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 193dc1d8af65b8dff152a70f20b6033ea55ff8ed Author: Andrew Onishuk <[email protected]> AuthorDate: Tue Mar 27 15:30:27 2018 +0300 AMBARI-23376. Create topic handler for small agent actions. Like restart_agent, clean_caches etc. (aonishuk) --- .../src/main/python/ambari_agent/Constants.py | 3 +- .../main/python/ambari_agent/HeartbeatThread.py | 8 +-- .../ambari_agent/listeners/AgentActionsListener.py | 66 ++++++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 345f725..93c2a49 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -26,9 +26,10 @@ ALERTS_DEFINITIONS_TOPIC = '/user/alert_definitions' METADATA_TOPIC = '/events/metadata' TOPOLOGIES_TOPIC = '/events/topologies' SERVER_RESPONSES_TOPIC = '/user/' +AGENT_ACTIONS_TOPIC = '/user/agent_actions' PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC] -POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, ALERTS_DEFINITIONS_TOPIC] +POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, ALERTS_DEFINITIONS_TOPIC, AGENT_ACTIONS_TOPIC] TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies' METADATA_REQUEST_ENDPOINT = '/agents/metadata' diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 5f91a77..6a7fbb7 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -30,6 +30,7 @@ from ambari_agent.Utils import Utils from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener from ambari_agent.listeners.TopologyEventListener import TopologyEventListener from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener +from ambari_agent.listeners.AgentActionsListener import AgentActionsListener from ambari_agent.listeners.MetadataEventListener import MetadataEventListener from ambari_agent.listeners.CommandsEventListener import CommandsEventListener from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener @@ -64,7 +65,8 @@ class HeartbeatThread(threading.Thread): self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager) self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, initializer_module.alert_scheduler_handler) - self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener] + self.agent_actions_events_listener = AgentActionsListener(initializer_module) + self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener, self.agent_actions_events_listener] self.post_registration_requests = [ (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener), @@ -189,10 +191,6 @@ class HeartbeatThread(threading.Thread): else: self.responseId = serverId - if 'restartAgent' in response and response['restartAgent'].lower() == "true": - logger.warn("Restarting the agent by the request from server") - Utils.restartAgent(self.stop_event) - def get_heartbeat_body(self): """ Heartbeat body to be send to server diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py new file mode 100644 index 0000000..a914954 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import ambari_stomp + +from ambari_agent.listeners import EventListener +from ambari_agent.Utils import Utils +from ambari_agent import Constants + +logger = logging.getLogger(__name__) + +class AgentActionsListener(EventListener): + """ + Listener of Constants.AGENT_ACTIONS_TOPIC events from server. + """ + ACTION_NAME = 'actionName' + RESTART_AGENT_ACTION = 'RESTART_AGENT' + + def __init(self, initializer_module): + self.initializer_module = initializer_module + self.stop_event = initializer_module.stop_event + + def on_event(self, headers, message): + """ + Is triggered when an event to Constants.AGENT_ACTIONS_TOPIC topic is received from server. + It contains some small actions which server can ask agent to do. + + For bigger actions containing a lot of info and special workflow and a new topic would be + required. Small actions like restart_agent/clean_cache make sense to be in a general event + + @param headers: headers dictionary + @param message: message payload dictionary + """ + action_name = message[self.ACTION_NAME] + + if action_name == self.RESTART_AGENT_ACTION: + self.restart_agent() + else: + logger.warn("Unknown action '{0}' requested by server. Ignoring it".format(action_name)) + + def restart_agent(self): + logger.warn("Restarting the agent by the request from server") + Utils.restartAgent(self.stop_event) + + def get_handled_path(self): + return Constants.AGENT_ACTIONS_TOPIC + + -- To stop receiving notification emails like this one, please contact [email protected].
