http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java
new file mode 100644
index 0000000..362c117
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+public class HostStatusUpdateEvent extends AmbariEvent {
+
+  private String hostName;
+  private String hostStatus;
+
+  public HostStatusUpdateEvent(String hostName, String hostStatus) {
+    super(AmbariEventType.HOST_STATUS_CHANGE);
+    this.hostName = hostName;
+    this.hostStatus = hostStatus;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public String getHostStatus() {
+    return hostStatus;
+  }
+
+  public void setHostStatus(String hostStatus) {
+    this.hostStatus = hostStatus;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java
new file mode 100644
index 0000000..a7f9fa6
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.MaintenanceState;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Host info with updated parameter. This update will be sent to all 
subscribed recipients.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class HostUpdateEvent extends AmbariUpdateEvent {
+
+  @JsonProperty("cluster_name")
+  private String clusterName;
+
+  @JsonProperty("host_name")
+  private String hostName;
+
+  @JsonProperty("host_status")
+  private String hostStatus;
+
+  @JsonProperty("host_state")
+  private HostState hostState;
+
+  @JsonProperty("last_heartbeat_time")
+  private Long lastHeartbeatTime;
+
+  @JsonProperty("maintenance_state")
+  private MaintenanceState maintenanceState;
+
+  @JsonProperty("alerts_summary")
+  private AlertSummaryDTO alertsSummary;
+
+  public HostUpdateEvent(String clusterName, String hostName, String 
hostStatus, HostState hostState,
+                         Long lastHeartbeatTime, MaintenanceState 
maintenanceState, AlertSummaryDTO alertsSummary) {
+    super(Type.HOST);
+    this.clusterName = clusterName;
+    this.hostName = hostName;
+    this.hostStatus = hostStatus;
+    this.hostState = hostState;
+    this.lastHeartbeatTime = lastHeartbeatTime;
+    this.maintenanceState = maintenanceState;
+    this.alertsSummary = alertsSummary;
+  }
+
+  public static HostUpdateEvent createHostStatusUpdate(String clusterName, 
String hostName, String hostStatus,
+                                                       Long lastHeartbeatTime) 
{
+    return new HostUpdateEvent(clusterName, hostName, hostStatus, null, 
lastHeartbeatTime, null, null);
+  }
+
+  public static HostUpdateEvent createHostStateUpdate(String clusterName, 
String hostName, HostState hostState,
+                                                       Long lastHeartbeatTime) 
{
+    return new HostUpdateEvent(clusterName, hostName, null, hostState, 
lastHeartbeatTime, null, null);
+  }
+
+  public static HostUpdateEvent createHostMaintenanceStatusUpdate(String 
clusterName, String hostName,
+                                                                  
MaintenanceState maintenanceState,
+                                                                  
AlertSummaryDTO alertsSummary) {
+    return new HostUpdateEvent(clusterName, hostName, null, null, null, 
maintenanceState, alertsSummary);
+  }
+
+  public static HostUpdateEvent createHostAlertsUpdate(String clusterName, 
String hostName,
+                                                       AlertSummaryDTO 
alertsSummary) {
+    return new HostUpdateEvent(clusterName, hostName, null, null, null, null, 
alertsSummary);
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public String getHostStatus() {
+    return hostStatus;
+  }
+
+  public void setHostStatus(String hostStatus) {
+    this.hostStatus = hostStatus;
+  }
+
+  public Long getLastHeartbeatTime() {
+    return lastHeartbeatTime;
+  }
+
+  public void setLastHeartbeatTime(Long lastHeartbeatTime) {
+    this.lastHeartbeatTime = lastHeartbeatTime;
+  }
+
+  public MaintenanceState getMaintenanceState() {
+    return maintenanceState;
+  }
+
+  public void setMaintenanceState(MaintenanceState maintenanceState) {
+    this.maintenanceState = maintenanceState;
+  }
+
+  public AlertSummaryDTO getAlertsSummary() {
+    return alertsSummary;
+  }
+
+  public void setAlertsSummary(AlertSummaryDTO alertsSummary) {
+    this.alertsSummary = alertsSummary;
+  }
+
+  public HostState getHostState() {
+    return hostState;
+  }
+
+  public void setHostState(HostState hostState) {
+    this.hostState = hostState;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    HostUpdateEvent that = (HostUpdateEvent) o;
+
+    if (clusterName != null ? !clusterName.equals(that.clusterName) : 
that.clusterName != null) return false;
+    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != 
null) return false;
+    if (hostStatus != null ? !hostStatus.equals(that.hostStatus) : 
that.hostStatus != null) return false;
+    if (hostState != that.hostState) return false;
+    if (lastHeartbeatTime != null ? 
!lastHeartbeatTime.equals(that.lastHeartbeatTime) : that.lastHeartbeatTime != 
null)
+      return false;
+    if (maintenanceState != that.maintenanceState) return false;
+    return alertsSummary != null ? alertsSummary.equals(that.alertsSummary) : 
that.alertsSummary == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = clusterName != null ? clusterName.hashCode() : 0;
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (hostStatus != null ? hostStatus.hashCode() : 0);
+    result = 31 * result + (hostState != null ? hostState.hashCode() : 0);
+    result = 31 * result + (lastHeartbeatTime != null ? 
lastHeartbeatTime.hashCode() : 0);
+    result = 31 * result + (maintenanceState != null ? 
maintenanceState.hashCode() : 0);
+    result = 31 * result + (alertsSummary != null ? alertsSummary.hashCode() : 
0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
index 6e7ee90..239b5b8 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,17 +26,37 @@ import 
org.apache.ambari.server.agent.stomp.dto.MetadataCluster;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * Contains update info about metadata for all clusters. This update will be 
sent to all subscribed recipients.
+ */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class MetadataUpdateEvent extends AmbariUpdateEvent implements Hashable 
{
 
+  /**
+   * Pseudo cluster id under which parameters common to all clusters are sent.
+   */
+  private static final String AMBARI_LEVEL_CLUSTER_ID = "-1";
+
+  /**
+   * Actual version hash.
+   */
   private String hash;
 
+  /**
+   * Map of metadata for each cluster, keyed by cluster id.
+   */
   @JsonProperty("clusters")
   private TreeMap<String, MetadataCluster> metadataClusters = new TreeMap<>();
 
-  public MetadataUpdateEvent(TreeMap<String, MetadataCluster> 
metadataClusters) {
+  public MetadataUpdateEvent(TreeMap<String, MetadataCluster> 
metadataClusters, TreeMap<String, String> ambariLevelParams) {
     super(Type.METADATA);
     this.metadataClusters = metadataClusters;
+    if (ambariLevelParams != null) {
+      if (this.metadataClusters == null) {
+        this.metadataClusters = new TreeMap<>();
+      }
+      this.metadataClusters.put(AMBARI_LEVEL_CLUSTER_ID, new 
MetadataCluster(null, new TreeMap<>(), ambariLevelParams));
+    }
   }
 
   public Map<String, MetadataCluster> getMetadataClusters() {
@@ -57,6 +77,21 @@ public class MetadataUpdateEvent extends AmbariUpdateEvent 
implements Hashable {
   }
 
   public static MetadataUpdateEvent emptyUpdate() {
-    return new MetadataUpdateEvent(null);
+    return new MetadataUpdateEvent(null, null);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MetadataUpdateEvent that = (MetadataUpdateEvent) o;
+
+    return metadataClusters != null ? 
metadataClusters.equals(that.metadataClusters) : that.metadataClusters == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return metadataClusters != null ? metadataClusters.hashCode() : 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
index 3096347..53dc69f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,9 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 
+/**
+ * Single host role command update info. This update will be sent to all 
subscribed recipients.
+ */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class NamedHostRoleCommandUpdateEvent extends AmbariUpdateEvent {
 
@@ -124,4 +127,36 @@ public class NamedHostRoleCommandUpdateEvent extends 
AmbariUpdateEvent {
   public String getDestination() {
     return super.getDestination() + getId();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    NamedHostRoleCommandUpdateEvent that = (NamedHostRoleCommandUpdateEvent) o;
+
+    if (id != null ? !id.equals(that.id) : that.id != null) return false;
+    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId 
!= null) return false;
+    if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != 
null) return false;
+    if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != 
null) return false;
+    if (status != that.status) return false;
+    if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != 
null) return false;
+    if (outLog != null ? !outLog.equals(that.outLog) : that.outLog != null) 
return false;
+    if (stderr != null ? !stderr.equals(that.stderr) : that.stderr != null) 
return false;
+    return stdout != null ? stdout.equals(that.stdout) : that.stdout == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = id != null ? id.hashCode() : 0;
+    result = 31 * result + (requestId != null ? requestId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (endTime != null ? endTime.hashCode() : 0);
+    result = 31 * result + (status != null ? status.hashCode() : 0);
+    result = 31 * result + (errorLog != null ? errorLog.hashCode() : 0);
+    result = 31 * result + (outLog != null ? outLog.hashCode() : 0);
+    result = 31 * result + (stderr != null ? stderr.hashCode() : 0);
+    result = 31 * result + (stdout != null ? stdout.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
index 2cf7b80..4133c62 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,8 +18,9 @@
 
 package org.apache.ambari.server.events;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
@@ -31,6 +32,9 @@ import org.apache.ambari.server.topology.TopologyManager;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * Contains info about request update. This update will be sent to all 
subscribed recipients.
+ */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class RequestUpdateEvent extends AmbariUpdateEvent {
 
@@ -43,7 +47,7 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
   private Long startTime;
 
   @JsonProperty("Tasks")
-  private List<HostRoleCommand> hostRoleCommands = new ArrayList<>();
+  private Set<HostRoleCommand> hostRoleCommands = new HashSet<>();
 
   public RequestUpdateEvent(RequestEntity requestEntity,
                             HostRoleCommandDAO hostRoleCommandDAO,
@@ -67,6 +71,14 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
     }
   }
 
+  public RequestUpdateEvent(Long requestId, HostRoleStatus requestStatus,
+                            Set<HostRoleCommand> hostRoleCommands) {
+    super(Type.REQUEST);
+    this.requestId = requestId;
+    this.requestStatus = requestStatus;
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
   public Long getRequestId() {
     return requestId;
   }
@@ -123,7 +135,15 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
     this.startTime = startTime;
   }
 
-  public class HostRoleCommand {
+  public Set<HostRoleCommand> getHostRoleCommands() {
+    return hostRoleCommands;
+  }
+
+  public void setHostRoleCommands(Set<HostRoleCommand> hostRoleCommands) {
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
+  public static class HostRoleCommand {
     private Long id;
     private Long requestId;
     private HostRoleStatus status;
@@ -167,5 +187,57 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
     public void setHostName(String hostName) {
       this.hostName = hostName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      HostRoleCommand that = (HostRoleCommand) o;
+      // id, requestId and hostName are reference types and may be unset, so compare them null-safely.
+      if (id != null ? !id.equals(that.id) : that.id != null) return false;
+      if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false;
+      return hostName != null ? hostName.equals(that.hostName) : that.hostName == null;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = id != null ? id.hashCode() : 0; // null fields hash to 0, consistent with equals()
+      result = 31 * result + (requestId != null ? requestId.hashCode() : 0);
+      result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RequestUpdateEvent that = (RequestUpdateEvent) o;
+
+    if (clusterName != null ? !clusterName.equals(that.clusterName) : 
that.clusterName != null) return false;
+    if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != 
null) return false;
+    if (requestId != null ? !requestId.equals(that.requestId) : that.requestId 
!= null) return false;
+    if (progressPercent != null ? 
!progressPercent.equals(that.progressPercent) : that.progressPercent != null)
+      return false;
+    if (requestContext != null ? !requestContext.equals(that.requestContext) : 
that.requestContext != null)
+      return false;
+    if (requestStatus != that.requestStatus) return false;
+    if (startTime != null ? !startTime.equals(that.startTime) : that.startTime 
!= null) return false;
+    return hostRoleCommands != null ? 
hostRoleCommands.equals(that.hostRoleCommands) : that.hostRoleCommands == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = clusterName != null ? clusterName.hashCode() : 0;
+    result = 31 * result + (endTime != null ? endTime.hashCode() : 0);
+    result = 31 * result + (requestId != null ? requestId.hashCode() : 0);
+    result = 31 * result + (progressPercent != null ? 
progressPercent.hashCode() : 0);
+    result = 31 * result + (requestContext != null ? requestContext.hashCode() 
: 0);
+    result = 31 * result + (requestStatus != null ? requestStatus.hashCode() : 
0);
+    result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
+    result = 31 * result + (hostRoleCommands != null ? 
hostRoleCommands.hashCode() : 0);
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
new file mode 100644
index 0000000..d39c4ae
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.State;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains info about service update. This update will be sent to all 
subscribed recipients.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ServiceUpdateEvent extends AmbariUpdateEvent {
+
+  @JsonProperty("cluster_name")
+  private String clusterName;
+
+  @JsonProperty("maintenance_state")
+  private MaintenanceState maintenanceState;
+
+  @JsonProperty("service_name")
+  private String serviceName;
+
+  @JsonProperty("state")
+  private State state;
+
+  public ServiceUpdateEvent(String clusterName, MaintenanceState 
maintenanceState, String serviceName, State state) {
+    super(Type.SERVICE);
+    this.clusterName = clusterName;
+    this.maintenanceState = maintenanceState;
+    this.serviceName = serviceName;
+    this.state = state;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public MaintenanceState getMaintenanceState() {
+    return maintenanceState;
+  }
+
+  public void setMaintenanceState(MaintenanceState maintenanceState) {
+    this.maintenanceState = maintenanceState;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public State getState() {
+    return state;
+  }
+
+  public void setState(State state) {
+    this.state = state;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ServiceUpdateEvent that = (ServiceUpdateEvent) o;
+
+    if (clusterName != null ? !clusterName.equals(that.clusterName) : 
that.clusterName != null) return false;
+    if (maintenanceState != that.maintenanceState) return false;
+    if (serviceName != null ? !serviceName.equals(that.serviceName) : 
that.serviceName != null) return false;
+    return state == that.state;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = clusterName != null ? clusterName.hashCode() : 0;
+    result = 31 * result + (maintenanceState != null ? 
maintenanceState.hashCode() : 0);
+    result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0);
+    result = 31 * result + (state != null ? state.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java
new file mode 100644
index 0000000..1fa4e6c
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+import java.util.TreeMap;
+
+import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+/**
+ * Contains info about clusters topology update. This update will be sent to 
all subscribed recipients.
+ * Used for messaging to agents.
+ */
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class TopologyAgentUpdateEvent extends TopologyUpdateEvent {
+  public TopologyAgentUpdateEvent(TreeMap<String, TopologyCluster> clusters, 
String hash, EventType eventType) {
+    super(Type.AGENT_TOPOLOGY, clusters, hash, eventType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
index cfab422..1b5b90b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.events;
 
+import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.ambari.server.agent.stomp.dto.Hashable;
@@ -25,18 +26,38 @@ import 
org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+/**
+ * Contains info about clusters topology update. This update will be sent to 
all subscribed recipients.
+ * Used for messaging to the UI.
+ */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class TopologyUpdateEvent extends AmbariUpdateEvent implements Hashable 
{
+
+  /**
+   * Map of clusters topologies by cluster ids.
+   */
   @JsonProperty("clusters")
   private TreeMap<String, TopologyCluster> clusters;
 
+  /**
+   * Actual version hash.
+   */
   private String hash;
 
+  /**
+   * Type of update; distinguishes the full current topology (CREATE), adding 
new or updating existing topology
+   * elements (UPDATE), and removing existing topology elements (DELETE).
+   */
   private EventType eventType;
 
   public TopologyUpdateEvent(TreeMap<String, TopologyCluster> clusters, 
EventType eventType) {
-    super(Type.TOPOLOGY);
+    this(Type.UI_TOPOLOGY, clusters, null, eventType);
+  }
+
+  public TopologyUpdateEvent(Type type, TreeMap<String, TopologyCluster> 
clusters, String hash, EventType eventType) {
+    super(type);
     this.clusters = clusters;
+    this.hash = hash;
     this.eventType = eventType;
   }
 
@@ -44,6 +65,16 @@ public class TopologyUpdateEvent extends AmbariUpdateEvent 
implements Hashable {
     return clusters;
   }
 
+  public TopologyUpdateEvent deepCopy() {
+    TreeMap<String, TopologyCluster> copiedClusters = new TreeMap<>();
+    for (Map.Entry<String, TopologyCluster> topologyClusterEntry : 
getClusters().entrySet()) {
+      copiedClusters.put(topologyClusterEntry.getKey(), 
topologyClusterEntry.getValue().deepCopyCluster());
+    }
+    TopologyUpdateEvent copiedEvent = new TopologyUpdateEvent(copiedClusters, 
getEventType());
+    copiedEvent.setHash(getHash());
+    return copiedEvent;
+  }
+
   public void setClusters(TreeMap<String, TopologyCluster> clusters) {
     this.clusters = clusters;
   }
@@ -73,4 +104,22 @@ public class TopologyUpdateEvent extends AmbariUpdateEvent 
implements Hashable {
     DELETE,
     UPDATE
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TopologyUpdateEvent that = (TopologyUpdateEvent) o;
+
+    if (clusters != null ? !clusters.equals(that.clusters) : that.clusters != 
null) return false;
+    return eventType == that.eventType;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = clusters != null ? clusters.hashCode() : 0;
+    result = 31 * result + (eventType != null ? eventType.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index c34b95a..23a547f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -151,8 +151,7 @@ public class AlertReceivedListener {
     List<AlertCurrentEntity> toCreateHistoryAndMerge = new ArrayList<>();
 
     List<AlertEvent> alertEvents = new ArrayList<>(20);
-    List<Alert> updatedAlerts = new ArrayList<>();
-    Map<String, AlertSummaryGroupedRenderer.AlertDefinitionSummary> summaries 
= new HashMap<>();
+    Map<Long, Map<String, AlertSummaryGroupedRenderer.AlertDefinitionSummary>> 
alertUpdates = new HashMap<>();
 
     for (Alert alert : alerts) {
       // jobs that were running when a service/component/host was changed
@@ -343,10 +342,15 @@ public class AlertReceivedListener {
 
         // create the event to fire later
         alertEvents.add(new AlertStateChangeEvent(clusterId, alert, current, 
oldState, oldFirmness));
-        updatedAlerts.add(alert);
 
         // create alert update to fire event to UI
         MaintenanceState maintenanceState = getMaintenanceState(alert, 
clusterId);
+
+        if (!alertUpdates.containsKey(clusterId)) {
+          alertUpdates.put(clusterId, new HashMap<>());
+        }
+        Map<String, AlertSummaryGroupedRenderer.AlertDefinitionSummary> 
summaries = alertUpdates.get(clusterId);
+
         AlertSummaryGroupedRenderer.updateSummary(summaries, 
definition.getDefinitionId(),
             definition.getDefinitionName(), alertState, alert.getTimestamp(), 
maintenanceState, alert.getText());
       }
@@ -360,8 +364,8 @@ public class AlertReceivedListener {
     for (AlertEvent eventToFire : alertEvents) {
       m_alertEventPublisher.publish(eventToFire);
     }
-    if (!summaries.isEmpty()) {
-      stateUpdateEventPublisher.publish(new AlertUpdateEvent(summaries));
+    if (!alertUpdates.isEmpty()) {
+      stateUpdateEventPublisher.publish(new AlertUpdateEvent(alertUpdates));
     }
   }
 
@@ -612,7 +616,7 @@ public class AlertReceivedListener {
    *          the definition to read any repeat tolerance overrides from.
    * @param state
    *          the state of the {@link AlertCurrentEntity}.
-   * @param occurrences
+   * @param occurrences
    *          occurrences of the alert in the current state (used for
    *          calculation firmness when moving between non-OK states)
    * @return

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java
new file mode 100644
index 0000000..6cb831f
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.hosts;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.HostStateUpdateEvent;
+import org.apache.ambari.server.events.HostStatusUpdateEvent;
+import org.apache.ambari.server.events.HostUpdateEvent;
+import org.apache.ambari.server.events.InitialAlertEvent;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * Keeps a per-cluster cache of the last known {@link HostUpdateEvent} for every host and
+ * publishes a new update through the {@link StateUpdateEventPublisher} whenever an incoming
+ * status, state, alert or maintenance mode event changes the cached value.
+ */
+@Singleton
+@EagerSingleton
+public class HostUpdateListener {
+
+  /** Last known host data, keyed by cluster id and then by host name. */
+  private Map<Long, Map<String, HostUpdateEvent>> hosts = new HashMap<>();
+
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private ServiceDesiredStateDAO serviceDesiredStateDAO;
+
+  @Inject
+  private AlertsDAO alertsDAO;
+
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  /**
+   * Registers this listener with both publishers it consumes events from.
+   *
+   * @param ambariEventPublisher  publisher this listener is registered with
+   * @param m_alertEventPublisher alert publisher this listener is registered with
+   */
+  @Inject
+  public HostUpdateListener(AmbariEventPublisher ambariEventPublisher, AlertEventPublisher m_alertEventPublisher) {
+    ambariEventPublisher.register(this);
+    m_alertEventPublisher.register(this);
+  }
+
+  /**
+   * For every cluster the host belongs to, publishes a host status update when the status
+   * carried by the event differs from the cached one.
+   *
+   * @param event the host status change
+   * @throws AmbariException if the host or cluster lookup fails
+   */
+  @Subscribe
+  public void onHostStatusUpdate(HostStatusUpdateEvent event) throws AmbariException {
+    String hostName = event.getHostName();
+    Long lastHeartbeatTime = m_clusters.get().getHost(hostName).getLastHeartbeatTime();
+
+    for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) {
+      Long clusterId = cluster.getClusterId();
+
+      // retrieve state from cache
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      // nothing to publish when the cached status already matches
+      if (hostUpdateEvent.getHostStatus().equals(event.getHostStatus())) {
+        continue;
+      }
+      hostUpdateEvent.setHostStatus(event.getHostStatus());
+      hostUpdateEvent.setLastHeartbeatTime(lastHeartbeatTime);
+
+      stateUpdateEventPublisher.publish(HostUpdateEvent.createHostStatusUpdate(hostUpdateEvent.getClusterName(),
+          hostUpdateEvent.getHostName(),
+          hostUpdateEvent.getHostStatus(),
+          hostUpdateEvent.getLastHeartbeatTime()));
+    }
+  }
+
+  /**
+   * For every cluster the host belongs to, publishes a host state update when the state
+   * carried by the event differs from the cached one.
+   *
+   * @param event the host state change
+   * @throws AmbariException if the host or cluster lookup fails
+   */
+  @Subscribe
+  public void onHostStateUpdate(HostStateUpdateEvent event) throws AmbariException {
+    String hostName = event.getHostName();
+    Long lastHeartbeatTime = m_clusters.get().getHost(hostName).getLastHeartbeatTime();
+
+    for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) {
+      Long clusterId = cluster.getClusterId();
+
+      // retrieve state from cache
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      // nothing to publish when the cached state already matches
+      if (hostUpdateEvent.getHostState().equals(event.getHostState())) {
+        continue;
+      }
+      hostUpdateEvent.setHostState(event.getHostState());
+      hostUpdateEvent.setLastHeartbeatTime(lastHeartbeatTime);
+
+      stateUpdateEventPublisher.publish(HostUpdateEvent.createHostStateUpdate(hostUpdateEvent.getClusterName(),
+          hostUpdateEvent.getHostName(),
+          hostUpdateEvent.getHostState(),
+          hostUpdateEvent.getLastHeartbeatTime()));
+    }
+  }
+
+  /**
+   * Recomputes the alert counters of the host named in an {@link AlertStateChangeEvent} or
+   * {@link InitialAlertEvent} and publishes them when they differ from the cached summary.
+   * Other alert event types, and events without a host name, are ignored.
+   *
+   * @param event the alert event
+   * @throws AmbariException if the host or cluster lookup fails
+   */
+  @Subscribe
+  public void onAlertsHostUpdate(AlertEvent event) throws AmbariException {
+    String hostName;
+    if (event instanceof AlertStateChangeEvent) {
+      hostName = ((AlertStateChangeEvent) event).getNewHistoricalEntry().getHostName();
+    } else if (event instanceof InitialAlertEvent) {
+      hostName = ((InitialAlertEvent) event).getNewHistoricalEntry().getHostName();
+    } else {
+      return;
+    }
+    if (StringUtils.isEmpty(hostName)) {
+      return;
+    }
+
+    Long clusterId = event.getClusterId();
+
+    // retrieve state from cache
+    HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+    // change alerts counters
+    AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+    if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+      return;
+    }
+    hostUpdateEvent.setAlertsSummary(summary);
+
+    stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+        hostName, summary));
+  }
+
+  /**
+   * Reacts to a maintenance mode change on a host, a host component or a service by
+   * refreshing the alert counters of the affected host(s) and publishing the change.
+   *
+   * @param event the maintenance mode event
+   * @throws AmbariException if the host, cluster or service lookup fails
+   */
+  @Subscribe
+  public void onMaintenanceStateUpdate(MaintenanceModeEvent event) throws AmbariException {
+    Long clusterId = event.getClusterId();
+
+    if (event.getHost() != null || event.getServiceComponentHost() != null) {
+      // A component-level event carries no Host; take the host name from the host component
+      // instead of dereferencing a null Host.
+      String hostName = event.getHost() != null
+          ? event.getHost().getHostName()
+          : event.getServiceComponentHost().getHostName();
+
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+      // NOTE(review): this early return also skips the host maintenance state update when
+      // the alert counters happen to be unchanged - confirm that this is intended.
+      if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+        return;
+      }
+      hostUpdateEvent.setAlertsSummary(summary);
+      if (event.getHost() != null) {
+        MaintenanceState maintenanceState = event.getMaintenanceState();
+        hostUpdateEvent.setMaintenanceState(maintenanceState);
+
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostMaintenanceStatusUpdate(hostUpdateEvent.getClusterName(),
+            hostName, maintenanceState, summary));
+      } else {
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+            hostName, summary));
+      }
+    } else if (event.getService() != null) {
+      String serviceName = event.getService().getName();
+      for (String hostName : m_clusters.get().getCluster(clusterId).getService(serviceName).getServiceHosts()) {
+        HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+        AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+        if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+          continue;
+        }
+        hostUpdateEvent.setAlertsSummary(summary);
+
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+            hostName, summary));
+      }
+    }
+  }
+
+  /**
+   * Returns the cached entry for the given host, creating and caching it on first access.
+   *
+   * @param clusterId id of the cluster the host is looked up in
+   * @param hostName  name of the host
+   * @return the cached (possibly freshly created) host data
+   * @throws AmbariException if a new entry has to be created and the lookup fails
+   */
+  private HostUpdateEvent retrieveHostUpdateFromCache(Long clusterId, String hostName) throws AmbariException {
+    Map<String, HostUpdateEvent> clusterHosts = hosts.get(clusterId);
+    if (clusterHosts != null && clusterHosts.containsKey(hostName)) {
+      return clusterHosts.get(hostName);
+    }
+
+    // Create the entry first so that a failed lookup leaves the cache untouched.
+    HostUpdateEvent hostUpdateEvent = createHostUpdateEvent(clusterId, hostName);
+    hosts.computeIfAbsent(clusterId, id -> new HashMap<>()).put(hostName, hostUpdateEvent);
+    return hostUpdateEvent;
+  }
+
+  /**
+   * Builds a full {@link HostUpdateEvent} from the current host data and alert counters.
+   *
+   * @param clusterId id of the cluster the host belongs to
+   * @param hostName  name of the host
+   * @return an event holding the host's current status, state, heartbeat time, maintenance
+   *         state and alert summary
+   * @throws AmbariException if the cluster or host lookup fails
+   */
+  private HostUpdateEvent createHostUpdateEvent(Long clusterId, String hostName) throws AmbariException {
+    String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
+    Host host = m_clusters.get().getHost(hostName);
+
+    AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+    return new HostUpdateEvent(clusterName, hostName, host.getStatus(), host.getState(),  host.getLastHeartbeatTime(),
+        host.getMaintenanceState(clusterId), summary);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
index 940dc76..27af717 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,11 +19,17 @@
 
 package org.apache.ambari.server.events.listeners.requests;
 
+import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.events.AmbariHostUpdateEvent;
 import org.apache.ambari.server.events.AmbariUpdateEvent;
 import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
 import org.springframework.messaging.simp.SimpMessagingTemplate;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
@@ -34,19 +40,38 @@ public class StateUpdateListener {
 
   private final static Logger LOG = 
LoggerFactory.getLogger(StateUpdateListener.class);
 
+  private final AgentSessionManager agentSessionManager;
+
   @Autowired
   SimpMessagingTemplate simpMessagingTemplate;
 
   public StateUpdateListener(Injector injector) {
     StateUpdateEventPublisher stateUpdateEventPublisher =
       injector.getInstance(StateUpdateEventPublisher.class);
+    agentSessionManager = injector.getInstance(AgentSessionManager.class);
     stateUpdateEventPublisher.register(this);
   }
 
   @Subscribe
   @AllowConcurrentEvents
-  public void onUpdateEvent(AmbariUpdateEvent event) {
-    LOG.debug("Received status update event {}", event.toString());
-    simpMessagingTemplate.convertAndSend(event.getDestination(), event);
+  public void onUpdateEvent(AmbariUpdateEvent event) throws HostNotRegisteredException {
+    // Host-specific events are delivered only to the session of the target host's agent;
+    // every other event is sent to the event's destination for all subscribers.
+    if (event instanceof AmbariHostUpdateEvent) {
+      AmbariHostUpdateEvent ambariHostUpdateEvent = (AmbariHostUpdateEvent) event;
+      String sessionId = agentSessionManager.getSessionId(ambariHostUpdateEvent.getHostName());
+      // Both arguments need a {} placeholder, otherwise the host name is silently dropped.
+      LOG.debug("Received status update event {} for host {}", ambariHostUpdateEvent,
+          ambariHostUpdateEvent.getHostName());
+      simpMessagingTemplate.convertAndSendToUser(sessionId, ambariHostUpdateEvent.getDestination(),
+          ambariHostUpdateEvent, createHeaders(sessionId));
+    } else {
+      LOG.debug("Received status update event {}", event);
+      simpMessagingTemplate.convertAndSend(event.getDestination(), event);
+    }
+  }
+
+  /**
+   * Creates MESSAGE-type headers carrying the given session id; the headers are left mutable.
+   *
+   * @param sessionId id of the session the message is addressed to
+   * @return the message headers
+   */
+  private MessageHeaders createHeaders(String sessionId) {
+    SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
+    accessor.setLeaveMutable(true);
+    accessor.setSessionId(sessionId);
+    return accessor.getMessageHeaders();
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
new file mode 100644
index 0000000..24c8166
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import 
org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
+import 
org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
+import org.apache.ambari.server.events.HostComponentUpdate;
+import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.State;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+@Singleton
+@EagerSingleton
+public class ServiceUpdateListener {
+  private Map<Long, Map<String, State>> states = new HashMap<>();
+
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private ServiceDesiredStateDAO serviceDesiredStateDAO;
+
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  @Inject
+  public ServiceUpdateListener(StateUpdateEventPublisher 
stateUpdateEventPublisher, AmbariEventPublisher ambariEventPublisher) {
+    stateUpdateEventPublisher.register(this);
+    ambariEventPublisher.register(this);
+
+    this.stateUpdateEventPublisher = stateUpdateEventPublisher;
+  }
+
+  @Subscribe
+  public void onHostComponentUpdate(HostComponentsUpdateEvent event) throws 
AmbariException {
+    for (HostComponentUpdate hostComponentUpdate : 
event.getHostComponentUpdates()) {
+      Long clusterId = hostComponentUpdate.getClusterId();
+      String clusterName = 
m_clusters.get().getClusterById(clusterId).getClusterName();
+      String serviceName = hostComponentUpdate.getServiceName();
+
+      ServiceCalculatedState serviceCalculatedState = 
ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
+      State serviceState = serviceCalculatedState.getState(clusterName, 
serviceName);
+
+      // retrieve state from cache
+      if (states.containsKey(clusterId) && 
states.get(clusterId).containsKey(serviceName) && 
states.get(clusterId).get(serviceName).equals(serviceState)) {
+        continue;
+      }
+      if (!states.containsKey(clusterId)) {
+        states.put(clusterId, new HashMap<>());
+      }
+      states.get(clusterId).put(serviceName, serviceState);
+      stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, 
null, serviceName, serviceState));
+    }
+  }
+
+  @Subscribe
+  public void onMaintenanceStateUpdate(MaintenanceModeEvent event) throws 
AmbariException {
+    if (event.getService() == null) {
+      return;
+    }
+    Long clusterId = event.getClusterId();
+    String clusterName = 
m_clusters.get().getClusterById(clusterId).getClusterName();
+    String serviceName = event.getService().getName();
+
+    MaintenanceState maintenanceState = event.getMaintenanceState();
+
+    stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, 
maintenanceState, serviceName, null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 6bb3b69..c40969e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -147,6 +147,8 @@ public class TaskStatusListener {
     List<HostRoleCommand>  hostRoleCommandWithReceivedStatus =  new 
ArrayList<>();
     Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>();
     Set<Long> requestIdsWithReceivedTaskStatus =  new HashSet<>();
+    Set<RequestUpdateEvent> requestsToPublish = new HashSet<>();
+
     for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
       Long reportedTaskId = hostRoleCommand.getTaskId();
       HostRoleCommand activeTask =  activeTasksMap.get(reportedTaskId);
@@ -159,6 +161,16 @@ public class TaskStatusListener {
         stageEntityPK.setStageId(hostRoleCommand.getStageId());
         stagesWithReceivedTaskStatus.add(stageEntityPK);
         requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
+
+        if 
(!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus()))
 {
+          Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new 
HashSet<>();
+          hostRoleCommands.add(new 
RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
+              hostRoleCommand.getRequestId(),
+              hostRoleCommand.getStatus(),
+              hostRoleCommand.getHostName()));
+          requestsToPublish.add(new 
RequestUpdateEvent(hostRoleCommand.getRequestId(),
+              
activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), 
hostRoleCommands));
+        }
       }
     }
 
@@ -182,7 +194,9 @@ public class TaskStatusListener {
     if (didAnyStageStatusUpdated) {
       updateActiveRequestsStatus(requestIdsWithReceivedTaskStatus, 
stagesWithReceivedTaskStatus);
     }
-
+    for (RequestUpdateEvent requestToPublish : requestsToPublish) {
+      stateUpdateEventPublisher.publish(requestToPublish);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 22d7f2e..a35957d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -209,7 +209,7 @@ public class StackVersionListener {
    */
   private void processComponentVersionChange(Cluster cluster, ServiceComponent 
sc,
                                              ServiceComponentHost sch,
-                                             String newVersion) {
+                                             String newVersion) throws 
AmbariException {
     String desiredVersion = sc.getDesiredVersion();
     UpgradeState upgradeState = sch.getUpgradeState();
     if (upgradeState == UpgradeState.IN_PROGRESS) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
new file mode 100644
index 0000000..e09d5ca
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.CancelCommand;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.dto.ExecutionCommandsCluster;
+import org.apache.ambari.server.events.ExecutionCommandEvent;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import 
org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReader;
+import 
org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReaderFactory;
+import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class AgentCommandsPublisher {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AgentCommandsPublisher.class);
+
+  /**
+   * KerberosIdentityDataFileReaderFactory used to create 
KerberosIdentityDataFileReader instances
+   */
+  @Inject
+  private KerberosIdentityDataFileReaderFactory 
kerberosIdentityDataFileReaderFactory;
+
+  @Inject
+  private Clusters clusters;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  /**
+   * Groups the given commands by host and cluster and publishes one
+   * {@link ExecutionCommandEvent} per host. A null or empty multimap is a no-op.
+   *
+   * @param agentCommands commands to send, keyed by host name
+   * @throws AmbariException if a command cannot be prepared
+   */
+  public void sendAgentCommand(Multimap<String, AgentCommand> agentCommands) throws AmbariException {
+    if (agentCommands == null || agentCommands.isEmpty()) {
+      return;
+    }
+
+    Map<String, TreeMap<String, ExecutionCommandsCluster>> commandsByHost = new TreeMap<>();
+    for (Map.Entry<String, AgentCommand> commandEntry : agentCommands.entries()) {
+      populateExecutionCommandsClusters(commandsByHost, commandEntry.getKey(), commandEntry.getValue());
+    }
+
+    for (Map.Entry<String, TreeMap<String, ExecutionCommandsCluster>> hostEntry : commandsByHost.entrySet()) {
+      ExecutionCommandEvent hostEvent = new ExecutionCommandEvent(hostEntry.getValue());
+      hostEvent.setHostName(hostEntry.getKey());
+      stateUpdateEventPublisher.publish(hostEvent);
+    }
+  }
+
+  /**
+   * Convenience overload that sends a single command to a single host.
+   *
+   * @param hostName     name of the target host
+   * @param agentCommand command to send
+   * @throws AmbariException if the command cannot be prepared
+   */
+  public void sendAgentCommand(String hostName, AgentCommand agentCommand) throws AmbariException {
+    Multimap<String, AgentCommand> singleCommand = ArrayListMultimap.create();
+    singleCommand.put(hostName, agentCommand);
+    sendAgentCommand(singleCommand);
+  }
+
+  /**
+   * Adds the given command to the per-host, per-cluster structure that is later published.
+   * Execution commands get their cluster id set (and keytab data injected for the
+   * SET_KEYTAB / REMOVE_KEYTAB custom commands); cancel commands are filed under the
+   * cluster of the stage of the task they target. Other command types are only logged.
+   *
+   * @param executionCommandsClusters structure to fill, keyed by host name and then cluster id
+   * @param hostName                  name of the host the command is destined for
+   * @param ac                        the command to add
+   * @throws AmbariException if the command cannot be serialized for debug logging, the keytab
+   *                         injection fails or the cluster lookup fails
+   */
+  private void populateExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+                                            String hostName, AgentCommand ac) throws AmbariException {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
+      }
+    } catch (Exception e) {
+      throw new AmbariException("Could not get jaxb string for command", e);
+    }
+
+    switch (ac.getCommandType()) {
+      case BACKGROUND_EXECUTION_COMMAND:
+      case EXECUTION_COMMAND: {
+        ExecutionCommand executionCommand = (ExecutionCommand) ac;
+        LOG.info("AgentCommandsPublisher.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}",
+            executionCommand.getHostname(), executionCommand.getRole(), executionCommand.getRoleCommand(),
+            executionCommand.getCommandId(), executionCommand.getTaskId());
+
+        // keytab commands need the keytab data injected before they are sent
+        Map<String, String> commandParams = executionCommand.getCommandParams();
+        String customCommand = (commandParams == null) ? null : commandParams.get("custom_command");
+        if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
+          LOG.info(String.format("%s called", customCommand));
+          try {
+            injectKeytab(executionCommand, customCommand, hostName);
+          } catch (IOException e) {
+            throw new AmbariException("Could not inject keytab into command", e);
+          }
+        }
+
+        // commands without a cluster name are filed under the cluster id "-1"
+        String clusterName = executionCommand.getClusterName();
+        String clusterId = (clusterName == null)
+            ? "-1"
+            : Long.toString(clusters.getCluster(clusterName).getClusterId());
+        executionCommand.setClusterId(clusterId);
+
+        prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
+        executionCommandsClusters.get(hostName).get(clusterId).getExecutionCommands().add(executionCommand);
+        break;
+      }
+      case CANCEL_COMMAND: {
+        CancelCommand cancelCommand = (CancelCommand) ac;
+        long stageClusterId = hostRoleCommandDAO.findByPK(cancelCommand.getTargetTaskId()).getStage().getClusterId();
+        String clusterId = Long.toString(stageClusterId);
+
+        prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
+        executionCommandsClusters.get(hostName).get(clusterId).getCancelCommands().add(cancelCommand);
+        break;
+      }
+      default:
+        LOG.error("There is no action for agent command ="
+            + ac.getCommandType().name());
+    }
+  }
+
+  /**
+   * Makes sure the structure holds an entry for the given host and, below it, an
+   * {@link ExecutionCommandsCluster} with empty command lists for the given cluster.
+   *
+   * @param executionCommandsClusters structure keyed by host name and then cluster id
+   * @param hostName                  host whose entry must exist
+   * @param clusterId                 cluster whose entry must exist for that host
+   */
+  private void prepareExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+                                                String hostName, String clusterId) {
+    TreeMap<String, ExecutionCommandsCluster> hostClusters =
+        executionCommandsClusters.computeIfAbsent(hostName, host -> new TreeMap<>());
+    hostClusters.computeIfAbsent(clusterId,
+        id -> new ExecutionCommandsCluster(new ArrayList<>(), new ArrayList<>()));
+  }
+
+  /**
+   * Adds Kerberos keytab details to the ExecutionCommand for the SET_KEYTAB and
+   * REMOVE_KEYTAB custom commands, using the records of the identity data file that
+   * belong to the target host.
+   * <p>
+   * Nothing is changed when the command parameters contain no data directory. For
+   * SET_KEYTAB a record is only added when it names a keytab file path and the
+   * corresponding keytab file under the data directory is readable; the content of
+   * that file is attached Base64-encoded. For REMOVE_KEYTAB every record of the
+   * target host is added without file content.
+   *
+   * @param ec the ExecutionCommand to update
+   * @param command a name of the relevant keytab command
+   * @param targetHost a name of the host the relevant command is destined for
+   * @throws AmbariException if reading the identity data file or a keytab file fails
+   */
+  void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException {
+    String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);
+
+    if (dataDir != null) {
+      KerberosIdentityDataFileReader reader = null;
+      List<Map<String, String>> kcp = ec.getKerberosCommandParams();
+
+      try {
+        reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(
+            new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME));
+
+        for (Map<String, String> record : reader) {
+          String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME);
+
+          // Only records written for the host this command is sent to are relevant.
+          if (targetHost.equalsIgnoreCase(hostName)) {
+
+            if ("SET_KEYTAB".equalsIgnoreCase(command)) {
+              String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
+
+              if (keytabFilePath != null) {
+
+                // The keytab is looked up at <dataDir>/<host>/<sha1 of the keytab file path>.
+                String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath);
+                File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab);
+
+                if (keytabFile.canRead()) {
+                  Map<String, String> keytabMap = new HashMap<>();
+                  String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL);
+                  String isService = record.get(KerberosIdentityDataFileReader.SERVICE);
+
+                  keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+                  keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService);
+                  keytabMap.put(KerberosIdentityDataFileReader.COMPONENT,
+                      record.get(KerberosIdentityDataFileReader.COMPONENT));
+                  keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME,
+                      record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS,
+                      record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME,
+                      record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS,
+                      record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS));
+
+                  // try-with-resources closes the stream even when reading fails.
+                  byte[] keytabContent;
+                  try (BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile))) {
+                    keytabContent = IOUtils.toByteArray(bufferedIn);
+                  }
+                  String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
+                  keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);
+
+                  kcp.add(keytabMap);
+                }
+              }
+            } else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) {
+              Map<String, String> keytabMap = new HashMap<>();
+
+              keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+              keytabMap.put(KerberosIdentityDataFileReader.SERVICE,
+                  record.get(KerberosIdentityDataFileReader.SERVICE));
+              keytabMap.put(KerberosIdentityDataFileReader.COMPONENT,
+                  record.get(KerberosIdentityDataFileReader.COMPONENT));
+              keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL,
+                  record.get(KerberosIdentityDataFileReader.PRINCIPAL));
+              keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH,
+                  record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH));
+
+              kcp.add(keytabMap);
+            }
+          }
+        }
+      } catch (IOException e) {
+        // Keep the original exception as the cause so the failing file is visible in the logs.
+        throw new AmbariException("Could not inject keytabs to enable kerberos", e);
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close();
+          } catch (Throwable t) {
+            // ignored: a failure to close the data file reader must not fail the command
+          }
+        }
+      }
+
+      ec.setKerberosCommandParams(kcp);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
new file mode 100644
index 0000000..e32e715
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.ambari.server.events.HostComponentUpdate;
+import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Singleton;
+
+/**
+ * Throttles {@link HostComponentsUpdateEvent}s on their way to the event bus.
+ * <p>
+ * An event that arrives more than {@link #TIMEOUT} milliseconds after the previous
+ * post is forwarded immediately. An event that arrives sooner opens a collection
+ * window: its updates, and the updates of every event published while the window is
+ * open, are buffered and posted as one combined event {@link #TIMEOUT} milliseconds
+ * after the window was opened.
+ */
+@Singleton
+public class HostComponentUpdateEventPublisher {
+
+  /** Throttling interval and length of the collection window, in milliseconds. */
+  private static final long TIMEOUT = 1000L;
+
+  /** Time of the most recent post to the event bus, in epoch milliseconds. */
+  private final AtomicLong previousTime = new AtomicLong(0);
+
+  /** {@code true} while a flush is scheduled and incoming updates are being buffered. */
+  private final AtomicBoolean collecting = new AtomicBoolean(false);
+
+  /** Updates gathered during the current collection window. */
+  private final ConcurrentLinkedQueue<HostComponentUpdate> buffer = new ConcurrentLinkedQueue<>();
+
+  /** Runs the delayed flush that ends a collection window. */
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+  /**
+   * Posts the event right away, or buffers its updates when events arrive faster than
+   * once per {@link #TIMEOUT} milliseconds.
+   *
+   * @param event the event to publish
+   * @param m_eventBus the bus the event, or the later combined event, is posted to
+   */
+  public void publish(HostComponentsUpdateEvent event, EventBus m_eventBus) {
+    long eventTime = System.currentTimeMillis();
+    boolean withinTimeout = eventTime - previousTime.get() <= TIMEOUT;
+
+    // compareAndSet lets exactly one caller open the window, so concurrent publishers
+    // cannot schedule more than one flush for the same window.
+    if (withinTimeout && collecting.compareAndSet(false, true)) {
+      buffer.addAll(event.getHostComponentUpdates());
+      scheduledExecutorService.schedule(new HostComponentsEventRunnable(m_eventBus),
+          TIMEOUT, TimeUnit.MILLISECONDS);
+    } else if (collecting.get()) {
+      buffer.addAll(event.getHostComponentUpdates());
+    } else {
+      //TODO add logging and metrics posting
+      previousTime.set(eventTime);
+      m_eventBus.post(event);
+    }
+  }
+
+  /**
+   * Ends a collection window: drains the buffer, posts the drained updates as a single
+   * event and lets the publisher forward events directly again.
+   */
+  private class HostComponentsEventRunnable implements Runnable {
+
+    private final EventBus eventBus;
+
+    public HostComponentsEventRunnable(EventBus eventBus) {
+      this.eventBus = eventBus;
+    }
+
+    @Override
+    public void run() {
+      try {
+        List<HostComponentUpdate> hostComponentUpdates = new ArrayList<>();
+        HostComponentUpdate update;
+        while ((update = buffer.poll()) != null) {
+          hostComponentUpdates.add(update);
+        }
+        HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates);
+        //TODO add logging and metrics posting
+        eventBus.post(resultEvents);
+      } finally {
+        // Always leave collecting mode; otherwise a failed flush would make every later
+        // event be buffered with no flush scheduled to deliver it.
+        // NOTE(review): updates enqueued after the drain above but before the flag is
+        // cleared stay in the buffer until the next window is flushed.
+        previousTime.set(System.currentTimeMillis());
+        collecting.set(false);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
new file mode 100644
index 0000000..3ddf00c
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.RequestUpdateEvent;
+import org.apache.ambari.server.orm.dao.ClusterDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.topology.TopologyManager;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Throttles {@link RequestUpdateEvent}s per request id on their way to the event bus.
+ * <p>
+ * An event for a request that had no post within the last {@link #TIMEOUT}
+ * milliseconds is completed from the database and forwarded immediately. An event that
+ * arrives sooner is buffered; further events for the same request are merged into the
+ * buffered one, which is posted {@link #TIMEOUT} milliseconds after it was buffered.
+ */
+@Singleton
+public class RequestUpdateEventPublisher {
+
+  /** Throttling interval and buffering delay, in milliseconds. */
+  private static final long TIMEOUT = 1000L;
+
+  /** Request id to the time of its most recent direct post, in epoch milliseconds. */
+  private final ConcurrentHashMap<Long, Long> previousTime = new ConcurrentHashMap<>();
+
+  /** Request id to the event waiting for its delayed post. */
+  private final ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new ConcurrentHashMap<>();
+
+  /**
+   * Shared scheduler for the delayed posts. Creating a new pool for every buffered
+   * event would leave one idle thread behind each time, because such pools are never
+   * shut down.
+   */
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private TopologyManager topologyManager;
+
+  @Inject
+  private RequestDAO requestDAO;
+
+  @Inject
+  private ClusterDAO clusterDAO;
+
+  /**
+   * Posts the event right away, buffers it for a delayed post, or merges it into the
+   * event already buffered for the same request.
+   *
+   * @param event the event to publish
+   * @param m_eventBus the bus the event, or the later merged event, is posted to
+   */
+  public void publish(RequestUpdateEvent event, EventBus m_eventBus) {
+    long eventTime = System.currentTimeMillis();
+    Long requestId = event.getRequestId();
+
+    // A request without a recorded post is treated as last posted at time 0.
+    boolean withinTimeout = eventTime - previousTime.getOrDefault(requestId, 0L) <= TIMEOUT;
+
+    RequestUpdateEvent buffered = buffer.get(requestId);
+    if (buffered == null && withinTimeout) {
+      // putIfAbsent makes sure only the caller that actually stored the event schedules
+      // the delayed post; a caller that lost the race merges into the winner's event.
+      buffered = buffer.putIfAbsent(requestId, event);
+      if (buffered == null) {
+        scheduledExecutorService.schedule(new RequestEventRunnable(requestId, m_eventBus),
+            TIMEOUT, TimeUnit.MILLISECONDS);
+        return;
+      }
+    }
+
+    if (buffered != null) {
+      //merge available buffer content with arrived
+      buffered.setEndTime(event.getEndTime());
+      buffered.setRequestStatus(event.getRequestStatus());
+      buffered.setRequestContext(event.getRequestContext());
+      // Drop the buffered commands that the new event also carries, then add the new ones.
+      buffered.getHostRoleCommands().removeAll(event.getHostRoleCommands());
+      buffered.getHostRoleCommands().addAll(event.getHostRoleCommands());
+    } else {
+      previousTime.put(requestId, eventTime);
+      //TODO add logging and metrics posting
+      m_eventBus.post(fillRequest(event));
+    }
+  }
+
+  /**
+   * Sets the progress percentage on the event and, when its end time, start time,
+   * cluster name or request context is missing, reloads start time, end time, cluster
+   * name, request context and request status from the stored request.
+   *
+   * @param event the event to complete; modified in place
+   * @return the same event instance
+   */
+  private RequestUpdateEvent fillRequest(RequestUpdateEvent event) {
+    event.setProgressPercent(
+        CalculatedStatus.statusFromRequest(hostRoleCommandDAO, topologyManager, event.getRequestId()).getPercent());
+    if (event.getEndTime() == null || event.getStartTime() == null || event.getClusterName() == null
+        || event.getRequestContext() == null) {
+      RequestEntity requestEntity = requestDAO.findByPK(event.getRequestId());
+      event.setStartTime(requestEntity.getStartTime());
+      event.setEndTime(requestEntity.getEndTime());
+      // A cluster id of -1 means there is no cluster to look the name up for.
+      if (requestEntity.getClusterId() != -1) {
+        event.setClusterName(clusterDAO.findById(requestEntity.getClusterId()).getClusterName());
+      }
+      event.setRequestContext(requestEntity.getRequestContext());
+      event.setRequestStatus(requestEntity.getStatus());
+    }
+    return event;
+  }
+
+  /**
+   * Posts the event buffered for one request and forgets the request's buffer entry
+   * and last post time.
+   */
+  private class RequestEventRunnable implements Runnable {
+
+    private final long requestId;
+    private final EventBus eventBus;
+
+    public RequestEventRunnable(long requestId, EventBus eventBus) {
+      this.requestId = requestId;
+      this.eventBus = eventBus;
+    }
+
+    @Override
+    public void run() {
+      try {
+        RequestUpdateEvent resultEvent = buffer.get(requestId);
+        if (resultEvent != null) {
+          //TODO add logging and metrics posting
+          eventBus.post(fillRequest(resultEvent));
+        }
+      } finally {
+        // Clean up even when completing or posting the event fails; a leftover buffer
+        // entry would swallow every later event for this request.
+        buffer.remove(requestId);
+        previousTime.remove(requestId);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
index 62fe44c..53738f4 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java
@@ -20,9 +20,12 @@ package org.apache.ambari.server.events.publishers;
 import java.util.concurrent.Executors;
 
 import org.apache.ambari.server.events.AmbariUpdateEvent;
+import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+import org.apache.ambari.server.events.RequestUpdateEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
@@ -30,13 +33,25 @@ public final class StateUpdateEventPublisher {
 
   private final EventBus m_eventBus;
 
+  @Inject
+  private RequestUpdateEventPublisher requestUpdateEventPublisher;
+
+  @Inject
+  private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher;
+
   public StateUpdateEventPublisher() {
     m_eventBus = new AsyncEventBus("ambari-update-bus",
       Executors.newSingleThreadExecutor());
   }
 
   public void publish(AmbariUpdateEvent event) {
-    m_eventBus.post(event);
+    if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) {
+      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, 
m_eventBus);
+    } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) {
+      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) 
event, m_eventBus);
+    } else {
+      m_eventBus.post(event);
+    }
   }
 
   public void register(Object object) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
index c3e3a9f..25bc813 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java
@@ -30,7 +30,10 @@ import 
org.apache.ambari.server.controller.internal.AlertDefinitionResourceProvi
 import org.apache.ambari.server.events.AlertDefinitionChangedEvent;
 import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
 import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
+import org.apache.ambari.server.events.AlertDefinitionUpdateHolder;
+import org.apache.ambari.server.events.AlertDefinitionsUpdateEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
 import org.apache.ambari.server.orm.RequiresSession;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.orm.entities.AlertGroupEntity;
@@ -99,6 +102,12 @@ public class AlertDefinitionDAO {
   @Inject
   private AlertDefinitionFactory alertDefinitionFactory;
 
+  @Inject
+  private AlertDefinitionUpdateHolder alertDefinitionUpdateHolder;
+
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
   /**
    * Gets an alert definition with the specified ID.
    *
@@ -341,6 +350,9 @@ public class AlertDefinitionDAO {
       AlertDefinitionRegistrationEvent event = new 
AlertDefinitionRegistrationEvent(
           alertDefinition.getClusterId(), coerced);
 
+      stateUpdateEventPublisher.publish(new 
AlertDefinitionsUpdateEvent(coerced,
+          alertDefinition.getRepeatTolerance(), 
Boolean.valueOf(alertDefinition.isRepeatToleranceEnabled())));
+
       eventPublisher.publish(event);
     } else {
       LOG.warn("Unable to broadcast alert registration event for {}",
@@ -380,6 +392,9 @@ public class AlertDefinitionDAO {
 
     eventPublisher.publish(event);
 
+    alertDefinitionUpdateHolder.updateIfNeeded(new 
AlertDefinitionsUpdateEvent(definition,
+        alertDefinition.getRepeatTolerance(), 
Boolean.valueOf(alertDefinition.isRepeatToleranceEnabled())));
+
     return entity;
   }
 
@@ -426,6 +441,8 @@ public class AlertDefinitionDAO {
                 alertDefinition.getClusterId(), coerced);
 
         eventPublisher.publish(event);
+
+        stateUpdateEventPublisher.publish(new 
AlertDefinitionsUpdateEvent(alertDefinition.getDefinitionId()));
       } else {
         LOG.warn("Unable to broadcast alert removal event for {}",
                 alertDefinition.getDefinitionName());

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
index 0023def..5758c1a 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
@@ -18,6 +18,10 @@
 package org.apache.ambari.server.orm.dao;
 
 import org.apache.ambari.server.state.AlertState;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
  * Used to return alert summary data out of the database. Alerts that are in
@@ -25,10 +29,19 @@ import org.apache.ambari.server.state.AlertState;
  */
 public class AlertSummaryDTO {
 
+  @JsonProperty("OK")
   private int okCount;
+
+  @JsonProperty("WARNING")
   private int warningCount;
+
+  @JsonProperty("CRITICAL")
   private int criticalCount;
+
+  @JsonProperty("UNKNOWN")
   private int unknownCount;
+
+  @JsonProperty("MAINTENANCE")
   private int maintenanceCount;
 
   /**
@@ -128,4 +141,32 @@ public class AlertSummaryDTO {
   public void setMaintenanceCount(int maintenanceCount) {
     this.maintenanceCount = maintenanceCount;
   }
+
+  /**
+   * Two summaries are equal when their OK, warning, critical, unknown and maintenance
+   * counts all match.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof AlertSummaryDTO)) {
+      return false;
+    }
+
+    AlertSummaryDTO other = (AlertSummaryDTO) o;
+    EqualsBuilder counts = new EqualsBuilder();
+    counts.append(okCount, other.okCount);
+    counts.append(warningCount, other.warningCount);
+    counts.append(criticalCount, other.criticalCount);
+    counts.append(unknownCount, other.unknownCount);
+    counts.append(maintenanceCount, other.maintenanceCount);
+    return counts.isEquals();
+  }
+
+  /**
+   * Hash code built from the same five counts that {@code equals} compares.
+   */
+  @Override
+  public int hashCode() {
+    HashCodeBuilder counts = new HashCodeBuilder(17, 37);
+    counts.append(okCount);
+    counts.append(warningCount);
+    counts.append(criticalCount);
+    counts.append(unknownCount);
+    counts.append(maintenanceCount);
+    return counts.toHashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
index 9d1c992..059becd 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
@@ -114,8 +114,8 @@ public abstract class AbstractServerAction implements 
ServerAction {
         report = new CommandReport();
 
         
report.setActionId(StageUtils.getActionId(hostRoleCommand.getRequestId(), 
hostRoleCommand.getStageId()));
-        report.setClusterName(executionCommand.getClusterName());
-        report.setConfigurationTags(executionCommand.getConfigurationTags());
+        report.setClusterId(executionCommand.getClusterId());
+        //report.setConfigurationTags(executionCommand.getConfigurationTags());
         report.setRole(executionCommand.getRole());
         report.setRoleCommand((roleCommand == null) ? null : 
roleCommand.toString());
         report.setServiceName(executionCommand.getServiceName());

Reply via email to