This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit c1faa6f681e19f1f5dbd62427d369620525d7b9f Author: Myroslav Papirkovskyi <[email protected]> AuthorDate: Tue Mar 27 20:52:43 2018 +0300 AMBARI-23376. Create topic handler for small agent actions. Like restart_agent, clean_caches etc. (mpapirkovskyy) --- .../ambari_agent/listeners/AgentActionsListener.py | 2 +- .../ambari/server/agent/HeartBeatHandler.java | 32 +++++++------ .../controller/internal/HostResourceProvider.java | 5 +- .../ambari/server/events/AgentActionEvent.java | 55 ++++++++++++++++++++++ .../ambari/server/events/AmbariUpdateEvent.java | 3 +- .../server/events/DefaultMessageEmitter.java | 1 + 6 files changed, 80 insertions(+), 18 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py index a914954..07cf8ed 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py @@ -34,7 +34,7 @@ class AgentActionsListener(EventListener): ACTION_NAME = 'actionName' RESTART_AGENT_ACTION = 'RESTART_AGENT' - def __init(self, initializer_module): + def __init__(self, initializer_module): self.initializer_module = initializer_module self.stop_event = initializer_module.stop_event diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index f59cc1c..35ab8f0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -29,6 +29,7 @@ import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.events.AgentActionEvent; import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; import org.apache.ambari.server.state.AgentVersion; import org.apache.ambari.server.state.Cluster; @@ -147,6 +148,20 @@ public class HeartBeatHandler { LOG.debug("Received heartbeat from host, hostname={}, currentResponseId={}, receivedResponseId={}", hostname, currentResponseId, heartbeat.getResponseId()); + response = new HeartBeatResponse(); + Host hostObject; + try { + hostObject = clusterFsm.getHost(hostname); + } catch (HostNotFoundException e) { + LOG.error("Host: {} not found. Agent is still heartbeating.", hostname); + if (LOG.isDebugEnabled()) { + LOG.debug("Host associated with the agent heratbeat might have been " + + "deleted", e); + } + // For now return empty response with only response id. + return response; + } + if (heartbeat.getResponseId() == currentResponseId - 1) { HeartBeatResponse heartBeatResponse = hostResponses.get(hostname); @@ -164,26 +179,13 @@ public class HeartBeatHandler { return createRestartCommand(currentResponseId); } - - response = new HeartBeatResponse(); response.setResponseId(++currentResponseId); - Host hostObject; - try { - hostObject = clusterFsm.getHost(hostname); - } catch (HostNotFoundException e) { - LOG.error("Host: {} not found. Agent is still heartbeating.", hostname); - if (LOG.isDebugEnabled()) { - LOG.debug("Host associated with the agent heratbeat might have been " + - "deleted", e); - } - // For now return empty response with only response id. - return response; - } - if (hostObject.getState().equals(HostState.HEARTBEAT_LOST)) { // After loosing heartbeat agent should reregister LOG.warn("Host {} is in HEARTBEAT_LOST state - sending register command", hostname); + stateUpdateEventPublisher.publish(new AgentActionEvent(AgentActionEvent.AgentAction.RESTART_AGENT, + hostObject.getHostId())); return createRegisterCommand(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java index 7583230..b07e834 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java @@ -565,7 +565,10 @@ public class HostResourceProvider extends AbstractControllerResourceProvider { addedHost.getRackInfo(), addedHost.getIPv4())); HostLevelParamsUpdateEvent hostLevelParamsUpdateEvent = new HostLevelParamsUpdateEvent(clusterId, new HostLevelParamsCluster( - getManagementController().retrieveHostRepositories(cl, addedHost), null)); + getManagementController().retrieveHostRepositories(cl, addedHost), + recoveryConfigHelper.getRecoveryConfig(cl.getClusterName(), + addedHost.getHostName()) + )); hostLevelParamsUpdateEvent.setHostId(addedHost.getHostId()); hostLevelParamsUpdateEvents.add(hostLevelParamsUpdateEvent); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AgentActionEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentActionEvent.java new file mode 100644 index 0000000..5a5d17b --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AgentActionEvent.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Event to send action commands to agent. + */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) +public class AgentActionEvent extends AmbariHostUpdateEvent { + + /** + * Host id with agent action commands will be send to. + */ + private Long hostId; + + @JsonProperty("actionName") + private AgentAction agentAction; + + public AgentActionEvent(AgentAction agentAction, Long hostId) { + super(Type.AGENT_ACTIONS); + this.agentAction = agentAction; + this.hostId = hostId; + } + + public void setHostId(Long hostId) { + this.hostId = hostId; + } + + @Override + public Long getHostId() { + return hostId; + } + + public enum AgentAction { + RESTART_AGENT + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java index 58dcecf..644008d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java @@ -60,7 +60,8 @@ public abstract class AmbariUpdateEvent { UI_ALERT_DEFINITIONS("events.alert_definitions"), ALERT_DEFINITIONS("alert_definitions"), UPGRADE("events.upgrade"), - COMMAND("events.commands"); + COMMAND("events.commands"), + AGENT_ACTIONS("events.agentactions"); /** * Is used to collect info about event appearing frequency. diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java index 88ebc86..afdc4eb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java @@ -45,6 +45,7 @@ public class DefaultMessageEmitter extends MessageEmitter { put(AmbariUpdateEvent.Type.ALERT_DEFINITIONS, "/alert_definitions"); put(AmbariUpdateEvent.Type.UI_ALERT_DEFINITIONS, "/events/alert_definitions"); put(AmbariUpdateEvent.Type.UPGRADE, "/events/upgrade"); + put(AmbariUpdateEvent.Type.AGENT_ACTIONS, "/agent_actions"); }}); public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) { -- To stop receiving notification emails like this one, please contact [email protected].
