AMBARI-22734. alert_definitions topic doesn't emit any events to client. 
(mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/57a0e5c9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/57a0e5c9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/57a0e5c9

Branch: refs/heads/branch-3.0-perf
Commit: 57a0e5c91618e2d03d9a1f41cb423c609d8029e3
Parents: 086bad4
Author: Myroslav Papirkovskyi <[email protected]>
Authored: Wed Nov 1 20:26:16 2017 +0200
Committer: Myroslav Papirkovskyi <[email protected]>
Committed: Fri Jan 5 19:13:14 2018 +0200

----------------------------------------------------------------------
 ...MessageDestinationIsNotDefinedException.java |  29 ++++
 .../configuration/spring/RootStompConfig.java   |  13 ++
 .../events/AlertDefinitionsMessageEmitter.java  |  38 +++++
 .../ambari/server/events/AmbariUpdateEvent.java |  45 ++----
 .../server/events/DefaultMessageEmitter.java    |  63 ++++++++
 .../ambari/server/events/MessageEmitter.java    |  86 ++++++++++
 .../events/NamedHostRoleCommandUpdateEvent.java | 162 -------------------
 .../listeners/requests/StateUpdateListener.java |  43 ++---
 .../listeners/tasks/TaskStatusListener.java     |  15 --
 9 files changed, 256 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java
new file mode 100644
index 0000000..7369bbc
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/MessageDestinationIsNotDefinedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server;
+
+import org.apache.ambari.server.events.AmbariUpdateEvent;
+
+/**
+ * Thrown when no message destination is defined for the type of an update
+ * event.
+ */
+@SuppressWarnings("serial")
+public class MessageDestinationIsNotDefinedException extends ObjectNotFoundException {
+
+  /**
+   * Creates the exception with a message that names the offending event type.
+   *
+   * @param eventType event type for which no destination is defined
+   */
+  public MessageDestinationIsNotDefinedException(AmbariUpdateEvent.Type eventType) {
+    // String.format substitutes %s; the SLF4J-style "{}" placeholder was printed
+    // literally and the event type never appeared in the message.
+    super(String.format("No destination defined for message with %s type", eventType));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
index fda0607..c7e720b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/RootStompConfig.java
@@ -22,8 +22,11 @@ import java.util.List;
 
 import javax.servlet.ServletContext;
 
+import org.apache.ambari.server.agent.AgentSessionManager;
 import org.apache.ambari.server.agent.stomp.AmbariSubscriptionRegistry;
 import org.apache.ambari.server.api.AmbariSendToMethodReturnValueHandler;
+import org.apache.ambari.server.events.AlertDefinitionsMessageEmitter;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.listeners.requests.StateUpdateListener;
 import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
 import org.slf4j.Logger;
@@ -66,6 +69,16 @@ public class RootStompConfig {
   }
 
   @Bean
+  public DefaultMessageEmitter defaultMessageSender(Injector injector) {
+    return new 
DefaultMessageEmitter(injector.getInstance(AgentSessionManager.class), 
brokerTemplate);
+  }
+
+  @Bean
+  public AlertDefinitionsMessageEmitter alertDefinitionsMessageSender(Injector 
injector) {
+    return new 
AlertDefinitionsMessageEmitter(injector.getInstance(AgentSessionManager.class), 
brokerTemplate);
+  }
+
+  @Bean
   public DefaultHandshakeHandler handshakeHandler() {
 
     return new DefaultHandshakeHandler(

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java
new file mode 100644
index 0000000..dd270bd
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AlertDefinitionsMessageEmitter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+
+/**
+ * Emitter for alert definition updates. Each event is sent to the session of
+ * its host and is also broadcast to all subscribers of the API destination.
+ */
+public class AlertDefinitionsMessageEmitter extends MessageEmitter {
+
+  /** Destination of the host-targeted message. */
+  private static final String ALERT_DESTINATION_TO_HOST = "/alert_definitions";
+
+  /** Destination of the message broadcast to all subscribers. */
+  private static final String ALERT_DESTINATION_TO_API = "/events/alert_definitions";
+
+  /**
+   * @param agentSessionManager passed to the base emitter
+   * @param simpMessagingTemplate passed to the base emitter
+   */
+  public AlertDefinitionsMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
+    super(agentSessionManager, simpMessagingTemplate);
+  }
+
+  /**
+   * Sends the event to the session of the host it belongs to and then
+   * broadcasts it to all subscribers of the API destination.
+   *
+   * @param event event to emit; it is cast to {@link AmbariHostUpdateEvent},
+   *              so any other event type fails with a ClassCastException
+   * @throws HostNotRegisteredException declared by the host-targeted send
+   */
+  @Override
+  public void emitMessage(AmbariUpdateEvent event) throws HostNotRegisteredException {
+    emitMessageToHost((AmbariHostUpdateEvent) event, ALERT_DESTINATION_TO_HOST);
+    emitMessageToAll(event, ALERT_DESTINATION_TO_API);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
index 6991f15..579f8e0 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
@@ -39,50 +39,35 @@ public abstract class AmbariUpdateEvent {
   }
 
   @Transient
-  public String getDestination() {
-    return type.getDestination();
-  }
-
-  @Transient
   public String getMetricName() {
     return type.getMetricName();
   }
 
   public enum Type {
-    ALERT("/events/alerts", "events.alerts"),
-    METADATA("/events/metadata", "events.metadata"),
-    HOSTLEVELPARAMS("/host_level_params", "events.hostlevelparams"),
-    UI_TOPOLOGY("/events/ui_topologies", "events.topology_update"),
-    AGENT_TOPOLOGY("/events/topologies", "events.topology_update"),
-    AGENT_CONFIGS("/configs", "events.agent.configs"),
-    CONFIGS("/events/configs", "events.configs"),
-    HOSTCOMPONENT("/events/hostcomponents", "events.hostcomponents"),
-    NAMEDHOSTCOMPONENT("/events/tasks/", "events.hostrolecommands.named"),
-    REQUEST("/events/requests", "events.requests"),
-    SERVICE("/events/services", "events.services"),
-    HOST("/events/hosts", "events.hosts"),
-    ALERT_DEFINITIONS("/alert_definitions", "events.alert_definitions"),
-    COMMAND("/commands", "events.commands");
-
-    /**
-     * Destination is used for delivery message to recipients.
-     */
-    private String destination;
+    ALERT("events.alerts"),
+    METADATA("events.metadata"),
+    HOSTLEVELPARAMS("events.hostlevelparams"),
+    UI_TOPOLOGY("events.topology_update"),
+    AGENT_TOPOLOGY("events.topology_update"),
+    AGENT_CONFIGS("events.agent.configs"),
+    CONFIGS("events.configs"),
+    HOSTCOMPONENT("events.hostcomponents"),
+    NAMEDHOSTCOMPONENT("events.hostrolecommands.named"),
+    REQUEST("events.requests"),
+    SERVICE("events.services"),
+    HOST("events.hosts"),
+    ALERT_DEFINITIONS("events.alert_definitions"),
+    COMMAND("events.commands");
 
     /**
      * Is used to collect info about event appearing frequency.
      */
     private String metricName;
 
-    Type(String destination, String metricName) {
-      this.destination = destination;
+    Type(String metricName) {
       this.metricName = metricName;
     }
 
-    public String getDestination() {
-      return destination;
-    }
-
     public String getMetricName() {
       return metricName;
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
new file mode 100644
index 0000000..2e0b98e
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.MessageDestinationIsNotDefinedException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+
+/**
+ * Emitter that resolves one destination per event type and sends the event
+ * either to the event's host or to all subscribers.
+ */
+public class DefaultMessageEmitter extends MessageEmitter {
+
+  /**
+   * Destination for each event type this emitter handles. Types without an
+   * entry are rejected by {@link #emitMessage(AmbariUpdateEvent)}.
+   */
+  private static final Map<AmbariUpdateEvent.Type, String> DEFAULT_DESTINATIONS;
+
+  static {
+    // Populated once for the class. Double-brace initialization on an instance
+    // field built a new anonymous HashMap subclass instance, with a reference to
+    // the emitter, for every emitter created.
+    Map<AmbariUpdateEvent.Type, String> destinations = new HashMap<AmbariUpdateEvent.Type, String>();
+    destinations.put(AmbariUpdateEvent.Type.ALERT, "/events/alerts");
+    destinations.put(AmbariUpdateEvent.Type.METADATA, "/events/metadata");
+    destinations.put(AmbariUpdateEvent.Type.HOSTLEVELPARAMS, "/host_level_params");
+    destinations.put(AmbariUpdateEvent.Type.UI_TOPOLOGY, "/events/ui_topologies");
+    destinations.put(AmbariUpdateEvent.Type.AGENT_TOPOLOGY, "/events/topologies");
+    destinations.put(AmbariUpdateEvent.Type.AGENT_CONFIGS, "/configs");
+    destinations.put(AmbariUpdateEvent.Type.CONFIGS, "/events/configs");
+    destinations.put(AmbariUpdateEvent.Type.HOSTCOMPONENT, "/events/hostcomponents");
+    destinations.put(AmbariUpdateEvent.Type.REQUEST, "/events/requests");
+    destinations.put(AmbariUpdateEvent.Type.SERVICE, "/events/services");
+    destinations.put(AmbariUpdateEvent.Type.HOST, "/events/hosts");
+    destinations.put(AmbariUpdateEvent.Type.COMMAND, "/commands");
+    DEFAULT_DESTINATIONS = Collections.unmodifiableMap(destinations);
+  }
+
+  /**
+   * @param agentSessionManager passed to the base emitter
+   * @param simpMessagingTemplate passed to the base emitter
+   */
+  public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
+    super(agentSessionManager, simpMessagingTemplate);
+  }
+
+  /**
+   * Looks up the destination for the event's type and emits the event: to the
+   * event's host when it is an {@link AmbariHostUpdateEvent}, otherwise to all
+   * subscribers.
+   *
+   * @param event event to emit
+   * @throws MessageDestinationIsNotDefinedException if the event type has no destination
+   * @throws AmbariException declared for failures of the host-targeted send
+   */
+  @Override
+  public void emitMessage(AmbariUpdateEvent event) throws AmbariException {
+    String destination = DEFAULT_DESTINATIONS.get(event.getType());
+    if (destination == null) {
+      throw new MessageDestinationIsNotDefinedException(event.getType());
+    }
+    if (event instanceof AmbariHostUpdateEvent) {
+      emitMessageToHost((AmbariHostUpdateEvent) event, destination);
+    } else {
+      emitMessageToAll(event, destination);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
new file mode 100644
index 0000000..203bb03
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/MessageEmitter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
+import org.springframework.messaging.simp.SimpMessagingTemplate;
+
+/**
+ * Base class for strategies that deliver update events to STOMP subscribers.
+ * Subclasses choose the destination(s); the helpers here perform the send.
+ */
+public abstract class MessageEmitter {
+  private static final Logger LOG = LoggerFactory.getLogger(MessageEmitter.class);
+
+  /** Resolves the session ID of a host for host-targeted sends. */
+  protected final AgentSessionManager agentSessionManager;
+
+  /** Template used to send the messages. */
+  protected final SimpMessagingTemplate simpMessagingTemplate;
+
+  /**
+   * @param agentSessionManager used to look up the session ID of a host
+   * @param simpMessagingTemplate used to send the messages
+   */
+  public MessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
+    this.agentSessionManager = agentSessionManager;
+    this.simpMessagingTemplate = simpMessagingTemplate;
+  }
+
+  /**
+   * Determines the destination(s) for the event and emits it.
+   * <p>
+   * Declared public so the strategy can be invoked through this base type from
+   * other packages; both implementations already expose it as public.
+   *
+   * @param event event to emit
+   * @throws AmbariException if the event cannot be emitted
+   */
+  public abstract void emitMessage(AmbariUpdateEvent event) throws AmbariException;
+
+  /**
+   * Creates the STOMP message headers for a message addressed to one session.
+   *
+   * @param sessionId session ID to set on the headers
+   * @return message headers of type {@code MESSAGE}, left mutable
+   */
+  protected MessageHeaders createHeaders(String sessionId) {
+    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
+    headerAccessor.setSessionId(sessionId);
+    // Leave the headers mutable after they are handed to the messaging template.
+    headerAccessor.setLeaveMutable(true);
+    return headerAccessor.getMessageHeaders();
+  }
+
+  /**
+   * Emits the event to all subscribers of the destination.
+   *
+   * @param event event to emit
+   * @param destination destination the event is sent to
+   */
+  protected void emitMessageToAll(AmbariUpdateEvent event, String destination) {
+    LOG.debug("Received status update event {}", event);
+    simpMessagingTemplate.convertAndSend(destination, event);
+  }
+
+  /**
+   * Emits the event only to the session of the host the event belongs to.
+   *
+   * @param event event to emit; its host ID selects the target session
+   * @param destination destination the event is sent to
+   * @throws HostNotRegisteredException in case host is not registered.
+   */
+  protected void emitMessageToHost(AmbariHostUpdateEvent event, String destination) throws HostNotRegisteredException {
+    Long hostId = event.getHostId();
+    String sessionId = agentSessionManager.getSessionId(hostId);
+    LOG.debug("Received status update event {} for host {} registered with session ID {}", event, hostId, sessionId);
+    MessageHeaders headers = createHeaders(sessionId);
+    simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
deleted file mode 100644
index 53dc69f..0000000
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.events;
-
-import org.apache.ambari.server.actionmanager.HostRoleStatus;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-
-/**
- * Single host role command update info. This update will be sent to all 
subscribed recipients.
- */
-@JsonInclude(JsonInclude.Include.NON_NULL)
-public class NamedHostRoleCommandUpdateEvent extends AmbariUpdateEvent {
-
-  private Long id;
-  private Long requestId;
-  private String hostName;
-  private Long endTime;
-  private HostRoleStatus status;
-  private String errorLog;
-  private String outLog;
-  private String stderr;
-  private String stdout;
-
-  public NamedHostRoleCommandUpdateEvent(Long id, Long requestId, String 
hostName, Long endTime, HostRoleStatus status, String errorLog, String outLog, 
String stderr, String stdout) {
-    super(Type.NAMEDHOSTCOMPONENT);
-    this.id = id;
-    this.requestId = requestId;
-    this.hostName = hostName;
-    this.endTime = endTime;
-    this.status = status;
-    this.errorLog = errorLog;
-    this.outLog = outLog;
-    this.stderr = stderr;
-    this.stdout = stdout;
-  }
-
-  public Long getId() {
-    return id;
-  }
-
-  public void setId(Long id) {
-    this.id = id;
-  }
-
-  public Long getRequestId() {
-    return requestId;
-  }
-
-  public void setRequestId(Long requestId) {
-    this.requestId = requestId;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String hostName) {
-    this.hostName = hostName;
-  }
-
-  public Long getEndTime() {
-    return endTime;
-  }
-
-  public void setEndTime(Long endTime) {
-    this.endTime = endTime;
-  }
-
-  public HostRoleStatus getStatus() {
-    return status;
-  }
-
-  public void setStatus(HostRoleStatus status) {
-    this.status = status;
-  }
-
-  public String getErrorLog() {
-    return errorLog;
-  }
-
-  public void setErrorLog(String errorLog) {
-    this.errorLog = errorLog;
-  }
-
-  public String getOutLog() {
-    return outLog;
-  }
-
-  public void setOutLog(String outLog) {
-    this.outLog = outLog;
-  }
-
-  public String getStderr() {
-    return stderr;
-  }
-
-  public void setStderr(String stderr) {
-    this.stderr = stderr;
-  }
-
-  public String getStdout() {
-    return stdout;
-  }
-
-  public void setStdout(String stdout) {
-    this.stdout = stdout;
-  }
-
-  @Override
-  public String getDestination() {
-    return super.getDestination() + getId();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    NamedHostRoleCommandUpdateEvent that = (NamedHostRoleCommandUpdateEvent) o;
-
-    if (id != null ? !id.equals(that.id) : that.id != null) return false;
-    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId 
!= null) return false;
-    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != 
null) return false;
-    if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != 
null) return false;
-    if (status != that.status) return false;
-    if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != 
null) return false;
-    if (outLog != null ? !outLog.equals(that.outLog) : that.outLog != null) 
return false;
-    if (stderr != null ? !stderr.equals(that.stderr) : that.stderr != null) 
return false;
-    return stdout != null ? stdout.equals(that.stdout) : that.stdout == null;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = id != null ? id.hashCode() : 0;
-    result = 31 * result + (requestId != null ? requestId.hashCode() : 0);
-    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
-    result = 31 * result + (endTime != null ? endTime.hashCode() : 0);
-    result = 31 * result + (status != null ? status.hashCode() : 0);
-    result = 31 * result + (errorLog != null ? errorLog.hashCode() : 0);
-    result = 31 * result + (outLog != null ? outLog.hashCode() : 0);
-    result = 31 * result + (stderr != null ? stderr.hashCode() : 0);
-    result = 31 * result + (stdout != null ? stdout.hashCode() : 0);
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
index 17e8c3d..8425d77 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
@@ -18,31 +18,27 @@
 
 package org.apache.ambari.server.events.listeners.requests;
 
-import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AgentSessionManager;
-import org.apache.ambari.server.events.AmbariHostUpdateEvent;
+import org.apache.ambari.server.events.AlertDefinitionsMessageEmitter;
+import org.apache.ambari.server.events.AlertDefinitionsUpdateEvent;
 import org.apache.ambari.server.events.AmbariUpdateEvent;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.messaging.MessageHeaders;
-import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
-import org.springframework.messaging.simp.SimpMessageType;
-import org.springframework.messaging.simp.SimpMessagingTemplate;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Injector;
 
 public class StateUpdateListener {
-
-  private final static Logger LOG = 
LoggerFactory.getLogger(StateUpdateListener.class);
-
   private final AgentSessionManager agentSessionManager;
 
   @Autowired
-  SimpMessagingTemplate simpMessagingTemplate;
+  private DefaultMessageEmitter defaultMessageEmitter;
+
+  @Autowired
+  private AlertDefinitionsMessageEmitter alertDefinitionsMessageEmitter;
 
   public StateUpdateListener(Injector injector) {
     StateUpdateEventPublisher stateUpdateEventPublisher =
@@ -53,25 +49,19 @@ public class StateUpdateListener {
 
+  /**
+   * Routes an update event to the emitter responsible for it. Alert definition
+   * updates are handled only by the alert definitions emitter; the default
+   * emitter has no destination for that type and would reject the event.
+   *
+   * @param event update event delivered to this subscriber
+   * @throws AmbariException if the selected emitter fails to emit the event
+   */
   @Subscribe
   @AllowConcurrentEvents
-  public void onUpdateEvent(AmbariUpdateEvent event) throws HostNotRegisteredException {
-    String destination = event.getDestination();
-    if (event instanceof AmbariHostUpdateEvent) {
-      AmbariHostUpdateEvent hostUpdateEvent = (AmbariHostUpdateEvent) event;
-      Long hostId = hostUpdateEvent.getHostId();
-      String sessionId = agentSessionManager.getSessionId(hostId);
-      LOG.debug("Received status update event {} for host {} registered with session ID {}", hostUpdateEvent, hostId, sessionId);
-      MessageHeaders headers = createHeaders(sessionId);
-      simpMessagingTemplate.convertAndSendToUser(sessionId, destination, event, headers);
-    } else {
-      LOG.debug("Received status update event {}", event);
-      simpMessagingTemplate.convertAndSend(destination, event);
+  public void onUpdateEvent(AmbariUpdateEvent event) throws AmbariException {
+    if (event instanceof AlertDefinitionsUpdateEvent) {
+      alertDefinitionsMessageEmitter.emitMessage(event);
+    } else {
+      defaultMessageEmitter.emitMessage(event);
     }
   }
-
-  private MessageHeaders createHeaders(String sessionId) {
-    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
-    headerAccessor.setSessionId(sessionId);
-    headerAccessor.setLeaveMutable(true);
-    return headerAccessor.getMessageHeaders();
-  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/57a0e5c9/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 58bfcfc..888ed5d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -36,7 +36,6 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Request;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
-import org.apache.ambari.server.events.NamedHostRoleCommandUpdateEvent;
 import org.apache.ambari.server.events.RequestUpdateEvent;
 import org.apache.ambari.server.events.TaskCreateEvent;
 import org.apache.ambari.server.events.TaskUpdateEvent;
@@ -156,20 +155,6 @@ public class TaskStatusListener {
         }
       }
     }
-
-    for (HostRoleCommand hostRoleCommand : hostRoleCommandWithReceivedStatus) {
-      NamedHostRoleCommandUpdateEvent namedHostRoleCommandUpdateEvent = new 
NamedHostRoleCommandUpdateEvent(hostRoleCommand.getTaskId(),
-          hostRoleCommand.getRequestId(),
-          hostRoleCommand.getHostName(),
-          hostRoleCommand.getEndTime(),
-          hostRoleCommand.getStatus(),
-          hostRoleCommand.getErrorLog(),
-          hostRoleCommand.getOutputLog(),
-          hostRoleCommand.getStderr(),
-          hostRoleCommand.getStdout()
-      );
-      stateUpdateEventPublisher.publish(namedHostRoleCommandUpdateEvent);
-    }
     updateActiveTasksMap(hostRoleCommandWithReceivedStatus);
     Boolean didAnyStageStatusUpdated = 
updateActiveStagesStatus(stagesWithReceivedTaskStatus, hostRoleCommandListAll);
     // Presumption: If there is no update in any of the running stage's status

Reply via email to