AMBARI-22734. alert_definitions topic doesn't emit any events to client. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/57a0e5c9 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/57a0e5c9 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/57a0e5c9 Branch: refs/heads/branch-3.0-perf Commit: 57a0e5c91618e2d03d9a1f41cb423c609d8029e3 Parents: 086bad4 Author: Myroslav Papirkovskyi <[email protected]> Authored: Wed Nov 1 20:26:16 2017 +0200 Committer: Myroslav Papirkovskyi <[email protected]> Committed: Fri Jan 5 19:13:14 2018 +0200 ---------------------------------------------------------------------- ...MessageDestinationIsNotDefinedException.java | 29 ++++ .../configuration/spring/RootStompConfig.java | 13 ++ .../events/AlertDefinitionsMessageEmitter.java | 38 +++++ .../ambari/server/events/AmbariUpdateEvent.java | 45 ++---- .../server/events/DefaultMessageEmitter.java | 63 ++++++++ .../ambari/server/events/MessageEmitter.java | 86 ++++++++++ .../events/NamedHostRoleCommandUpdateEvent.java | 162 ------------------- .../listeners/requests/StateUpdateListener.java | 43 ++--- .../listeners/tasks/TaskStatusListener.java | 15 -- 9 files changed, 256 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java b/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java new file mode 100644 index 0000000..7369bbc --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server; + +import org.apache.ambari.server.events.AmbariUpdateEvent; + +@SuppressWarnings("serial") +public class MessageDestinationIsNotDefinedException extends ObjectNotFoundException { + + public MessageDestinationIsNotDefinedException(AmbariUpdateEvent.Type eventType) { + super(String.format("No destination defined for message with {} type", eventType)); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java index fda0607..c7e720b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java @@ -22,8 +22,11 @@ import java.util.List; import javax.servlet.ServletContext; +import org.apache.ambari.server.agent.AgentSessionManager; import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry; import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler; +import org.apache.ambari.server.events.AlertDefinitionsMessageEmitter; +import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.events.listeners.requests.StateUpdateListener; import org.eclipse.jetty.websocket.server.WebSocketServerFactory; import org.slf4j.Logger; @@ -66,6 +69,16 @@ public class RootStompConfig { } @Bean + public DefaultMessageEmitter defaultMessageSender(Injector injector) { + return new DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class), brokerTemplate); + } + + @Bean + public AlertDefinitionsMessageEmitter alertDefinitionsMessageSender(Injector injector) { + return new AlertDefinitionsMessageEmitter(injector.getInstance(AgentSessionManager.class), brokerTemplate); + } + + @Bean public DefaultHandshakeHandler handshakeHandler() { return new DefaultHandshakeHandler( http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java new file mode 100644 index 0000000..dd270bd --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import org.apache.ambari.server.HostNotRegisteredException; +import org.apache.ambari.server.agent.AgentSessionManager; +import org.springframework.messaging.simp.SimpMessagingTemplate; + +public class AlertDefinitionsMessageEmitter extends MessageEmitter { + + private final String ALERT_DESTINATION_TO_HOST = "/alert_definitions"; + private final String ALERT_DESTINATION_TO_API = "/events/alert_definitions"; + + public AlertDefinitionsMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) { + super(agentSessionManager, simpMessagingTemplate); + } + + @Override + public void emitMessage(AmbariUpdateEvent event) throws HostNotRegisteredException { + emitMessageToHost((AmbariHostUpdateEvent) event, ALERT_DESTINATION_TO_HOST); + emitMessageToAll(event, ALERT_DESTINATION_TO_API); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java index 6991f15..579f8e0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java @@ -39,50 +39,35 @@ public abstract class AmbariUpdateEvent { } @Transient - public String getDestination() { - return type.getDestination(); - } - - @Transient public String getMetricName() { return type.getMetricName(); } public enum Type { - ALERT("/events/alerts", "events.alerts"), - METADATA("/events/metadata", "events.metadata"), - HOSTLEVELPARAMS("/host_level_params", "events.hostlevelparams"), - UI_TOPOLOGY("/events/ui_topologies", "events.topology_update"), - AGENT_TOPOLOGY("/events/topologies", "events.topology_update"), - AGENT_CONFIGS("/configs", "events.agent.configs"), - CONFIGS("/events/configs", "events.configs"), - HOSTCOMPONENT("/events/hostcomponents", "events.hostcomponents"), - NAMEDHOSTCOMPONENT("/events/tasks/", "events.hostrolecommands.named"), - REQUEST("/events/requests", "events.requests"), - SERVICE("/events/services", "events.services"), - HOST("/events/hosts", "events.hosts"), - ALERT_DEFINITIONS("/alert_definitions", "events.alert_definitions"), - COMMAND("/commands", "events.commands"); - - /** - * Destination is used for delivery message to recipients. - */ - private String destination; + ALERT("events.alerts"), + METADATA("events.metadata"), + HOSTLEVELPARAMS("events.hostlevelparams"), + UI_TOPOLOGY("events.topology_update"), + AGENT_TOPOLOGY("events.topology_update"), + AGENT_CONFIGS("events.agent.configs"), + CONFIGS("events.configs"), + HOSTCOMPONENT("events.hostcomponents"), + NAMEDHOSTCOMPONENT("events.hostrolecommands.named"), + REQUEST("events.requests"), + SERVICE("events.services"), + HOST("events.hosts"), + ALERT_DEFINITIONS("events.alert_definitions"), + COMMAND("events.commands"); /** * Is used to collect info about event appearing frequency. */ private String metricName; - Type(String destination, String metricName) { - this.destination = destination; + Type(String metricName) { this.metricName = metricName; } - public String getDestination() { - return destination; - } - public String getMetricName() { return metricName; } http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java new file mode 100644 index 0000000..2e0b98e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.MessageDestinationIsNotDefinedException; +import org.apache.ambari.server.agent.AgentSessionManager; +import org.springframework.messaging.simp.SimpMessagingTemplate; + +public class DefaultMessageEmitter extends MessageEmitter { + private final Map<AmbariUpdateEvent.Type, String> DEFAULT_DESTINATIONS = + Collections.unmodifiableMap(new HashMap<AmbariUpdateEvent.Type, String>(){{ + put(AmbariUpdateEvent.Type.ALERT, "/events/alerts"); + put(AmbariUpdateEvent.Type.METADATA, "/events/metadata"); + put(AmbariUpdateEvent.Type.HOSTLEVELPARAMS, "/host_level_params"); + put(AmbariUpdateEvent.Type.UI_TOPOLOGY, "/events/ui_topologies"); + put(AmbariUpdateEvent.Type.AGENT_TOPOLOGY, "/events/topologies"); + put(AmbariUpdateEvent.Type.AGENT_CONFIGS, "/configs"); + put(AmbariUpdateEvent.Type.CONFIGS, "/events/configs"); + put(AmbariUpdateEvent.Type.HOSTCOMPONENT, "/events/hostcomponents"); + put(AmbariUpdateEvent.Type.REQUEST, "/events/requests"); + put(AmbariUpdateEvent.Type.SERVICE, "/events/services"); + put(AmbariUpdateEvent.Type.HOST, "/events/hosts"); + put(AmbariUpdateEvent.Type.COMMAND, "/commands"); + }}); + + public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) { + super(agentSessionManager, simpMessagingTemplate); + } + + @Override + public void emitMessage(AmbariUpdateEvent event) throws AmbariException { + String destination = DEFAULT_DESTINATIONS.get(event.getType()); + if (destination == null) { + throw new MessageDestinationIsNotDefinedException(event.getType()); + } + if (event instanceof AmbariHostUpdateEvent) { + AmbariHostUpdateEvent hostUpdateEvent = (AmbariHostUpdateEvent) event; + emitMessageToHost(hostUpdateEvent, destination); + } else { + emitMessageToAll(event, destination); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java new file mode 100644 index 0000000..203bb03 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.HostNotRegisteredException; +import org.apache.ambari.server.agent.AgentSessionManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.simp.SimpMessageHeaderAccessor; +import org.springframework.messaging.simp.SimpMessageType; +import org.springframework.messaging.simp.SimpMessagingTemplate; + +/** + * Is used to define a strategy for emitting message to subscribers. + */ +public abstract class MessageEmitter { + private final static Logger LOG = LoggerFactory.getLogger(MessageEmitter.class); + protected final AgentSessionManager agentSessionManager; + protected final SimpMessagingTemplate simpMessagingTemplate; + + + public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) { + this.agentSessionManager = agentSessionManager; + this.simpMessagingTemplate = simpMessagingTemplate; + } + + /** + * Determines destinations and emits message. + * @param event message should to be emitted. + * @throws AmbariException + */ + abstract void emitMessage(AmbariUpdateEvent event) throws AmbariException; + + /** + * Creates STOMP message header. + * @param sessionId + * @return message header. + */ + protected MessageHeaders createHeaders(String sessionId) { + SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); + headerAccessor.setSessionId(sessionId); + headerAccessor.setLeaveMutable(true); + return headerAccessor.getMessageHeaders(); + } + + /** + * Emits message to all subscribers. + * @param event message should to be emitted. + * @param destination + */ + protected void emitMessageToAll(AmbariUpdateEvent event, String destination) { + LOG.debug("Received status update event {}", event); + simpMessagingTemplate.convertAndSend(destination, event); + } + + /** + * Emit message to specified host only. + * @param event message should to be emitted. + * @param destination + * @throws HostNotRegisteredException in case host is not registered. + */ + protected void emitMessageToHost(AmbariHostUpdateEvent event, String destination) throws HostNotRegisteredException { + Long hostId = event.getHostId(); + String sessionId = agentSessionManager.getSessionId(hostId); + LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId); + MessageHeaders headers = createHeaders(sessionId); + simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java deleted file mode 100644 index 53dc69f..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.server.events; - -import org.apache.ambari.server.actionmanager.HostRoleStatus; - -import com.fasterxml.jackson.annotation.JsonInclude; - -/** - * Single host role command update info. This update will be sent to all subscribed recipients. - */ -@JsonInclude(JsonInclude.Include.NON_NULL) -public class NamedHostRoleCommandUpdateEvent extends AmbariUpdateEvent { - - private Long id; - private Long requestId; - private String hostName; - private Long endTime; - private HostRoleStatus status; - private String errorLog; - private String outLog; - private String stderr; - private String stdout; - - public NamedHostRoleCommandUpdateEvent(Long id, Long requestId, String hostName, Long endTime, HostRoleStatus status, String errorLog, String outLog, String stderr, String stdout) { - super(Type.NAMEDHOSTCOMPONENT); - this.id = id; - this.requestId = requestId; - this.hostName = hostName; - this.endTime = endTime; - this.status = status; - this.errorLog = errorLog; - this.outLog = outLog; - this.stderr = stderr; - this.stdout = stdout; - } - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public Long getRequestId() { - return requestId; - } - - public void setRequestId(Long requestId) { - this.requestId = requestId; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public Long getEndTime() { - return endTime; - } - - public void setEndTime(Long endTime) { - this.endTime = endTime; - } - - public HostRoleStatus getStatus() { - return status; - } - - public void setStatus(HostRoleStatus status) { - this.status = status; - } - - public String getErrorLog() { - return errorLog; - } - - public void setErrorLog(String errorLog) { - this.errorLog = errorLog; - } - - public String getOutLog() { - return outLog; - } - - public void setOutLog(String outLog) { - this.outLog = outLog; - } - - public String getStderr() { - return stderr; - } - - public void setStderr(String stderr) { - this.stderr = stderr; - } - - public String getStdout() { - return stdout; - } - - public void setStdout(String stdout) { - this.stdout = stdout; - } - - @Override - public String getDestination() { - return super.getDestination() + getId(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - NamedHostRoleCommandUpdateEvent that = (NamedHostRoleCommandUpdateEvent) o; - - if (id != null ? !id.equals(that.id) : that.id != null) return false; - if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false; - if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false; - if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false; - if (status != that.status) return false; - if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) return false; - if (outLog != null ? !outLog.equals(that.outLog) : that.outLog != null) return false; - if (stderr != null ? !stderr.equals(that.stderr) : that.stderr != null) return false; - return stdout != null ? stdout.equals(that.stdout) : that.stdout == null; - } - - @Override - public int hashCode() { - int result = id != null ? id.hashCode() : 0; - result = 31 * result + (requestId != null ? requestId.hashCode() : 0); - result = 31 * result + (hostName != null ? hostName.hashCode() : 0); - result = 31 * result + (endTime != null ? endTime.hashCode() : 0); - result = 31 * result + (status != null ? status.hashCode() : 0); - result = 31 * result + (errorLog != null ? errorLog.hashCode() : 0); - result = 31 * result + (outLog != null ? outLog.hashCode() : 0); - result = 31 * result + (stderr != null ? stderr.hashCode() : 0); - result = 31 * result + (stdout != null ? stdout.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java index 17e8c3d..8425d77 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java @@ -18,31 +18,27 @@ package org.apache.ambari.server.events.listeners.requests; -import org.apache.ambari.server.HostNotRegisteredException; +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.AgentSessionManager; -import org.apache.ambari.server.events.AmbariHostUpdateEvent; +import org.apache.ambari.server.events.AlertDefinitionsMessageEmitter; +import org.apache.ambari.server.events.AlertDefinitionsUpdateEvent; import org.apache.ambari.server.events.AmbariUpdateEvent; +import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.simp.SimpMessageHeaderAccessor; -import org.springframework.messaging.simp.SimpMessageType; -import org.springframework.messaging.simp.SimpMessagingTemplate; import com.google.common.eventbus.AllowConcurrentEvents; import com.google.common.eventbus.Subscribe; import com.google.inject.Injector; public class StateUpdateListener { - - private final static Logger LOG = LoggerFactory.getLogger(StateUpdateListener.class); - private final AgentSessionManager agentSessionManager; @Autowired - SimpMessagingTemplate simpMessagingTemplate; + private DefaultMessageEmitter defaultMessageEmitter; + + @Autowired + private AlertDefinitionsMessageEmitter alertDefinitionsMessageEmitter; public StateUpdateListener(Injector injector) { StateUpdateEventPublisher stateUpdateEventPublisher = @@ -53,25 +49,10 @@ public class StateUpdateListener { @Subscribe @AllowConcurrentEvents - public void onUpdateEvent(AmbariUpdateEvent event) throws HostNotRegisteredException { - String destination = event.getDestination(); - if (event instanceof AmbariHostUpdateEvent) { - AmbariHostUpdateEvent hostUpdateEvent = (AmbariHostUpdateEvent) event; - Long hostId = hostUpdateEvent.getHostId(); - String sessionId = agentSessionManager.getSessionId(hostId); - LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostId, sessionId); - MessageHeaders headers = createHeaders(sessionId); - simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers); - } else { - LOG.debug("Received status update event {}", event); - simpMessagingTemplate.convertAndSend(destination, event); + public void onUpdateEvent(AmbariUpdateEvent event) throws AmbariException { + if (event instanceof AlertDefinitionsUpdateEvent) { + alertDefinitionsMessageEmitter.emitMessage(event); } - } - - private MessageHeaders createHeaders(String sessionId) { - SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); - headerAccessor.setSessionId(sessionId); - headerAccessor.setLeaveMutable(true); - return headerAccessor.getMessageHeaders(); + defaultMessageEmitter.emitMessage(event); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java index 58bfcfc..888ed5d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java @@ -36,7 +36,6 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.actionmanager.Request; import org.apache.ambari.server.actionmanager.Stage; import org.apache.ambari.server.controller.internal.CalculatedStatus; -import org.apache.ambari.server.events.NamedHostRoleCommandUpdateEvent; import org.apache.ambari.server.events.RequestUpdateEvent; import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.TaskUpdateEvent; @@ -156,20 +155,6 @@ public class TaskStatusListener { } } } - - for (HostRoleCommand hostRoleCommand : hostRoleCommandWithReceivedStatus) { - NamedHostRoleCommandUpdateEvent namedHostRoleCommandUpdateEvent = new NamedHostRoleCommandUpdateEvent(hostRoleCommand.getTaskId(), - hostRoleCommand.getRequestId(), - hostRoleCommand.getHostName(), - hostRoleCommand.getEndTime(), - hostRoleCommand.getStatus(), - hostRoleCommand.getErrorLog(), - hostRoleCommand.getOutputLog(), - hostRoleCommand.getStderr(), - hostRoleCommand.getStdout() - ); - stateUpdateEventPublisher.publish(namedHostRoleCommandUpdateEvent); - } updateActiveTasksMap(hostRoleCommandWithReceivedStatus); Boolean didAnyStageStatusUpdated = updateActiveStagesStatus(stagesWithReceivedTaskStatus, hostRoleCommandListAll); // Presumption: If there is no update in any of the running stage's status
