AMBARI-21591. Send execution commands to agents user topic via STOMP. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/44c1cb51 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/44c1cb51 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/44c1cb51 Branch: refs/heads/branch-3.0-perf Commit: 44c1cb51236886bbb968946b6718cbac64d1a984 Parents: db83ccd Author: Myroslav Papirkovskyi <[email protected]> Authored: Thu Aug 3 19:06:41 2017 +0300 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Thu Aug 3 19:06:41 2017 +0300 ---------------------------------------------------------------------- .../actionmanager/ActionDBAccessorImpl.java | 4 +- .../server/actionmanager/ActionScheduler.java | 12 +- .../apache/ambari/server/agent/ActionQueue.java | 15 +- .../server/agent/AgentSessionManager.java | 19 +- .../ambari/server/agent/CancelCommand.java | 20 ++ .../ambari/server/agent/CommandReport.java | 44 +-- .../ambari/server/agent/ComponentStatus.java | 40 ++- .../ambari/server/agent/ExecutionCommand.java | 47 +-- .../ambari/server/agent/HeartBeatHandler.java | 202 ++----------- .../ambari/server/agent/HeartBeatResponse.java | 32 ++- .../ambari/server/agent/HeartbeatMonitor.java | 12 - .../ambari/server/agent/HeartbeatProcessor.java | 138 +++++---- .../ambari/server/agent/RecoveryConfig.java | 23 -- .../server/agent/RecoveryConfigHelper.java | 1 - .../server/agent/RegistrationResponse.java | 15 +- .../agent/stomp/AgentClusterDataHolder.java | 69 +---- .../server/agent/stomp/AgentConfigsHolder.java | 77 +++++ .../agent/stomp/AgentCurrentDataController.java | 22 +- .../server/agent/stomp/AgentDataHolder.java | 58 ++++ .../server/agent/stomp/AgentHostDataHolder.java | 73 +++++ .../agent/stomp/AgentReportsController.java | 28 +- .../agent/stomp/HostLevelParamsHolder.java | 83 ++++++ .../server/agent/stomp/MetadataHolder.java | 36 ++- .../server/agent/stomp/TopologyHolder.java | 48 +++- .../server/agent/stomp/dto/ClusterConfigs.java | 19 ++ .../agent/stomp/dto/CommandStatusReports.java | 43 +++ .../agent/stomp/dto/ComponentStatusReports.java | 2 +- .../stomp/dto/ExecutionCommandsCluster.java | 76 +++++ .../ambari/server/agent/stomp/dto/Hash.java | 6 +- .../agent/stomp/dto/HostLevelParamsCluster.java | 53 ++++ .../agent/stomp/dto/HostStatusReport.java | 54 ++++ .../server/agent/stomp/dto/MetadataCluster.java | 50 +++- .../agent/stomp/dto/MetadataServiceInfo.java | 30 +- .../server/agent/stomp/dto/TopologyCluster.java | 51 +++- .../agent/stomp/dto/TopologyComponent.java | 57 +++- .../server/agent/stomp/dto/TopologyHost.java | 40 +-- .../api/query/render/AlertStateSummary.java | 22 ++ .../api/query/render/AlertStateValues.java | 22 ++ .../render/AlertSummaryGroupedRenderer.java | 20 ++ .../server/api/services/AmbariMetaInfo.java | 141 +++++++++ .../server/configuration/Configuration.java | 14 + .../configuration/spring/AgentStompConfig.java | 18 ++ .../configuration/spring/ApiStompConfig.java | 3 +- .../configuration/spring/RootStompConfig.java | 8 + .../AmbariCustomCommandExecutionHelper.java | 145 +--------- .../AmbariManagementControllerImpl.java | 213 ++++++++++---- .../AlertDefinitionResourceProvider.java | 3 +- .../internal/ClientConfigResourceProvider.java | 2 +- .../ClusterStackVersionResourceProvider.java | 9 + .../DeleteHostComponentStatusMetaData.java | 6 +- .../internal/HostResourceProvider.java | 34 ++- .../server/events/AgentConfigsUpdateEvent.java | 50 +++- .../events/AlertDefinitionUpdateHolder.java | 116 ++++++++ .../events/AlertDefinitionsUpdateEvent.java | 284 +++++++++++++++++++ .../ambari/server/events/AlertUpdateEvent.java | 35 ++- .../ambari/server/events/AmbariEvent.java | 17 +- .../server/events/AmbariHostUpdateEvent.java | 37 +++ .../ambari/server/events/AmbariUpdateEvent.java | 31 +- .../ambari/server/events/CommandEvent.java | 24 -- .../server/events/ConfigsUpdateEvent.java | 86 +++++- .../server/events/ExecutionCommandEvent.java | 83 ++++++ .../server/events/HostComponentUpdate.java | 127 +++++++++ .../server/events/HostComponentUpdateEvent.java | 90 ------ .../events/HostComponentsUpdateEvent.java | 61 ++++ .../events/HostLevelParamsUpdateEvent.java | 108 +++++++ .../server/events/HostStateUpdateEvent.java | 49 ++++ .../server/events/HostStatusUpdateEvent.java | 47 +++ .../ambari/server/events/HostUpdateEvent.java | 172 +++++++++++ .../server/events/MetadataUpdateEvent.java | 45 ++- .../events/NamedHostRoleCommandUpdateEvent.java | 41 ++- .../server/events/RequestUpdateEvent.java | 84 +++++- .../server/events/ServiceUpdateEvent.java | 106 +++++++ .../server/events/TopologyAgentUpdateEvent.java | 35 +++ .../server/events/TopologyUpdateEvent.java | 57 +++- .../listeners/alerts/AlertReceivedListener.java | 16 +- .../listeners/hosts/HostUpdateListener.java | 220 ++++++++++++++ .../listeners/requests/StateUpdateListener.java | 37 ++- .../services/ServiceUpdateListener.java | 99 +++++++ .../listeners/tasks/TaskStatusListener.java | 16 +- .../listeners/upgrade/StackVersionListener.java | 2 +- .../publishers/AgentCommandsPublisher.java | 245 ++++++++++++++++ .../HostComponentUpdateEventPublisher.java | 82 ++++++ .../publishers/RequestUpdateEventPublisher.java | 116 ++++++++ .../publishers/StateUpdateEventPublisher.java | 17 +- .../server/orm/dao/AlertDefinitionDAO.java | 17 ++ .../ambari/server/orm/dao/AlertSummaryDTO.java | 41 +++ .../serveraction/AbstractServerAction.java | 4 +- .../ambari/server/state/ConfigHelper.java | 34 ++- .../org/apache/ambari/server/state/Service.java | 3 + .../ambari/server/state/ServiceComponent.java | 3 + .../server/state/ServiceComponentHost.java | 2 +- .../server/state/ServiceComponentImpl.java | 11 + .../apache/ambari/server/state/ServiceImpl.java | 20 +- .../server/state/alert/AlertDefinitionHash.java | 3 +- .../ambari/server/state/alert/AlertUri.java | 6 +- .../server/state/alert/ParameterizedSource.java | 24 ++ .../ambari/server/state/alert/Reporting.java | 2 + .../ambari/server/state/alert/Source.java | 2 + .../server/state/cluster/ClusterImpl.java | 6 +- .../server/state/cluster/ClustersImpl.java | 19 +- .../ambari/server/state/host/HostImpl.java | 10 + .../svccomphost/ServiceComponentHostImpl.java | 30 +- .../server/topology/TopologyDeleteFormer.java | 21 +- .../server/upgrade/AbstractUpgradeCatalog.java | 11 + .../actionmanager/TestActionScheduler.java | 6 +- .../server/agent/HeartbeatProcessorTest.java | 68 ++--- .../ambari/server/agent/TestActionQueue.java | 7 +- .../server/agent/TestHeartbeatHandler.java | 47 ++- .../server/agent/TestHeartbeatMonitor.java | 9 +- .../configuration/RecoveryConfigHelperTest.java | 30 -- .../upgrades/UpgradeActionTest.java | 21 +- .../state/alerts/AlertDefinitionHashTest.java | 4 +- 112 files changed, 4300 insertions(+), 1055 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index d5ae5b5..e72654d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -550,8 +550,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { if (!existingTaskStatus.isCompletedState()) { commandEntity.setStatus(reportedTaskStatus); } - commandEntity.setStdOut(report.getStdOut().getBytes()); - commandEntity.setStdError(report.getStdErr().getBytes()); + commandEntity.setStdOut(report.getStdOut() == null ? null : report.getStdOut().getBytes()); + commandEntity.setStdError(report.getStdErr() == null ? null : report.getStdErr().getBytes()); commandEntity.setStructuredOut(report.getStructuredOut() == null ? null : report.getStructuredOut().getBytes()); commandEntity.setExitcode(report.getExitCode()); http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 316f2bd..f9daf8d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -50,6 +50,7 @@ import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.events.ActionFinalReportReceivedEvent; import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent; import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener; +import org.apache.ambari.server.events.publishers.AgentCommandsPublisher; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.events.publishers.JPAEventPublisher; import org.apache.ambari.server.metadata.RoleCommandOrder; @@ -144,6 +145,9 @@ class ActionScheduler implements Runnable { @Inject private HostRoleCommandDAO hostRoleCommandDAO; + @Inject + private AgentCommandsPublisher agentCommandsPublisher; + /** * The current thread's reference to the {@link EntityManager}. */ @@ -527,7 +531,7 @@ class ActionScheduler implements Runnable { commandsToEnqueue.put(cmd.getHostname(), cmd); } } - actionQueue.enqueueAll(commandsToEnqueue.asMap()); + agentCommandsPublisher.sendAgentCommand(commandsToEnqueue); LOG.debug("==> Finished."); if (!configuration.getParallelStageExecution()) { // If disabled @@ -1177,7 +1181,7 @@ class ActionScheduler implements Runnable { /** * Aborts all stages that belong to requests that are being cancelled */ - private void processCancelledRequestsList() { + private void processCancelledRequestsList() throws AmbariException { synchronized (requestsToBeCancelled) { // Now, cancel stages completely for (Long requestId : requestsToBeCancelled) { @@ -1221,7 +1225,7 @@ class ActionScheduler implements Runnable { * @param hostRoleCommands a list of hostRoleCommands * @param reason why the request is being cancelled */ - void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) { + void cancelHostRoleCommands(Collection<HostRoleCommand> hostRoleCommands, String reason) throws AmbariException { for (HostRoleCommand hostRoleCommand : hostRoleCommands) { // There are no server actions in actionQueue if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) { @@ -1236,7 +1240,7 @@ class ActionScheduler implements Runnable { CancelCommand cancelCommand = new CancelCommand(); cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId()); cancelCommand.setReason(reason); - actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand); + agentCommandsPublisher.sendAgentCommand(hostRoleCommand.getHostName(), cancelCommand); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java index 8d05e4d..9c2df47 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java @@ -18,12 +18,10 @@ package org.apache.ambari.server.agent; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -36,6 +34,7 @@ import org.slf4j.LoggerFactory; import com.google.inject.Singleton; @Singleton +//TODO remove this entity in future public class ActionQueue { private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class); @@ -60,11 +59,11 @@ public class ActionQueue { * @param cmd - command to add to queue * @throws NullPointerException - if hostname is {@code}null{@code} */ - public void enqueue(String hostname, AgentCommand cmd) { + /*public void enqueue(String hostname, AgentCommand cmd) { Queue<AgentCommand> q = getHostQueue(hostname); q.add(cmd); - } + }*/ /** * Adds commands to queue (atomically) for given hostname @@ -72,11 +71,11 @@ public class ActionQueue { * @param commands - list of commands to add to queue * @throws NullPointerException - if hostname is {@code}null{@code} */ - public void enqueue(String hostname, Collection<AgentCommand> commands) { + /*public void enqueue(String hostname, Collection<AgentCommand> commands) { Queue<AgentCommand> q = getHostQueue(hostname); q.addAll(commands); - } + }*/ private Queue<AgentCommand> getHostQueue(String hostname) { Queue<AgentCommand> q = getQueue(hostname); @@ -97,11 +96,11 @@ public class ActionQueue { * Adds command map to queue * @param commandMap - map with hostname as key and command list as value */ - public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) { + /*public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) { for (Map.Entry<String, Collection<AgentCommand>> entry : commandMap.entrySet()) { enqueue(entry.getKey(), entry.getValue()); } - } + }*/ /** * Get command from queue for given hostname http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java index 496a697..1f64fc1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java @@ -27,7 +27,8 @@ import com.google.inject.Singleton; @Singleton public class AgentSessionManager { - private static ConcurrentHashMap<String, Host> registeredHosts = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, Host> registeredHosts = new ConcurrentHashMap<>(); + private ConcurrentHashMap<String, String> registeredSessionIds = new ConcurrentHashMap<>(); public void register(String sessionId, Host registeredHost) { String existKey = registeredHosts.entrySet().stream() @@ -37,6 +38,7 @@ public class AgentSessionManager { registeredHosts.remove(existKey); } registeredHosts.put(sessionId, registeredHost); + registeredSessionIds.put(registeredHost.getHostName(), sessionId); } public boolean isRegistered(String sessionId) { @@ -50,5 +52,20 @@ public class AgentSessionManager { throw new HostNotRegisteredException(sessionId); } + public String getSessionId(String hostName) throws HostNotRegisteredException { + if (registeredSessionIds.containsKey(hostName)) { + return registeredSessionIds.get(hostName); + } + throw new HostNotRegisteredException(hostName); + } + public void unregisterByHost(String hostName) { + String existKey = registeredHosts.entrySet().stream() + .filter(e -> e.getValue().getHostName().equals(hostName)).map(Map.Entry::getKey) + .findAny().orElse(null); + if (existKey != null) { + registeredHosts.remove(existKey); + registeredSessionIds.remove(hostName); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/CancelCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/CancelCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CancelCommand.java index ea4ccc2..f72e43b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/CancelCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CancelCommand.java @@ -17,11 +17,13 @@ */ package org.apache.ambari.server.agent; +import com.fasterxml.jackson.annotation.JsonInclude; import com.google.gson.annotations.SerializedName; /** * Command to report the status of a list of services in roles. */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) public class CancelCommand extends AgentCommand { public CancelCommand() { @@ -51,4 +53,22 @@ public class CancelCommand extends AgentCommand { public void setReason(String reason) { this.reason = reason; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CancelCommand that = (CancelCommand) o; + + if (targetTaskId != that.targetTaskId) return false; + return reason != null ? reason.equals(that.reason) : that.reason == null; + } + + @Override + public int hashCode() { + int result = (int) (targetTaskId ^ (targetTaskId >>> 32)); + result = 31 * result + (reason != null ? reason.hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java index bca562a..3ab469c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java @@ -17,11 +17,8 @@ */ package org.apache.ambari.server.agent; -import java.util.Map; - import org.codehaus.jackson.annotate.JsonProperty; - public class CommandReport { private String role; @@ -31,12 +28,11 @@ public class CommandReport { private String structuredOut; private String status; int exitCode; - private String clusterName; private String serviceName; private long taskId; + private String clusterId; private String roleCommand; private String customCommand; - private Map<String, Map<String, String>> configurationTags; @JsonProperty("customCommand") @com.fasterxml.jackson.annotation.JsonProperty("customCommand") @@ -61,18 +57,6 @@ public class CommandReport { public void setTaskId(long taskId) { this.taskId = taskId; } - - @JsonProperty("clusterName") - @com.fasterxml.jackson.annotation.JsonProperty("clusterName") - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - @JsonProperty("clusterName") - @com.fasterxml.jackson.annotation.JsonProperty("clusterName") - public String getClusterName() { - return this.clusterName; - } @JsonProperty("actionId") @com.fasterxml.jackson.annotation.JsonProperty("actionId") @@ -183,23 +167,14 @@ public class CommandReport { this.serviceName = serviceName; } - /** - * @param tags the config tags that match this command - */ - @JsonProperty("configurationTags") - @com.fasterxml.jackson.annotation.JsonProperty("configurationTags") - public void setConfigurationTags(Map<String, Map<String,String>> tags) { - configurationTags = tags; + @com.fasterxml.jackson.annotation.JsonProperty("clusterId") + public String getClusterId() { + return clusterId; } - - /** - * @return the config tags that match this command, or <code>null</code> - * if none are present - */ - @JsonProperty("configurationTags") - @com.fasterxml.jackson.annotation.JsonProperty("configurationTags") - public Map<String, Map<String,String>> getConfigurationTags() { - return configurationTags; + + @com.fasterxml.jackson.annotation.JsonProperty("clusterId") + public void setClusterId(String clusterId) { + this.clusterId = clusterId; } @Override @@ -209,11 +184,10 @@ public class CommandReport { ", actionId='" + actionId + '\'' + ", status='" + status + '\'' + ", exitCode=" + exitCode + - ", clusterName='" + clusterName + '\'' + + ", clusterId='" + clusterId + '\'' + ", serviceName='" + serviceName + '\'' + ", taskId=" + taskId + ", roleCommand=" + roleCommand + - ", configurationTags=" + configurationTags + ", customCommand=" + customCommand + '}'; } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java index 68e1734..74e134c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java @@ -28,10 +28,16 @@ public class ComponentStatus { private String msg; private String status; + /** + * A String declaring the component's security state + * + * @see org.apache.ambari.server.state.SecurityState + */ + private String securityState; private String sendExecCmdDet = "False"; private String serviceName; - private String clusterName; + private Long clusterId; private String stackVersion; private Map<String, Map<String, String>> configurationTags; private Map<String, Object> extra; @@ -68,6 +74,26 @@ public class ComponentStatus { this.status = status; } + /** + * Gets the relevant component's security state. + * + * @return a String declaring this component's security state + * @see org.apache.ambari.server.state.SecurityState + */ + public String getSecurityState() { + return securityState; + } + + /** + * Sets the relevant component's security state. + * + * @param securityState a String declaring this component's security state + * @see org.apache.ambari.server.state.SecurityState + */ + public void setSecurityState(String securityState) { + this.securityState = securityState; + } + public String getStackVersion() { return stackVersion; } @@ -92,12 +118,12 @@ public class ComponentStatus { this.serviceName = serviceName; } - public String getClusterName() { - return clusterName; + public Long getClusterId() { + return clusterId; } - public void setClusterName(String clusterName) { - this.clusterName = clusterName; + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; } /** @@ -132,8 +158,8 @@ public class ComponentStatus { @Override public String toString() { return "ComponentStatus [componentName=" + componentName + ", msg=" + msg - + ", status=" + status - + ", serviceName=" + serviceName + ", clusterName=" + clusterName + + ", status=" + status + ", securityState=" + securityState + + ", serviceName=" + serviceName + ", clusterId=" + clusterId + ", stackVersion=" + stackVersion + ", configurationTags=" + configurationTags + ", extra=" + extra + "]"; } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java index c2579a9..c6e569b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java @@ -30,6 +30,8 @@ import org.apache.ambari.server.utils.StageUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.google.gson.annotations.SerializedName; @@ -37,6 +39,7 @@ import com.google.gson.annotations.SerializedName; * Execution commands are scheduled by action manager, and these are * persisted in the database for recovery. */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) public class ExecutionCommand extends AgentCommand { private static Log LOG = LogFactory.getLog(ExecutionCommand.class); @@ -45,6 +48,9 @@ public class ExecutionCommand extends AgentCommand { super(AgentCommandType.EXECUTION_COMMAND); } + @com.fasterxml.jackson.annotation.JsonProperty("clusterId") + private String clusterId; + @SerializedName("clusterName") @com.fasterxml.jackson.annotation.JsonProperty("clusterName") private String clusterName; @@ -54,7 +60,7 @@ public class ExecutionCommand extends AgentCommand { private long requestId; @SerializedName("stageId") - @com.fasterxml.jackson.annotation.JsonProperty("stageId") + @JsonIgnore private long stageId; @SerializedName("taskId") @@ -66,7 +72,7 @@ public class ExecutionCommand extends AgentCommand { private String commandId; @SerializedName("hostname") - @com.fasterxml.jackson.annotation.JsonProperty("hostname") + @JsonIgnore private String hostname; @SerializedName("role") @@ -74,7 +80,7 @@ public class ExecutionCommand extends AgentCommand { private String role; @SerializedName("hostLevelParams") - @com.fasterxml.jackson.annotation.JsonProperty("hostLevelParams") + @JsonIgnore private Map<String, String> hostLevelParams = new HashMap<>(); @SerializedName("roleParams") @@ -86,24 +92,24 @@ public class ExecutionCommand extends AgentCommand { private RoleCommand roleCommand; @SerializedName("clusterHostInfo") - @com.fasterxml.jackson.annotation.JsonProperty("clusterHostInfo") + @JsonIgnore private Map<String, Set<String>> clusterHostInfo = new HashMap<>(); @SerializedName("configurations") - @com.fasterxml.jackson.annotation.JsonProperty("configurations") + @JsonIgnore private Map<String, Map<String, String>> configurations; @SerializedName("configuration_attributes") - @com.fasterxml.jackson.annotation.JsonProperty("configuration_attributes") + @JsonIgnore private Map<String, Map<String, Map<String, String>>> configurationAttributes; @SerializedName("configurationTags") - @com.fasterxml.jackson.annotation.JsonProperty("configurationTags") + @JsonIgnore private Map<String, Map<String, String>> configurationTags; @SerializedName("forceRefreshConfigTagsBeforeExecution") - @com.fasterxml.jackson.annotation.JsonProperty("forceRefreshConfigTagsBeforeExecution") + @JsonIgnore private boolean forceRefreshConfigTagsBeforeExecution = false; @SerializedName("commandParams") @@ -115,23 +121,23 @@ public class ExecutionCommand extends AgentCommand { private String serviceName; @SerializedName("serviceType") - @com.fasterxml.jackson.annotation.JsonProperty("serviceType") + @JsonIgnore private String serviceType; @SerializedName("componentName") - @com.fasterxml.jackson.annotation.JsonProperty("componentName") + @JsonIgnore private String componentName; @SerializedName("kerberosCommandParams") - @com.fasterxml.jackson.annotation.JsonProperty("kerberosCommandParams") + @JsonIgnore private List<Map<String, String>> kerberosCommandParams = new ArrayList<>(); @SerializedName("localComponents") - @com.fasterxml.jackson.annotation.JsonProperty("localComponents") + @JsonIgnore private Set<String> localComponents = new HashSet<>(); @SerializedName("availableServices") - @com.fasterxml.jackson.annotation.JsonProperty("availableServices") + @JsonIgnore private Map<String, String> availableServices = new HashMap<>(); /** @@ -139,7 +145,7 @@ public class ExecutionCommand extends AgentCommand { * service is enabled for credential store use. */ @SerializedName("credentialStoreEnabled") - @com.fasterxml.jackson.annotation.JsonProperty("credentialStoreEnabled") + @JsonIgnore private String credentialStoreEnabled; /** @@ -165,7 +171,7 @@ public class ExecutionCommand extends AgentCommand { * </pre> */ @SerializedName("configuration_credentials") - @com.fasterxml.jackson.annotation.JsonProperty("configuration_credentials") + @JsonIgnore private Map<String, Map<String, String>> configurationCredentials; public void setConfigurationCredentials(Map<String, Map<String, String>> configurationCredentials) { @@ -418,11 +424,19 @@ public class ExecutionCommand extends AgentCommand { kerberosCommandParams = params; } + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + /** * Contains key name strings. These strings are used inside maps * incapsulated inside command. */ - public interface KeyNames { + public static interface KeyNames { String COMMAND_TIMEOUT = "command_timeout"; String SCRIPT = "script"; String SCRIPT_TYPE = "script_type"; @@ -492,5 +506,6 @@ public class ExecutionCommand extends AgentCommand { * can be looked up and possibly have its version updated. */ String REPO_VERSION_ID = "repository_version_id"; + String CLUSTER_NAME = "cluster_name"; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index e890761..fcf32f0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -17,10 +17,6 @@ */ package org.apache.ambari.server.agent; -import java.io.BufferedInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -32,11 +28,10 @@ import java.util.regex.Pattern; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.HostNotFoundException; import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.agent.stomp.dto.HostStatusReport; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; -import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReader; -import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReaderFactory; -import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction; +import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; import org.apache.ambari.server.state.AgentVersion; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; @@ -54,11 +49,7 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent; import org.apache.ambari.server.state.host.HostRegistrationRequestEvent; import org.apache.ambari.server.state.host.HostStatusUpdatesReceivedEvent; -import org.apache.ambari.server.utils.StageUtils; import org.apache.ambari.server.utils.VersionUtils; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,11 +94,11 @@ public class HeartBeatHandler { @Inject private RecoveryConfigHelper recoveryConfigHelper; - /** - * KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances - */ @Inject - private KerberosIdentityDataFileReaderFactory kerberosIdentityDataFileReaderFactory; + private StateUpdateEventPublisher stateUpdateEventPublisher; + + @Inject + private AgentSessionManager agentSessionManager; private Map<String, Long> hostResponseIds = new ConcurrentHashMap<>(); @@ -196,17 +187,6 @@ public class HeartBeatHandler { hostResponseIds.put(hostname, currentResponseId); hostResponses.put(hostname, response); - // If the host is waiting for component status updates, notify it - if (heartbeat.componentStatus.size() > 0 - && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) { - try { - LOG.debug("Got component status updates"); - hostObject.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); - } catch (InvalidStateTransitionException e) { - LOG.warn("Failed to notify the host about component status updates", e); - } - } - if (heartbeat.getRecoveryReport() != null) { RecoveryReport rr = heartbeat.getRecoveryReport(); processRecoveryReport(rr, hostname); @@ -238,7 +218,7 @@ public class HeartBeatHandler { response.setRecoveryConfig(rc); if (response.getRecoveryConfig() != null) { - LOG.info("Recovery configuration set to {}", response.getRecoveryConfig().toString()); + LOG.debug("Recovery configuration set to {}", response.getRecoveryConfig().toString()); } } } @@ -247,7 +227,6 @@ public class HeartBeatHandler { // Send commands if node is active if (hostObject.getState().equals(HostState.HEALTHY)) { - sendCommands(hostname, response); annotateResponse(hostname, response); } @@ -256,6 +235,24 @@ public class HeartBeatHandler { public void handleComponentReportStatus(List<ComponentStatus> componentStatuses, String hostname) throws AmbariException { heartbeatProcessor.processStatusReports(componentStatuses, hostname); + heartbeatProcessor.processHostStatus(componentStatuses, null, hostname); + } + + public void handleCommandReportStatus(List<CommandReport> reports, String hostname) throws AmbariException { + heartbeatProcessor.processCommandReports(reports, hostname, System.currentTimeMillis()); + heartbeatProcessor.processHostStatus(null, reports, hostname); + } + + public void handleHostReportStatus(HostStatusReport hostStatusReport, String hostname) throws AmbariException { + Host host = clusterFsm.getHost(hostname); + try { + host.handleEvent(new HostHealthyHeartbeatEvent(hostname, System.currentTimeMillis(), + hostStatusReport.getAgentEnv(), hostStatusReport.getMounts())); + } catch (InvalidStateTransitionException ex) { + LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex); + host.setState(HostState.INIT); + agentSessionManager.unregisterByHost(hostname); + } } protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException { @@ -264,66 +261,6 @@ public class HeartBeatHandler { host.setRecoveryReport(recoveryReport); } - /** - * Adds commands from action queue to a heartbeat response. - */ - protected void sendCommands(String hostname, HeartBeatResponse response) - throws AmbariException { - List<AgentCommand> cmds = actionQueue.dequeueAll(hostname); - if (cmds != null && !cmds.isEmpty()) { - for (AgentCommand ac : cmds) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac)); - } - } catch (Exception e) { - throw new AmbariException("Could not get jaxb string for command", e); - } - switch (ac.getCommandType()) { - case BACKGROUND_EXECUTION_COMMAND: - case EXECUTION_COMMAND: { - ExecutionCommand ec = (ExecutionCommand)ac; - LOG.info("HeartBeatHandler.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}", - ec.getHostname(), ec.getRole(), ec.getRoleCommand(), ec.getCommandId(), ec.getTaskId()); - Map<String, String> hlp = ec.getHostLevelParams(); - if (hlp != null) { - String customCommand = hlp.get("custom_command"); - if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) { - LOG.info(String.format("%s called", customCommand)); - try { - injectKeytab(ec, customCommand, hostname); - } catch (IOException e) { - throw new AmbariException("Could not inject keytab into command", e); - } - } - } - response.addExecutionCommand((ExecutionCommand) ac); - break; - } - case STATUS_COMMAND: { - response.addStatusCommand((StatusCommand) ac); - break; - } - case CANCEL_COMMAND: { - response.addCancelCommand((CancelCommand) ac); - break; - } - case ALERT_DEFINITION_COMMAND: { - response.addAlertDefinitionCommand((AlertDefinitionCommand) ac); - break; - } - case ALERT_EXECUTION_COMMAND: { - response.addAlertExecutionCommand((AlertExecutionCommand) ac); - break; - } - default: - LOG.error("There is no action for agent command =" - + ac.getCommandType().name()); - } - } - } - } - public String getOsType(String os, String osRelease) { String osType = ""; if (os != null) { @@ -571,95 +508,6 @@ public class HeartBeatHandler { return commands; } - /** - * Insert Kerberos keytab details into the ExecutionCommand for the SET_KEYTAB custom command if - * any keytab details and associated data exists for the target host. - * - * @param ec the ExecutionCommand to update - * @param command a name of the relevant keytab command - * @param targetHost a name of the host the relevant command is destined for - * @throws AmbariException - */ - void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException { - String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY); - - if(dataDir != null) { - KerberosIdentityDataFileReader reader = null; - List<Map<String, String>> kcp = ec.getKerberosCommandParams(); - - try { - reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME)); - - for (Map<String, String> record : reader) { - String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME); - - if (targetHost.equalsIgnoreCase(hostName)) { - - if ("SET_KEYTAB".equalsIgnoreCase(command)) { - String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH); - - if (keytabFilePath != null) { - - String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath); - File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab); - - if (keytabFile.canRead()) { - Map<String, String> keytabMap = new HashMap<>(); - String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL); - String isService = record.get(KerberosIdentityDataFileReader.SERVICE); - - keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName); - keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService); - keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT)); - keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME)); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS)); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME)); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS)); - - BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile)); - byte[] keytabContent = null; - try { - keytabContent = IOUtils.toByteArray(bufferedIn); - } finally { - bufferedIn.close(); - } - String keytabContentBase64 = Base64.encodeBase64String(keytabContent); - keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64); - - kcp.add(keytabMap); - } - } - } else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) { - Map<String, String> keytabMap = new HashMap<>(); - - keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName); - keytabMap.put(KerberosIdentityDataFileReader.SERVICE, record.get(KerberosIdentityDataFileReader.SERVICE)); - keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT)); - keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, record.get(KerberosIdentityDataFileReader.PRINCIPAL)); - keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH)); - - kcp.add(keytabMap); - } - } - } - } catch (IOException e) { - throw new AmbariException("Could not inject keytabs to enable kerberos"); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (Throwable t) { - // ignored - } - } - } - - ec.setKerberosCommandParams(kcp); - } - } - public void stop() { heartbeatMonitor.shutdown(); heartbeatProcessor.stopAsync(); http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java index b46bac9..51978e9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java @@ -21,11 +21,14 @@ package org.apache.ambari.server.agent; import java.util.ArrayList; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; import com.google.gson.annotations.SerializedName; /** * Controller to Agent response data model. */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) public class HeartBeatResponse { @SerializedName("responseId") @@ -33,15 +36,15 @@ public class HeartBeatResponse { private long responseId; @SerializedName("executionCommands") - @com.fasterxml.jackson.annotation.JsonProperty("executionCommands") + @JsonIgnore private List<ExecutionCommand> executionCommands = new ArrayList<>(); @SerializedName("statusCommands") - @com.fasterxml.jackson.annotation.JsonProperty("statusCommands") + @JsonIgnore private List<StatusCommand> statusCommands = new ArrayList<>(); @SerializedName("cancelCommands") - @com.fasterxml.jackson.annotation.JsonProperty("cancelCommands") + @JsonIgnore private List<CancelCommand> cancelCommands = new ArrayList<>(); /** @@ -52,7 +55,7 @@ public class HeartBeatResponse { * the agent to abandon all alert definitions that are scheduled. */ @SerializedName("alertDefinitionCommands") - @com.fasterxml.jackson.annotation.JsonProperty("alertDefinitionCommands") + @JsonIgnore private List<AlertDefinitionCommand> alertDefinitionCommands = null; /** @@ -60,31 +63,31 @@ public class HeartBeatResponse { * immediately. */ @SerializedName("alertExecutionCommands") - @com.fasterxml.jackson.annotation.JsonProperty("alertExecutionCommands") + @JsonIgnore private List<AlertExecutionCommand> alertExecutionCommands = null; @SerializedName("registrationCommand") - @com.fasterxml.jackson.annotation.JsonProperty("registrationCommand") + @JsonIgnore private RegistrationCommand registrationCommand; @SerializedName("restartAgent") @com.fasterxml.jackson.annotation.JsonProperty("restartAgent") - private boolean restartAgent = false; + private Boolean restartAgent = null; @SerializedName("hasMappedComponents") - @com.fasterxml.jackson.annotation.JsonProperty("hasMappedComponents") + @JsonIgnore private boolean hasMappedComponents = false; @SerializedName("hasPendingTasks") - @com.fasterxml.jackson.annotation.JsonProperty("hasPendingTasks") + @JsonIgnore private boolean hasPendingTasks = false; @SerializedName("recoveryConfig") - @com.fasterxml.jackson.annotation.JsonProperty("recoveryConfig") + @JsonIgnore private RecoveryConfig recoveryConfig; @SerializedName("clusterSize") - @com.fasterxml.jackson.annotation.JsonProperty("clusterSize") + @JsonIgnore private int clusterSize = -1; public long getResponseId() { @@ -150,6 +153,9 @@ public class HeartBeatResponse { /** * Gets the alert definition commands that contain the alert definitions for * each cluster that the host is a member of. + * + * @param commands + * the commands, or {@code null} for none. */ public List<AlertDefinitionCommand> getAlertDefinitionCommands() { return alertDefinitionCommands; @@ -166,11 +172,11 @@ public class HeartBeatResponse { alertDefinitionCommands = commands; } - public boolean isRestartAgent() { + public Boolean isRestartAgent() { return restartAgent; } - public void setRestartAgent(boolean restartAgent) { + public void setRestartAgent(Boolean restartAgent) { this.restartAgent = restartAgent; } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java index a77ed75..6b6114c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java @@ -190,18 +190,6 @@ public class HeartbeatMonitor implements Runnable { hostObj.setState(HostState.INIT); } } - - // Get status of service components - List<StatusCommand> cmds = generateStatusCommands(hostname); - LOG.trace("Generated " + cmds.size() + " status commands for host: " + - hostname); - if (cmds.isEmpty()) { - // Nothing to do - } else { - for (StatusCommand command : cmds) { - actionQueue.enqueue(hostname, command); - } - } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java index 47109df..fce6be6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java @@ -55,13 +55,16 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.HostHealthStatus; +import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.MaintenanceState; +import org.apache.ambari.server.state.SecurityState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.UpgradeState; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; +import org.apache.ambari.server.state.host.HostStatusUpdatesReceivedEvent; import org.apache.ambari.server.state.scheduler.RequestExecution; import org.apache.ambari.server.state.stack.upgrade.Direction; import org.apache.ambari.server.state.stack.upgrade.UpgradeType; @@ -70,6 +73,7 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgre import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,28 +251,31 @@ public class HeartbeatProcessor extends AbstractService{ * @throws AmbariException */ protected void processHostStatus(HeartBeat heartbeat) throws AmbariException { + processHostStatus(heartbeat.getComponentStatus(), heartbeat.getReports(), heartbeat.getHostname()); + } - String hostname = heartbeat.getHostname(); - Host host = clusterFsm.getHost(hostname); + + protected void processHostStatus(List<ComponentStatus> componentStatuses, List<CommandReport> reports, + String hostName) throws AmbariException { + + Host host = clusterFsm.getHost(hostName); HostHealthStatus.HealthStatus healthStatus = host.getHealthStatus().getHealthStatus(); if (!healthStatus.equals(HostHealthStatus.HealthStatus.UNKNOWN)) { - List<ComponentStatus> componentStatuses = heartbeat.getComponentStatus(); //Host status info could be calculated only if agent returned statuses in heartbeat //Or, if a command is executed that can change component status boolean calculateHostStatus = false; - String clusterName = null; - if (componentStatuses.size() > 0) { + Long clusterId = null; + if (CollectionUtils.isNotEmpty(componentStatuses)) { calculateHostStatus = true; for (ComponentStatus componentStatus : componentStatuses) { - clusterName = componentStatus.getClusterName(); + clusterId = componentStatus.getClusterId(); break; } } - if (!calculateHostStatus) { - List<CommandReport> reports = heartbeat.getReports(); + if (!calculateHostStatus && CollectionUtils.isNotEmpty(reports)) { for (CommandReport report : reports) { if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand())) { continue; @@ -280,7 +287,7 @@ public class HeartbeatProcessor extends AbstractService{ } if (report.getStatus().equals("COMPLETED")) { calculateHostStatus = true; - clusterName = report.getClusterName(); + clusterId = Long.parseLong(report.getClusterId()); break; } } @@ -294,11 +301,11 @@ public class HeartbeatProcessor extends AbstractService{ int slavesRunning = 0; StackId stackId; - Cluster cluster = clusterFsm.getCluster(clusterName); + Cluster cluster = clusterFsm.getCluster(clusterId); stackId = cluster.getDesiredStackVersion(); - List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(heartbeat.getHostname()); + List<ServiceComponentHost> scHosts = cluster.getServiceComponentHosts(hostName); for (ServiceComponentHost scHost : scHosts) { ComponentInfo componentInfo = ambariMetaInfo.getComponent(stackId.getStackName(), @@ -349,11 +356,12 @@ public class HeartbeatProcessor extends AbstractService{ * @param now cached current time * @throws AmbariException */ - protected void processCommandReports( - HeartBeat heartbeat, long now) + protected void processCommandReports(HeartBeat heartbeat, long now) throws AmbariException { + processCommandReports(heartbeat.getReports(), heartbeat.getHostname(), now); + } + + protected void processCommandReports(List<CommandReport> reports, String hostName, Long now) throws AmbariException { - String hostname = heartbeat.getHostname(); - List<CommandReport> reports = heartbeat.getReports(); // Cache HostRoleCommand entities because we will need them few times List<Long> taskIds = new ArrayList<>(); @@ -364,21 +372,13 @@ public class HeartbeatProcessor extends AbstractService{ for (CommandReport report : reports) { - Long clusterId = null; - if (report.getClusterName() != null) { - try { - Cluster cluster = clusterFsm.getCluster(report.getClusterName()); - clusterId = cluster.getClusterId(); - } catch (AmbariException e) { - // null clusterId reported and handled by the listener (DistributeRepositoriesActionListener) - } - } + Long clusterId = Long.parseLong(report.getClusterId()); LOG.debug("Received command report: " + report); - Host host = clusterFsm.getHost(hostname); + Host host = clusterFsm.getHost(hostName); // HostEntity hostEntity = hostDAO.findByName(hostname); //don't touch database if (host == null) { - LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostname); + LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostName); continue; } @@ -386,7 +386,7 @@ public class HeartbeatProcessor extends AbstractService{ if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE && HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) { ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent( - clusterId, hostname, report, false); + clusterId, hostName, report, false); ambariEventPublisher.publish(event); } @@ -456,7 +456,7 @@ public class HeartbeatProcessor extends AbstractService{ continue; } - Cluster cl = clusterFsm.getCluster(report.getClusterName()); + Cluster cl = clusterFsm.getCluster(Long.parseLong(report.getClusterId())); String service = report.getServiceName(); if (service == null || service.isEmpty()) { throw new AmbariException("Invalid command report, service: " + service); @@ -467,7 +467,7 @@ public class HeartbeatProcessor extends AbstractService{ try { Service svc = cl.getService(service); ServiceComponent svcComp = svc.getServiceComponent(report.getRole()); - ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname); + ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostName); String schName = scHost.getServiceComponentName(); if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) { @@ -491,27 +491,6 @@ public class HeartbeatProcessor extends AbstractService{ versionEventPublisher.publish(event); } - // Updating stack version, if needed (this is not actually for express/rolling upgrades!) - if (scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING)) { - scHost.setStackVersion(scHost.getDesiredStackVersion()); - } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) || - (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) && - ("START".equals(report.getCustomCommand()) || - "RESTART".equals(report.getCustomCommand())))) - && null != report.getConfigurationTags() - && !report.getConfigurationTags().isEmpty()) { - LOG.info("Updating applied config on service " + scHost.getServiceName() + - ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName()); - scHost.updateActualConfigs(report.getConfigurationTags()); - scHost.setRestartRequired(false); - } - // Necessary for resetting clients stale configs after starting service - if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) || - (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) && - "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()){ - scHost.updateActualConfigs(report.getConfigurationTags()); - scHost.setRestartRequired(false); - } if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) && !("START".equals(report.getCustomCommand()) || "STOP".equals(report.getCustomCommand()))) { @@ -524,16 +503,16 @@ public class HeartbeatProcessor extends AbstractService{ (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) && "START".equals(report.getCustomCommand()))) { scHost.handleEvent(new ServiceComponentHostStartedEvent(schName, - hostname, now)); + hostName, now)); scHost.setRestartRequired(false); } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) || (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) && "STOP".equals(report.getCustomCommand()))) { scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName, - hostname, now)); + hostName, now)); } else { scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName, - hostname, now)); + hostName, now)); } } else if (report.getStatus().equals("FAILED")) { @@ -550,16 +529,16 @@ public class HeartbeatProcessor extends AbstractService{ } LOG.error("Operation failed - may be retried. Service component host: " - + schName + ", host: " + hostname + " Action id " + report.getActionId() + " and taskId " + report.getTaskId()); + + schName + ", host: " + hostName + " Action id " + report.getActionId() + " and taskId " + report.getTaskId()); if (actionManager.isInProgressCommand(report)) { scHost.handleEvent(new ServiceComponentHostOpFailedEvent - (schName, hostname, now)); + (schName, hostName, now)); } else { LOG.info("Received report for a command that is no longer active. " + report); } } else if (report.getStatus().equals("IN_PROGRESS")) { scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName, - hostname, now)); + hostName, now)); } } catch (ServiceComponentNotFoundException scnex) { LOG.warn("Service component not found ", scnex); @@ -574,7 +553,7 @@ public class HeartbeatProcessor extends AbstractService{ } //Update state machines from reports - actionManager.processTaskResponse(hostname, reports, commands); + actionManager.processTaskResponse(hostName, reports, commands); } /** @@ -594,7 +573,7 @@ public class HeartbeatProcessor extends AbstractService{ Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname); for (Cluster cl : clusters) { for (ComponentStatus status : componentStatuses) { - if (status.getClusterName().equals(cl.getClusterName())) { + if (status.getClusterId().equals(cl.getClusterId())) { try { Service svc = cl.getService(status.getServiceName()); @@ -617,7 +596,7 @@ public class HeartbeatProcessor extends AbstractService{ if (!prevState.equals(liveState)) { LOG.info("State of service component " + componentName + " of service " + status.getServiceName() - + " of cluster " + status.getClusterName() + + " of cluster " + status.getClusterId() + " has changed from " + prevState + " to " + liveState + " at host " + hostname + " according to STATUS_COMMAND report"); @@ -625,6 +604,26 @@ public class HeartbeatProcessor extends AbstractService{ } } + if (status.getSecurityState() != null) { + SecurityState prevSecurityState = scHost.getSecurityState(); + SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState()); + if ((prevSecurityState != currentSecurityState)) { + if (prevSecurityState.isEndpoint()) { + scHost.setSecurityState(currentSecurityState); + LOG.info(String.format("Security of service component %s of service %s of cluster %s " + + "has changed from %s to %s on host %s", + componentName, status.getServiceName(), status.getClusterId(), prevSecurityState, + currentSecurityState, hostname)); + } else { + LOG.debug(String.format("Security of service component %s of service %s of cluster %s " + + "has changed from %s to %s on host %s but will be ignored since %s is a " + + "transitional state", + componentName, status.getServiceName(), status.getClusterId(), + prevSecurityState, currentSecurityState, hostname, prevSecurityState)); + } + } + } + if (null != status.getStackVersion() && !status.getStackVersion().isEmpty()) { scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class)); } @@ -664,14 +663,14 @@ public class HeartbeatProcessor extends AbstractService{ } catch (ServiceNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" + " service" - + ", clusterName=" + status.getClusterName() + + ", clusterId=" + status.getClusterId() + ", serviceName=" + status.getServiceName()); // FIXME ignore invalid live update and continue for now? continue; } catch (ServiceComponentNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" + " servicecomponent" - + ", clusterName=" + status.getClusterName() + + ", clusterId=" + status.getClusterId() + ", serviceName=" + status.getServiceName() + ", componentName=" + status.getComponentName()); // FIXME ignore invalid live update and continue for now? @@ -679,7 +678,7 @@ public class HeartbeatProcessor extends AbstractService{ } catch (ServiceComponentHostNotFoundException e) { LOG.warn("Received a live status update for a non-initialized" + " service" - + ", clusterName=" + status.getClusterName() + + ", clusterId=" + status.getClusterId() + ", serviceName=" + status.getServiceName() + ", componentName=" + status.getComponentName() + ", hostname=" + hostname); @@ -688,7 +687,7 @@ public class HeartbeatProcessor extends AbstractService{ } catch (RuntimeException e) { LOG.warn("Received a live status with invalid payload" + " service" - + ", clusterName=" + status.getClusterName() + + ", clusterId=" + status.getClusterId() + ", serviceName=" + status.getServiceName() + ", componentName=" + status.getComponentName() + ", hostname=" + hostname @@ -698,6 +697,19 @@ public class HeartbeatProcessor extends AbstractService{ } } } + + Host host = clusterFsm.getHost(hostname); + long now = System.currentTimeMillis(); + // If the host is waiting for component status updates, notify it + if (componentStatuses.size() > 0 + && host.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) { + try { + LOG.debug("Got component status updates for host {}", hostname); + host.handleEvent(new HostStatusUpdatesReceivedEvent(hostname, now)); + } catch (InvalidStateTransitionException e) { + LOG.warn("Failed to notify the host about component status updates for host {}", hostname, e); + } + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java index e54fcfd..24d96b9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java @@ -56,10 +56,6 @@ public class RecoveryConfig { @com.fasterxml.jackson.annotation.JsonProperty("components") private String enabledComponents; - @SerializedName("recoveryTimestamp") - @com.fasterxml.jackson.annotation.JsonProperty("recoveryTimestamp") - private long recoveryTimestamp; - public String getEnabledComponents() { return enabledComponents; } @@ -108,24 +104,6 @@ public class RecoveryConfig { this.maxLifetimeCount = maxLifetimeCount; } - /** - * Timestamp when the recovery values were last updated. - * - * @return - Timestamp. - */ - public long getRecoveryTimestamp() { - return recoveryTimestamp; - } - - /** - * Set the timestamp when the recovery values were last updated. - * - * @param recoveryTimestamp - */ - public void setRecoveryTimestamp(long recoveryTimestamp) { - this.recoveryTimestamp = recoveryTimestamp; - } - @Override public String toString() { StringBuilder buffer = new StringBuilder("RecoveryConfig{"); @@ -135,7 +113,6 @@ public class RecoveryConfig { buffer.append(", retryGap=").append(retryGap); buffer.append(", maxLifetimeCount=").append(maxLifetimeCount); buffer.append(", components=").append(enabledComponents); - buffer.append(", recoveryTimestamp=").append(recoveryTimestamp); buffer.append('}'); return buffer.toString(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java index 9d5bae7d..4346472 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java @@ -107,7 +107,6 @@ public class RecoveryConfigHelper { recoveryConfig.setRetryGap(autoStartConfig.getNodeRecoveryRetryGap()); recoveryConfig.setType(autoStartConfig.getNodeRecoveryType()); recoveryConfig.setWindowInMinutes(autoStartConfig.getNodeRecoveryWindowInMin()); - recoveryConfig.setRecoveryTimestamp(now); if (autoStartConfig.isRecoveryEnabled()) { recoveryConfig.setEnabledComponents(StringUtils.join(autoStartConfig.getEnabledComponents(hostname), ',')); } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java index 46f3bb4..d4791f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java @@ -24,14 +24,18 @@ import java.util.Map; import org.codehaus.jackson.annotate.JsonProperty; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonInclude; + /** * * Controller to Agent response data model. * */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) public class RegistrationResponse { @JsonProperty("response") - @com.fasterxml.jackson.annotation.JsonProperty("response") + @JsonIgnore private RegistrationStatus response; /** @@ -39,7 +43,7 @@ public class RegistrationResponse { * alert definitions it needs to schedule. */ @JsonProperty("alertDefinitionCommands") - @com.fasterxml.jackson.annotation.JsonProperty("alertDefinitionCommands") + @JsonIgnore private List<AlertDefinitionCommand> alertDefinitionCommands = new ArrayList<>(); /** @@ -65,17 +69,18 @@ public class RegistrationResponse { private long responseId; @JsonProperty("recoveryConfig") - @com.fasterxml.jackson.annotation.JsonProperty("recoveryConfig") + @JsonIgnore private RecoveryConfig recoveryConfig; @JsonProperty("agentConfig") - @com.fasterxml.jackson.annotation.JsonProperty("agentConfig") + @JsonIgnore private Map<String, String> agentConfig; @JsonProperty("statusCommands") - @com.fasterxml.jackson.annotation.JsonProperty("statusCommands") + @JsonIgnore private List<StatusCommand> statusCommands = null; + @JsonIgnore public RegistrationStatus getResponseStatus() { return response; } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java index e3038b6..0f0a491 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentClusterDataHolder.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,34 +18,22 @@ package org.apache.ambari.server.agent.stomp; -import java.io.UnsupportedEncodingException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.stomp.dto.Hashable; import org.apache.commons.lang.StringUtils; -import com.google.gson.Gson; - /** - * Is used to saving and updating last version of event + * Is used to saving and updating last version of event in cluster scope * @param <T> event with hash to control version */ -public abstract class AgentClusterDataHolder<T extends Hashable> { - private String parentHash; - private String currentHash; +public abstract class AgentClusterDataHolder<T extends Hashable> extends AgentDataHolder { private T data; - public final String salt = ""; - public T getUpdateIfChanged(String agentHash) throws AmbariException { - if (StringUtils.isEmpty(agentHash) || (StringUtils.isNotEmpty(agentHash) && !agentHash.equals(currentHash))) { + if (StringUtils.isEmpty(agentHash) || (StringUtils.isNotEmpty(agentHash) && (data == null || !agentHash.equals(data.getHash())))) { if (data == null) { data = getCurrentData(); - parentHash = currentHash; - currentHash = getHash(data); - data.setHash(currentHash); + data.setHash(getHash(data)); } return data; } @@ -57,51 +45,12 @@ public abstract class AgentClusterDataHolder<T extends Hashable> { protected abstract T getEmptyData(); protected void regenerateHash() { - setCurrentHash(null); - setParentHash(getCurrentHash()); - setCurrentHash(getHash(getData())); - getData().setHash(getCurrentHash()); - } - - protected String getHash(T data) { - String json = new Gson().toJson(data); - String generatedPassword = null; - try { - MessageDigest md = MessageDigest.getInstance("SHA-512"); - md.update(salt.getBytes("UTF-8")); - byte[] bytes = md.digest(json.getBytes("UTF-8")); - StringBuilder sb = new StringBuilder(); - for(int i=0; i< bytes.length ;i++){ - sb.append(Integer.toString((bytes[i] & 0xff) + 0x100, 16).substring(1)); - } - generatedPassword = sb.toString(); - } - catch (NoSuchAlgorithmException e){ - e.printStackTrace(); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - return generatedPassword; + getData().setHash(null); + getData().setHash(getHash(getData())); } public abstract void updateData(T update) throws AmbariException; - public String getParentHash() { - return parentHash; - } - - public void setParentHash(String parentHash) { - this.parentHash = parentHash; - } - - public String getCurrentHash() { - return currentHash; - } - - public void setCurrentHash(String currentHash) { - this.currentHash = currentHash; - } - public T getData() { return data; } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java new file mode 100644 index 0000000..0de686d --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentConfigsHolder.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.agent.stomp; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.events.AgentConfigsUpdateEvent; +import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.ConfigHelper; +import org.apache.commons.collections.CollectionUtils; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; + +@Singleton +public class AgentConfigsHolder extends AgentHostDataHolder<AgentConfigsUpdateEvent> { + + @Inject + private ConfigHelper configHelper; + + @Inject + private Provider<Clusters> m_clusters; + + @Inject + private StateUpdateEventPublisher stateUpdateEventPublisher; + + @Override + public AgentConfigsUpdateEvent getCurrentData(String hostName) throws AmbariException { + return configHelper.getHostActualConfigs(hostName); + } + + public void updateData(AgentConfigsUpdateEvent update) throws AmbariException { + //do nothing + } + + public void updateData(Long clusterId, List<String> hostNames) throws AmbariException { + if (CollectionUtils.isEmpty(hostNames)) { + // TODO cluster configs will be created before hosts assigning + if (CollectionUtils.isEmpty(m_clusters.get().getCluster(clusterId).getHosts())) { + hostNames = m_clusters.get().getHosts().stream().map(h -> h.getHostName()).collect(Collectors.toList()); + } else { + hostNames = m_clusters.get().getCluster(clusterId).getHosts().stream().map(h -> h.getHostName()).collect(Collectors.toList()); + } + } + + for (String hostName : hostNames) { + AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs(hostName); + agentConfigsUpdateEvent.setHostName(hostName); + setData(agentConfigsUpdateEvent, hostName); + regenerateHash(hostName); + stateUpdateEventPublisher.publish(agentConfigsUpdateEvent); + } + } + + @Override + protected AgentConfigsUpdateEvent getEmptyData() { + return AgentConfigsUpdateEvent.emptyUpdate(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java index f0f18cc..7035f38 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentCurrentDataController.java @@ -21,9 +21,9 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.AgentSessionManager; import org.apache.ambari.server.agent.stomp.dto.Hash; import org.apache.ambari.server.events.AgentConfigsUpdateEvent; +import org.apache.ambari.server.events.HostLevelParamsUpdateEvent; import org.apache.ambari.server.events.MetadataUpdateEvent; import org.apache.ambari.server.events.TopologyUpdateEvent; -import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,16 +43,15 @@ public class AgentCurrentDataController { private final AgentSessionManager agentSessionManager; private final TopologyHolder topologyHolder; private final MetadataHolder metadataHolder; - private final ConfigHelper configHelper; - - //TODO remove after hash generation implementing - private final String CONFIGS_HASH_STUB = "stubhash"; + private final HostLevelParamsHolder hostLevelParamsHolder; + private final AgentConfigsHolder agentConfigsHolder; public AgentCurrentDataController(Injector injector) { agentSessionManager = injector.getInstance(AgentSessionManager.class); topologyHolder = injector.getInstance(TopologyHolder.class); metadataHolder = injector.getInstance(MetadataHolder.class); - configHelper = injector.getInstance(ConfigHelper.class); + hostLevelParamsHolder = injector.getInstance(HostLevelParamsHolder.class); + agentConfigsHolder = injector.getInstance(AgentConfigsHolder.class); } @SubscribeMapping("/topologies") @@ -68,10 +67,11 @@ public class AgentCurrentDataController { //TODO method should returns empty response in case hash is relevant @SubscribeMapping("/configs") public AgentConfigsUpdateEvent getCurrentConfigs(@Header String simpSessionId, Hash hash) throws AmbariException { - String currentHash = CONFIGS_HASH_STUB; - AgentConfigsUpdateEvent agentConfigsUpdateEvent = configHelper.getHostActualConfigs( - agentSessionManager.getHost(simpSessionId).getHostName()); - agentConfigsUpdateEvent.setHash(currentHash); - return agentConfigsUpdateEvent; + return agentConfigsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName()); + } + + @SubscribeMapping("/host_level_params") + public HostLevelParamsUpdateEvent getCurrentHostLevelParams(@Header String simpSessionId, Hash hash) throws AmbariException { + return hostLevelParamsHolder.getUpdateIfChanged(hash.getHash(), agentSessionManager.getHost(simpSessionId).getHostName()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java new file mode 100644 index 0000000..635611b --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentDataHolder.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.agent.stomp; + +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import org.apache.ambari.server.agent.stomp.dto.Hashable; + +import com.google.gson.Gson; + +/** + * Is used to hash generating for event + * @param <T> event with hash to control version + */ +public abstract class AgentDataHolder<T extends Hashable> { + public final String salt = ""; + + protected abstract T getEmptyData(); + + protected String getHash(T data) { + String json = new Gson().toJson(data); + String generatedPassword = null; + try { + MessageDigest md = MessageDigest.getInstance("SHA-512"); + md.update(salt.getBytes("UTF-8")); + byte[] bytes = md.digest(json.getBytes("UTF-8")); + StringBuilder sb = new StringBuilder(); + for(int i=0; i< bytes.length ;i++){ + sb.append(Integer.toString((bytes[i] & 0xff) + 0x100, 16).substring(1)); + } + generatedPassword = sb.toString(); + } + catch (NoSuchAlgorithmException e){ + e.printStackTrace(); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + return generatedPassword; + } +}
