http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java new file mode 100644 index 0000000..362c117 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostStatusUpdateEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.events; + +public class HostStatusUpdateEvent extends AmbariEvent { + + private String hostName; + private String hostStatus; + + public HostStatusUpdateEvent(String hostName, String hostStatus) { + super(AmbariEventType.HOST_STATUS_CHANGE); + this.hostName = hostName; + this.hostStatus = hostStatus; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public String getHostStatus() { + return hostStatus; + } + + public void setHostStatus(String hostStatus) { + this.hostStatus = hostStatus; + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java new file mode 100644 index 0000000..a7f9fa6 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/HostUpdateEvent.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events; + +import org.apache.ambari.server.orm.dao.AlertSummaryDTO; +import org.apache.ambari.server.state.HostState; +import org.apache.ambari.server.state.MaintenanceState; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Host info with updated parameter. This update will be sent to all subscribed recipients. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class HostUpdateEvent extends AmbariUpdateEvent { + + @JsonProperty("cluster_name") + private String clusterName; + + @JsonProperty("host_name") + private String hostName; + + @JsonProperty("host_status") + private String hostStatus; + + @JsonProperty("host_state") + private HostState hostState; + + @JsonProperty("last_heartbeat_time") + private Long lastHeartbeatTime; + + @JsonProperty("maintenance_state") + private MaintenanceState maintenanceState; + + @JsonProperty("alerts_summary") + private AlertSummaryDTO alertsSummary; + + public HostUpdateEvent(String clusterName, String hostName, String hostStatus, HostState hostState, + Long lastHeartbeatTime, MaintenanceState maintenanceState, AlertSummaryDTO alertsSummary) { + super(Type.HOST); + this.clusterName = clusterName; + this.hostName = hostName; + this.hostStatus = hostStatus; + this.hostState = hostState; + this.lastHeartbeatTime = lastHeartbeatTime; + this.maintenanceState = maintenanceState; + this.alertsSummary = alertsSummary; + } + + public static HostUpdateEvent createHostStatusUpdate(String clusterName, String hostName, String hostStatus, + Long lastHeartbeatTime) { + return new HostUpdateEvent(clusterName, hostName, hostStatus, null, lastHeartbeatTime, null, null); + } + + public static HostUpdateEvent createHostStateUpdate(String clusterName, String hostName, HostState hostState, + Long lastHeartbeatTime) { + return new HostUpdateEvent(clusterName, hostName, null, hostState, lastHeartbeatTime, null, null); + } + + public static HostUpdateEvent createHostMaintenanceStatusUpdate(String clusterName, String hostName, + MaintenanceState maintenanceState, + AlertSummaryDTO alertsSummary) { + return new HostUpdateEvent(clusterName, hostName, null, null, null, maintenanceState, alertsSummary); + } + + public static HostUpdateEvent createHostAlertsUpdate(String clusterName, String hostName, + AlertSummaryDTO alertsSummary) { + return new 
HostUpdateEvent(clusterName, hostName, null, null, null, null, alertsSummary); + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public String getHostStatus() { + return hostStatus; + } + + public void setHostStatus(String hostStatus) { + this.hostStatus = hostStatus; + } + + public Long getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + public void setLastHeartbeatTime(Long lastHeartbeatTime) { + this.lastHeartbeatTime = lastHeartbeatTime; + } + + public MaintenanceState getMaintenanceState() { + return maintenanceState; + } + + public void setMaintenanceState(MaintenanceState maintenanceState) { + this.maintenanceState = maintenanceState; + } + + public AlertSummaryDTO getAlertsSummary() { + return alertsSummary; + } + + public void setAlertsSummary(AlertSummaryDTO alertsSummary) { + this.alertsSummary = alertsSummary; + } + + public HostState getHostState() { + return hostState; + } + + public void setHostState(HostState hostState) { + this.hostState = hostState; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HostUpdateEvent that = (HostUpdateEvent) o; + + if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false; + if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false; + if (hostStatus != null ? !hostStatus.equals(that.hostStatus) : that.hostStatus != null) return false; + if (hostState != that.hostState) return false; + if (lastHeartbeatTime != null ? 
!lastHeartbeatTime.equals(that.lastHeartbeatTime) : that.lastHeartbeatTime != null) + return false; + if (maintenanceState != that.maintenanceState) return false; + return alertsSummary != null ? alertsSummary.equals(that.alertsSummary) : that.alertsSummary == null; + } + + @Override + public int hashCode() { + int result = clusterName != null ? clusterName.hashCode() : 0; + result = 31 * result + (hostName != null ? hostName.hashCode() : 0); + result = 31 * result + (hostStatus != null ? hostStatus.hashCode() : 0); + result = 31 * result + (hostState != null ? hostState.hashCode() : 0); + result = 31 * result + (lastHeartbeatTime != null ? lastHeartbeatTime.hashCode() : 0); + result = 31 * result + (maintenanceState != null ? maintenanceState.hashCode() : 0); + result = 31 * result + (alertsSummary != null ? alertsSummary.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java index 6e7ee90..239b5b8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MetadataUpdateEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,17 +26,37 @@ import org.apache.ambari.server.agent.stomp.dto.MetadataCluster; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +/** + * Contains update info about metadata for all clusters. This update will be sent to all subscribed recipients. + */ @JsonInclude(JsonInclude.Include.NON_NULL) public class MetadataUpdateEvent extends AmbariUpdateEvent implements Hashable { + /** + * Id used to send parameters common to all clusters. + */ + private final String AMBARI_LEVEL_CLUSTER_ID = "-1"; + + /** + * Actual version hash. + */ private String hash; + /** + * Map of per-cluster metadata, keyed by cluster id. 
+ */ @JsonProperty("clusters") private TreeMap<String, MetadataCluster> metadataClusters = new TreeMap<>(); - public MetadataUpdateEvent(TreeMap<String, MetadataCluster> metadataClusters) { + public MetadataUpdateEvent(TreeMap<String, MetadataCluster> metadataClusters, TreeMap<String, String> ambariLevelParams) { super(Type.METADATA); this.metadataClusters = metadataClusters; + if (ambariLevelParams != null) { + if (this.metadataClusters == null) { + this.metadataClusters = new TreeMap<>(); + } + this.metadataClusters.put(AMBARI_LEVEL_CLUSTER_ID, new MetadataCluster(null, new TreeMap<>(), ambariLevelParams)); + } } public Map<String, MetadataCluster> getMetadataClusters() { @@ -57,6 +77,21 @@ public class MetadataUpdateEvent extends AmbariUpdateEvent implements Hashable { } public static MetadataUpdateEvent emptyUpdate() { - return new MetadataUpdateEvent(null); + return new MetadataUpdateEvent(null, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + MetadataUpdateEvent that = (MetadataUpdateEvent) o; + + return metadataClusters != null ? metadataClusters.equals(that.metadataClusters) : that.metadataClusters == null; + } + + @Override + public int hashCode() { + return metadataClusters != null ? 
metadataClusters.hashCode() : 0; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java index 3096347..53dc69f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedHostRoleCommandUpdateEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,9 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus; import com.fasterxml.jackson.annotation.JsonInclude; +/** + * Single host role command update info. This update will be sent to all subscribed recipients. + */ @JsonInclude(JsonInclude.Include.NON_NULL) public class NamedHostRoleCommandUpdateEvent extends AmbariUpdateEvent { @@ -124,4 +127,36 @@ public class NamedHostRoleCommandUpdateEvent extends AmbariUpdateEvent { public String getDestination() { return super.getDestination() + getId(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NamedHostRoleCommandUpdateEvent that = (NamedHostRoleCommandUpdateEvent) o; + + if (id != null ? 
!id.equals(that.id) : that.id != null) return false; + if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false; + if (hostName != null ? !hostName.equals(that.hostName) : that.hostName != null) return false; + if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false; + if (status != that.status) return false; + if (errorLog != null ? !errorLog.equals(that.errorLog) : that.errorLog != null) return false; + if (outLog != null ? !outLog.equals(that.outLog) : that.outLog != null) return false; + if (stderr != null ? !stderr.equals(that.stderr) : that.stderr != null) return false; + return stdout != null ? stdout.equals(that.stdout) : that.stdout == null; + } + + @Override + public int hashCode() { + int result = id != null ? id.hashCode() : 0; + result = 31 * result + (requestId != null ? requestId.hashCode() : 0); + result = 31 * result + (hostName != null ? hostName.hashCode() : 0); + result = 31 * result + (endTime != null ? endTime.hashCode() : 0); + result = 31 * result + (status != null ? status.hashCode() : 0); + result = 31 * result + (errorLog != null ? errorLog.hashCode() : 0); + result = 31 * result + (outLog != null ? outLog.hashCode() : 0); + result = 31 * result + (stderr != null ? stderr.hashCode() : 0); + result = 31 * result + (stdout != null ? 
stdout.hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java index 2cf7b80..4133c62 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,8 +18,9 @@ package org.apache.ambari.server.events; -import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.controller.internal.CalculatedStatus; @@ -31,6 +32,9 @@ import org.apache.ambari.server.topology.TopologyManager; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +/** + * Contains info about request update. This update will be sent to all subscribed recipients. 
+ */ @JsonInclude(JsonInclude.Include.NON_NULL) public class RequestUpdateEvent extends AmbariUpdateEvent { @@ -43,7 +47,7 @@ public class RequestUpdateEvent extends AmbariUpdateEvent { private Long startTime; @JsonProperty("Tasks") - private List<HostRoleCommand> hostRoleCommands = new ArrayList<>(); + private Set<HostRoleCommand> hostRoleCommands = new HashSet<>(); public RequestUpdateEvent(RequestEntity requestEntity, HostRoleCommandDAO hostRoleCommandDAO, @@ -67,6 +71,14 @@ public class RequestUpdateEvent extends AmbariUpdateEvent { } } + public RequestUpdateEvent(Long requestId, HostRoleStatus requestStatus, + Set<HostRoleCommand> hostRoleCommands) { + super(Type.REQUEST); + this.requestId = requestId; + this.requestStatus = requestStatus; + this.hostRoleCommands = hostRoleCommands; + } + public Long getRequestId() { return requestId; } @@ -123,7 +135,15 @@ public class RequestUpdateEvent extends AmbariUpdateEvent { this.startTime = startTime; } - public class HostRoleCommand { + public Set<HostRoleCommand> getHostRoleCommands() { + return hostRoleCommands; + } + + public void setHostRoleCommands(Set<HostRoleCommand> hostRoleCommands) { + this.hostRoleCommands = hostRoleCommands; + } + + public static class HostRoleCommand { private Long id; private Long requestId; private HostRoleStatus status; @@ -167,5 +187,57 @@ public class RequestUpdateEvent extends AmbariUpdateEvent { public void setHostName(String hostName) { this.hostName = hostName; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + HostRoleCommand that = (HostRoleCommand) o; + + if (!id.equals(that.id)) return false; + if (!requestId.equals(that.requestId)) return false; + return hostName.equals(that.hostName); + } + + @Override + public int hashCode() { + int result = id.hashCode(); + result = 31 * result + requestId.hashCode(); + result = 31 * result + hostName.hashCode(); + return result; + } + } + + 
@Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RequestUpdateEvent that = (RequestUpdateEvent) o; + + if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false; + if (endTime != null ? !endTime.equals(that.endTime) : that.endTime != null) return false; + if (requestId != null ? !requestId.equals(that.requestId) : that.requestId != null) return false; + if (progressPercent != null ? !progressPercent.equals(that.progressPercent) : that.progressPercent != null) + return false; + if (requestContext != null ? !requestContext.equals(that.requestContext) : that.requestContext != null) + return false; + if (requestStatus != that.requestStatus) return false; + if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false; + return hostRoleCommands != null ? hostRoleCommands.equals(that.hostRoleCommands) : that.hostRoleCommands == null; + } + + @Override + public int hashCode() { + int result = clusterName != null ? clusterName.hashCode() : 0; + result = 31 * result + (endTime != null ? endTime.hashCode() : 0); + result = 31 * result + (requestId != null ? requestId.hashCode() : 0); + result = 31 * result + (progressPercent != null ? progressPercent.hashCode() : 0); + result = 31 * result + (requestContext != null ? requestContext.hashCode() : 0); + result = 31 * result + (requestStatus != null ? requestStatus.hashCode() : 0); + result = 31 * result + (startTime != null ? startTime.hashCode() : 0); + result = 31 * result + (hostRoleCommands != null ? 
hostRoleCommands.hashCode() : 0); + return result; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java new file mode 100644 index 0000000..d39c4ae --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ServiceUpdateEvent.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events; + +import org.apache.ambari.server.state.MaintenanceState; +import org.apache.ambari.server.state.State; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Contains info about service update. This update will be sent to all subscribed recipients. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ServiceUpdateEvent extends AmbariUpdateEvent { + + @JsonProperty("cluster_name") + private String clusterName; + + @JsonProperty("maintenance_state") + private MaintenanceState maintenanceState; + + @JsonProperty("service_name") + private String serviceName; + + @JsonProperty("state") + private State state; + + public ServiceUpdateEvent(String clusterName, MaintenanceState maintenanceState, String serviceName, State state) { + super(Type.SERVICE); + this.clusterName = clusterName; + this.maintenanceState = maintenanceState; + this.serviceName = serviceName; + this.state = state; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public MaintenanceState getMaintenanceState() { + return maintenanceState; + } + + public void setMaintenanceState(MaintenanceState maintenanceState) { + this.maintenanceState = maintenanceState; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ServiceUpdateEvent that = (ServiceUpdateEvent) o; + + if (clusterName != null ? !clusterName.equals(that.clusterName) : that.clusterName != null) return false; + if (maintenanceState != that.maintenanceState) return false; + if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false; + return state == that.state; + } + + @Override + public int hashCode() { + int result = clusterName != null ? clusterName.hashCode() : 0; + result = 31 * result + (maintenanceState != null ? 
maintenanceState.hashCode() : 0); + result = 31 * result + (serviceName != null ? serviceName.hashCode() : 0); + result = 31 * result + (state != null ? state.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java new file mode 100644 index 0000000..1fa4e6c --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyAgentUpdateEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +import java.util.TreeMap; + +import org.apache.ambari.server.agent.stomp.dto.TopologyCluster; + +import com.fasterxml.jackson.annotation.JsonInclude; + +/** + * Contains info about clusters topology update. This update will be sent to all subscribed recipients. + * Used for messaging to agents. 
+ */ +@JsonInclude(JsonInclude.Include.NON_EMPTY) +public class TopologyAgentUpdateEvent extends TopologyUpdateEvent { + public TopologyAgentUpdateEvent(TreeMap<String, TopologyCluster> clusters, String hash, EventType eventType) { + super(Type.AGENT_TOPOLOGY, clusters, hash, eventType); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java index cfab422..1b5b90b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TopologyUpdateEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.events; +import java.util.Map; import java.util.TreeMap; import org.apache.ambari.server.agent.stomp.dto.Hashable; @@ -25,18 +26,38 @@ import org.apache.ambari.server.agent.stomp.dto.TopologyCluster; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +/** + * Contains info about clusters topology update. This update will be sent to all subscribed recipients. + * Used for messaging to the UI. 
+ */ @JsonInclude(JsonInclude.Include.NON_NULL) public class TopologyUpdateEvent extends AmbariUpdateEvent implements Hashable { + + /** + * Map of clusters topologies by cluster ids. + */ @JsonProperty("clusters") private TreeMap<String, TopologyCluster> clusters; + /** + * Actual version hash. + */ private String hash; + /** + * Type of update; used to distinguish the full current topology (CREATE), adding new or updating existing topology + * elements (UPDATE), and removing existing topology elements (DELETE). + */ private EventType eventType; public TopologyUpdateEvent(TreeMap<String, TopologyCluster> clusters, EventType eventType) { - super(Type.TOPOLOGY); + this(Type.UI_TOPOLOGY, clusters, null, eventType); + } + + public TopologyUpdateEvent(Type type, TreeMap<String, TopologyCluster> clusters, String hash, EventType eventType) { + super(type); this.clusters = clusters; + this.hash = hash; this.eventType = eventType; } @@ -44,6 +65,16 @@ public class TopologyUpdateEvent extends AmbariUpdateEvent implements Hashable { return clusters; } + public TopologyUpdateEvent deepCopy() { + TreeMap<String, TopologyCluster> copiedClusters = new TreeMap<>(); + for (Map.Entry<String, TopologyCluster> topologyClusterEntry : getClusters().entrySet()) { + copiedClusters.put(topologyClusterEntry.getKey(), topologyClusterEntry.getValue().deepCopyCluster()); + } + TopologyUpdateEvent copiedEvent = new TopologyUpdateEvent(copiedClusters, getEventType()); + copiedEvent.setHash(getHash()); + return copiedEvent; + } + public void setClusters(TreeMap<String, TopologyCluster> clusters) { this.clusters = clusters; } @@ -73,4 +104,22 @@ public class TopologyUpdateEvent extends AmbariUpdateEvent implements Hashable { DELETE, UPDATE } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TopologyUpdateEvent that = (TopologyUpdateEvent) o; + + if (clusters != null ? 
!clusters.equals(that.clusters) : that.clusters != null) return false; + return eventType == that.eventType; + } + + @Override + public int hashCode() { + int result = clusters != null ? clusters.hashCode() : 0; + result = 31 * result + (eventType != null ? eventType.hashCode() : 0); + return result; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java index c34b95a..23a547f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java @@ -151,8 +151,7 @@ public class AlertReceivedListener { List<AlertCurrentEntity> toCreateHistoryAndMerge = new ArrayList<>(); List<AlertEvent> alertEvents = new ArrayList<>(20); - List<Alert> updatedAlerts = new ArrayList<>(); - Map<String, AlertSummaryGroupedRenderer.AlertDefinitionSummary> summaries = new HashMap<>(); + Map<Long, Map<String, AlertSummaryGroupedRenderer.AlertDefinitionSummary>> alertUpdates = new HashMap<>(); for (Alert alert : alerts) { // jobs that were running when a service/component/host was changed @@ -343,10 +342,15 @@ public class AlertReceivedListener { // create the event to fire later alertEvents.add(new AlertStateChangeEvent(clusterId, alert, current, oldState, oldFirmness)); - updatedAlerts.add(alert); // create alert update to fire event to UI MaintenanceState maintenanceState = getMaintenanceState(alert, clusterId); + + if (!alertUpdates.containsKey(clusterId)) { + alertUpdates.put(clusterId, new HashMap<>()); + } + Map<String, 
AlertSummaryGroupedRenderer.AlertDefinitionSummary> summaries = alertUpdates.get(clusterId); + AlertSummaryGroupedRenderer.updateSummary(summaries, definition.getDefinitionId(), definition.getDefinitionName(), alertState, alert.getTimestamp(), maintenanceState, alert.getText()); } @@ -360,8 +364,8 @@ public class AlertReceivedListener { for (AlertEvent eventToFire : alertEvents) { m_alertEventPublisher.publish(eventToFire); } - if (!summaries.isEmpty()) { - stateUpdateEventPublisher.publish(new AlertUpdateEvent(summaries)); + if (!alertUpdates.isEmpty()) { + stateUpdateEventPublisher.publish(new AlertUpdateEvent(alertUpdates)); } } @@ -612,7 +616,7 @@ public class AlertReceivedListener { * the definition to read any repeat tolerance overrides from. * @param state * the state of the {@link AlertCurrentEntity}. - * @param occurrences + * @param the * occurrences of the alert in the current state (used for * calculation firmness when moving between non-OK states) * @return http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java new file mode 100644 index 0000000..6cb831f --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hosts/HostUpdateListener.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events.listeners.hosts; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.EagerSingleton; +import org.apache.ambari.server.events.AlertEvent; +import org.apache.ambari.server.events.AlertStateChangeEvent; +import org.apache.ambari.server.events.HostStateUpdateEvent; +import org.apache.ambari.server.events.HostStatusUpdateEvent; +import org.apache.ambari.server.events.HostUpdateEvent; +import org.apache.ambari.server.events.InitialAlertEvent; +import org.apache.ambari.server.events.MaintenanceModeEvent; +import org.apache.ambari.server.events.publishers.AlertEventPublisher; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; +import org.apache.ambari.server.orm.dao.AlertSummaryDTO; +import org.apache.ambari.server.orm.dao.AlertsDAO; +import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.MaintenanceState; +import org.apache.commons.lang.StringUtils; + +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; +import com.google.inject.Provider; +import 
com.google.inject.Singleton;
+
+/**
+ * Listens for host status, host state, alert and maintenance-mode events and
+ * publishes the matching updates, built by the {@link HostUpdateEvent} factory
+ * methods, through the {@link StateUpdateEventPublisher}.
+ * <p>
+ * The last known values are cached per cluster id and host name; an update is
+ * only published when the compared value differs from the cached one.
+ * <p>
+ * NOTE(review): the cache is a plain {@link HashMap} - confirm that the event
+ * buses this listener registers with never invoke it concurrently.
+ */
+@Singleton
+@EagerSingleton
+public class HostUpdateListener {
+
+  /** Cached host snapshots: cluster id -> host name -> last known values. */
+  private Map<Long, Map<String, HostUpdateEvent>> hosts = new HashMap<>();
+
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private ServiceDesiredStateDAO serviceDesiredStateDAO;
+
+  @Inject
+  private AlertsDAO alertsDAO;
+
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  /**
+   * Registers this listener with both publishers.
+   *
+   * @param ambariEventPublisher  publisher this listener is registered with
+   * @param m_alertEventPublisher alert publisher this listener is registered with
+   */
+  @Inject
+  public HostUpdateListener(AmbariEventPublisher ambariEventPublisher, AlertEventPublisher m_alertEventPublisher) {
+    ambariEventPublisher.register(this);
+    m_alertEventPublisher.register(this);
+  }
+
+  /**
+   * Publishes a host status update for every cluster of the host whose cached
+   * status differs from the status carried by the event.
+   *
+   * @param event the host status change
+   * @throws AmbariException declared by the host/cluster lookups
+   */
+  @Subscribe
+  public void onHostStatusUpdate(HostStatusUpdateEvent event) throws AmbariException {
+    String hostName = event.getHostName();
+    Long lastHeartbeatTime = m_clusters.get().getHost(hostName).getLastHeartbeatTime();
+
+    for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) {
+      Long clusterId = cluster.getClusterId();
+
+      // retrieve state from cache
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      if (hostUpdateEvent.getHostStatus().equals(event.getHostStatus())) {
+        continue;
+      } else {
+        hostUpdateEvent.setHostStatus(event.getHostStatus());
+      }
+      hostUpdateEvent.setLastHeartbeatTime(lastHeartbeatTime);
+
+      stateUpdateEventPublisher.publish(HostUpdateEvent.createHostStatusUpdate(hostUpdateEvent.getClusterName(),
+          hostUpdateEvent.getHostName(),
+          hostUpdateEvent.getHostStatus(),
+          hostUpdateEvent.getLastHeartbeatTime()));
+    }
+  }
+
+  /**
+   * Publishes a host state update for every cluster of the host whose cached
+   * state differs from the state carried by the event.
+   *
+   * @param event the host state change
+   * @throws AmbariException declared by the host/cluster lookups
+   */
+  @Subscribe
+  public void onHostStateUpdate(HostStateUpdateEvent event) throws AmbariException {
+    String hostName = event.getHostName();
+    Long lastHeartbeatTime = m_clusters.get().getHost(hostName).getLastHeartbeatTime();
+
+    for (Cluster cluster : m_clusters.get().getClustersForHost(hostName)) {
+      Long clusterId = cluster.getClusterId();
+
+      // retrieve state from cache
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      if (hostUpdateEvent.getHostState().equals(event.getHostState())) {
+        continue;
+      } else {
+        hostUpdateEvent.setHostState(event.getHostState());
+      }
+      hostUpdateEvent.setLastHeartbeatTime(lastHeartbeatTime);
+
+      stateUpdateEventPublisher.publish(HostUpdateEvent.createHostStateUpdate(hostUpdateEvent.getClusterName(),
+          hostUpdateEvent.getHostName(),
+          hostUpdateEvent.getHostState(),
+          hostUpdateEvent.getLastHeartbeatTime()));
+    }
+  }
+
+  /**
+   * Refreshes the cached alert summary of the host named by an
+   * {@link AlertStateChangeEvent} or {@link InitialAlertEvent} and publishes an
+   * alerts update when the summary changed. Other alert events, and events
+   * without a host name, are ignored.
+   *
+   * @param event the alert event
+   * @throws AmbariException declared by the cache lookup
+   */
+  @Subscribe
+  public void onAlertsHostUpdate(AlertEvent event) throws AmbariException {
+    String hostName;
+    if (!(event instanceof AlertStateChangeEvent) && !(event instanceof InitialAlertEvent)) {
+      return;
+    } else if (event instanceof AlertStateChangeEvent) {
+      hostName = ((AlertStateChangeEvent) event).getNewHistoricalEntry().getHostName();
+    } else {
+      hostName = ((InitialAlertEvent) event).getNewHistoricalEntry().getHostName();
+    }
+    if (StringUtils.isEmpty(hostName)) {
+      return;
+    }
+
+    Long clusterId = event.getClusterId();
+
+    // retrieve state from cache
+    HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+    // change alerts counters
+    AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+    if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+      return;
+    }
+    hostUpdateEvent.setAlertsSummary(summary);
+
+    stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+        hostName, summary));
+  }
+
+  /**
+   * Reacts to a maintenance-mode change.
+   * <ul>
+   * <li>Host or host-component event: refreshes the alert summary of that host
+   * and, if it changed, publishes a maintenance status update (host event) or
+   * an alerts update (host-component event).</li>
+   * <li>Service event: refreshes the alert summary of every host of the
+   * service and publishes an alerts update for each host whose summary
+   * changed.</li>
+   * </ul>
+   *
+   * @param event the maintenance-mode change
+   * @throws AmbariException declared by the cluster/service lookups
+   */
+  @Subscribe
+  public void onMaintenanceStateUpdate(MaintenanceModeEvent event) throws AmbariException {
+    Long clusterId = event.getClusterId();
+
+    if (event.getHost() != null || event.getServiceComponentHost() != null) {
+      // a host-component event carries no Host; take the name from the
+      // component instead of dereferencing a null host
+      String hostName = event.getHost() != null
+          ? event.getHost().getHostName()
+          : event.getServiceComponentHost().getHostName();
+
+      HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+      AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+      // NOTE(review): this also suppresses the maintenance status update of a
+      // host event when the alert counts are unchanged - confirm intended
+      if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+        return;
+      }
+      hostUpdateEvent.setAlertsSummary(summary);
+      if (event.getHost() != null) {
+        MaintenanceState maintenanceState = event.getMaintenanceState();
+        hostUpdateEvent.setMaintenanceState(maintenanceState);
+
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostMaintenanceStatusUpdate(hostUpdateEvent.getClusterName(),
+            hostName, maintenanceState, summary));
+      } else {
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+            hostName, summary));
+      }
+    } else if (event.getService()!= null) {
+      String serviceName = event.getService().getName();
+      for (String hostName : m_clusters.get().getCluster(clusterId).getService(serviceName).getServiceHosts()) {
+        HostUpdateEvent hostUpdateEvent = retrieveHostUpdateFromCache(clusterId, hostName);
+
+        AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+        if (hostUpdateEvent.getAlertsSummary().equals(summary)) {
+          continue;
+        }
+        hostUpdateEvent.setAlertsSummary(summary);
+
+        stateUpdateEventPublisher.publish(HostUpdateEvent.createHostAlertsUpdate(hostUpdateEvent.getClusterName(),
+            hostName, summary));
+      }
+    }
+  }
+
+  /**
+   * Returns the cached snapshot for the host in the cluster, creating and
+   * caching a new one from the current host data on a cache miss.
+   *
+   * @param clusterId id of the cluster
+   * @param hostName  name of the host
+   * @return the cached (possibly just created) snapshot
+   * @throws AmbariException declared by the snapshot creation
+   */
+  private HostUpdateEvent retrieveHostUpdateFromCache(Long clusterId, String hostName) throws AmbariException {
+    HostUpdateEvent hostUpdateEvent;
+    if (hosts.containsKey(clusterId) && hosts.get(clusterId).containsKey(hostName)) {
+      hostUpdateEvent = hosts.get(clusterId).get(hostName);
+    } else {
+      hostUpdateEvent = createHostUpdateEvent(clusterId, hostName);
+      if (!hosts.containsKey(clusterId)) {
+        hosts.put(clusterId, new HashMap<>());
+      }
+      hosts.get(clusterId).put(hostName, hostUpdateEvent);
+    }
+
+    return hostUpdateEvent;
+  }
+
+  /**
+   * Builds a snapshot from the host's current status, state, last heartbeat
+   * time, maintenance state and alert counts.
+   *
+   * @param clusterId id of the cluster
+   * @param hostName  name of the host
+   * @return a new snapshot for the host
+   * @throws AmbariException declared by the cluster/host lookups
+   */
+  private HostUpdateEvent createHostUpdateEvent(Long clusterId, String hostName) throws AmbariException {
+    String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
+    Host host = m_clusters.get().getHost(hostName);
+
+    AlertSummaryDTO summary = alertsDAO.findCurrentCounts(clusterId, null, hostName);
+
+    return new HostUpdateEvent(clusterName, hostName, host.getStatus(), host.getState(), host.getLastHeartbeatTime(),
+        host.getMaintenanceState(clusterId), summary);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
index 940dc76..27af717 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/StateUpdateListener.java
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,11 +19,17 @@
 package org.apache.ambari.server.events.listeners.requests;
 
+import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.events.AmbariHostUpdateEvent;
 import org.apache.ambari.server.events.AmbariUpdateEvent;
 import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
+import org.springframework.messaging.simp.SimpMessageType;
 import org.springframework.messaging.simp.SimpMessagingTemplate;
 
 import com.google.common.eventbus.AllowConcurrentEvents;
@@ -34,19 +40,38 @@ public class StateUpdateListener {
 
   private final static Logger LOG = LoggerFactory.getLogger(StateUpdateListener.class);
 
+  private final AgentSessionManager agentSessionManager;
+
   @Autowired
   SimpMessagingTemplate simpMessagingTemplate;
 
   public StateUpdateListener(Injector injector) {
     StateUpdateEventPublisher stateUpdateEventPublisher = injector.getInstance(StateUpdateEventPublisher.class);
+    agentSessionManager = injector.getInstance(AgentSessionManager.class);
     stateUpdateEventPublisher.register(this);
   }
 
+  /**
+   * Forwards a state update to the messaging template. An
+   * {@link AmbariHostUpdateEvent} is sent only to the session that
+   * {@link AgentSessionManager} returns for the event's host; any other event
+   * is sent to the event's destination.
+   *
+   * @param event the update to forward
+   * @throws HostNotRegisteredException declared for the session lookup of a
+   *           host-specific event
+   */
   @Subscribe
   @AllowConcurrentEvents
-  public void onUpdateEvent(AmbariUpdateEvent event) {
-    LOG.debug("Received status update event {}", event.toString());
-    simpMessagingTemplate.convertAndSend(event.getDestination(), event);
+  public void onUpdateEvent(AmbariUpdateEvent event) throws HostNotRegisteredException {
+    if (event instanceof AmbariHostUpdateEvent) {
+      AmbariHostUpdateEvent ambariHostUpdateEvent = (AmbariHostUpdateEvent) event;
+      String sessionId = agentSessionManager.getSessionId(ambariHostUpdateEvent.getHostName());
+      // second placeholder must be "{}", otherwise the host name is never rendered
+      LOG.debug("Received status update event {} for host {}", ambariHostUpdateEvent.toString(),
+          ambariHostUpdateEvent.getHostName());
+      simpMessagingTemplate.convertAndSendToUser(sessionId, ambariHostUpdateEvent.getDestination(),
+          ambariHostUpdateEvent, createHeaders(sessionId));
+    } else {
+      LOG.debug("Received status update event {}", event.toString());
+      simpMessagingTemplate.convertAndSend(event.getDestination(), event);
+    }
+  }
+
+  /**
+   * Builds message headers of type {@link SimpMessageType#MESSAGE} that carry
+   * the given session id and are left mutable.
+   *
+   * @param sessionId id of the session the message is addressed to
+   * @return the headers to send along with the message
+   */
+  private MessageHeaders createHeaders(String sessionId) {
+    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
+    headerAccessor.setSessionId(sessionId);
+    headerAccessor.setLeaveMutable(true);
+    return headerAccessor.getMessageHeaders();
   }
 }
http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
new file mode 100644
index 0000000..24c8166
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.services;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.controller.utilities.ServiceCalculatedStateFactory;
+import org.apache.ambari.server.controller.utilities.state.ServiceCalculatedState;
+import org.apache.ambari.server.events.HostComponentUpdate;
+import org.apache.ambari.server.events.HostComponentsUpdateEvent;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.ServiceUpdateEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.orm.dao.ServiceDesiredStateDAO;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.State;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * Publishes a {@link ServiceUpdateEvent} when the calculated state of a
+ * service differs from the cached one, and whenever a maintenance-mode event
+ * names a service.
+ */
+@Singleton
+@EagerSingleton
+public class ServiceUpdateListener {
+  /** Last published service states: cluster id -> service name -> state. */
+  private Map<Long, Map<String, State>> states = new HashMap<>();
+
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private ServiceDesiredStateDAO serviceDesiredStateDAO;
+
+  @Inject
+  private Provider<Clusters> m_clusters;
+
+  /**
+   * Registers this listener with both publishers and keeps the state update
+   * publisher for publishing service updates.
+   */
+  @Inject
+  public ServiceUpdateListener(StateUpdateEventPublisher stateUpdateEventPublisher, AmbariEventPublisher ambariEventPublisher) {
+    stateUpdateEventPublisher.register(this);
+    ambariEventPublisher.register(this);
+
+    this.stateUpdateEventPublisher = stateUpdateEventPublisher;
+  }
+
+  /**
+   * Recalculates the state of the service behind every host component update
+   * and publishes it unless the same state is already cached for that service.
+   */
+  @Subscribe
+  public void onHostComponentUpdate(HostComponentsUpdateEvent event) throws AmbariException {
+    for (HostComponentUpdate update : event.getHostComponentUpdates()) {
+      Long clusterId = update.getClusterId();
+      String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
+      String serviceName = update.getServiceName();
+
+      ServiceCalculatedState stateProvider = ServiceCalculatedStateFactory.getServiceStateProvider(serviceName);
+      State calculated = stateProvider.getState(clusterName, serviceName);
+
+      // one lookup of the per-cluster map instead of repeated containsKey/get
+      Map<String, State> clusterStates = states.get(clusterId);
+      boolean unchanged = clusterStates != null
+          && clusterStates.containsKey(serviceName)
+          && clusterStates.get(serviceName).equals(calculated);
+      if (unchanged) {
+        continue;
+      }
+
+      if (clusterStates == null) {
+        clusterStates = new HashMap<>();
+        states.put(clusterId, clusterStates);
+      }
+      clusterStates.put(serviceName, calculated);
+      stateUpdateEventPublisher.publish(new ServiceUpdateEvent(clusterName, null, serviceName, calculated));
+    }
+  }
+
+  /**
+   * Publishes the new maintenance state of the service named by the event;
+   * events that carry no service are ignored.
+   */
+  @Subscribe
+  public void onMaintenanceStateUpdate(MaintenanceModeEvent event) throws AmbariException {
+    if (event.getService() != null) {
+      Long clusterId = event.getClusterId();
+      String clusterName = m_clusters.get().getClusterById(clusterId).getClusterName();
+      String serviceName = event.getService().getName();
+      MaintenanceState newMaintenanceState = event.getMaintenanceState();
+
+      stateUpdateEventPublisher.publish(
+          new ServiceUpdateEvent(clusterName, newMaintenanceState, serviceName, null));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 6bb3b69..c40969e 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java @@ -147,6 +147,8 @@ public class TaskStatusListener { List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new ArrayList<>(); Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>(); Set<Long> requestIdsWithReceivedTaskStatus = new HashSet<>(); + Set<RequestUpdateEvent> requestsToPublish = new HashSet<>(); + for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) { Long reportedTaskId = hostRoleCommand.getTaskId(); HostRoleCommand activeTask = activeTasksMap.get(reportedTaskId); @@ -159,6 +161,16 @@ public class TaskStatusListener { stageEntityPK.setStageId(hostRoleCommand.getStageId()); stagesWithReceivedTaskStatus.add(stageEntityPK); requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId()); + + if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) { + Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new HashSet<>(); + hostRoleCommands.add(new RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(), + hostRoleCommand.getRequestId(), + hostRoleCommand.getStatus(), + hostRoleCommand.getHostName())); + requestsToPublish.add(new RequestUpdateEvent(hostRoleCommand.getRequestId(), + activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(), hostRoleCommands)); + } } } @@ -182,7 +194,9 @@ public class TaskStatusListener { if (didAnyStageStatusUpdated) { updateActiveRequestsStatus(requestIdsWithReceivedTaskStatus, stagesWithReceivedTaskStatus); } - + for (RequestUpdateEvent requestToPublish : requestsToPublish) { + stateUpdateEventPublisher.publish(requestToPublish); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java index 22d7f2e..a35957d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java @@ -209,7 +209,7 @@ public class StackVersionListener { */ private void processComponentVersionChange(Cluster cluster, ServiceComponent sc, ServiceComponentHost sch, - String newVersion) { + String newVersion) throws AmbariException { String desiredVersion = sc.getDesiredVersion(); UpgradeState upgradeState = sch.getUpgradeState(); if (upgradeState == UpgradeState.IN_PROGRESS) { http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java new file mode 100644 index 0000000..e09d5ca --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AgentCommandsPublisher.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentCommand;
+import org.apache.ambari.server.agent.CancelCommand;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.agent.stomp.dto.ExecutionCommandsCluster;
+import org.apache.ambari.server.events.ExecutionCommandEvent;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReader;
+import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReaderFactory;
+import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * Groups agent commands by host and cluster id and publishes one
+ * {@link ExecutionCommandEvent} per host through the
+ * {@link StateUpdateEventPublisher}.
+ */
+@Singleton
+public class AgentCommandsPublisher {
+  private static final Logger LOG = LoggerFactory.getLogger(AgentCommandsPublisher.class);
+
+  /**
+   * KerberosIdentityDataFileReaderFactory used to create KerberosIdentityDataFileReader instances
+   */
+  @Inject
+  private KerberosIdentityDataFileReaderFactory kerberosIdentityDataFileReaderFactory;
+
+  @Inject
+  private Clusters clusters;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  /**
+   * Publishes the given commands, one event per host. Nothing is published
+   * when the multimap is {@code null} or empty.
+   *
+   * @param agentCommands commands keyed by the name of the target host
+   * @throws AmbariException if a command cannot be prepared for sending
+   */
+  public void sendAgentCommand(Multimap<String, AgentCommand> agentCommands) throws AmbariException {
+    if (agentCommands != null && !agentCommands.isEmpty()) {
+      Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters = new TreeMap<>();
+      for (Map.Entry<String, AgentCommand> acHostEntry : agentCommands.entries()) {
+        String hostName = acHostEntry.getKey();
+        AgentCommand ac = acHostEntry.getValue();
+        populateExecutionCommandsClusters(executionCommandsClusters, hostName, ac);
+      }
+      for (Map.Entry<String, TreeMap<String, ExecutionCommandsCluster>> hostEntry : executionCommandsClusters.entrySet()) {
+        String hostName = hostEntry.getKey();
+        ExecutionCommandEvent executionCommandEvent = new ExecutionCommandEvent(hostEntry.getValue());
+        executionCommandEvent.setHostName(hostName);
+        stateUpdateEventPublisher.publish(executionCommandEvent);
+      }
+    }
+  }
+
+  /**
+   * Publishes a single command for a single host.
+   *
+   * @param hostName     name of the target host
+   * @param agentCommand the command to send
+   * @throws AmbariException if the command cannot be prepared for sending
+   */
+  public void sendAgentCommand(String hostName, AgentCommand agentCommand) throws AmbariException {
+    Multimap<String, AgentCommand> agentCommands = ArrayListMultimap.create();
+    agentCommands.put(hostName, agentCommand);
+    sendAgentCommand(agentCommands);
+  }
+
+  /**
+   * Adds the command to the per-host, per-cluster collection. Execution
+   * commands get their cluster id set ("-1" when they name no cluster) and,
+   * for the SET_KEYTAB / REMOVE_KEYTAB custom commands, their keytab data
+   * injected; cancel commands are filed under the cluster id of the stage of
+   * their target task. Other command types are only logged.
+   *
+   * @param executionCommandsClusters collection to add to: host name -> cluster id -> commands
+   * @param hostName                  name of the target host
+   * @param ac                        the command to add
+   * @throws AmbariException if the command cannot be serialized for debug
+   *           logging or keytab data cannot be injected
+   */
+  private void populateExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+                                                 String hostName, AgentCommand ac) throws AmbariException {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending command string = " + StageUtils.jaxbToString(ac));
+      }
+    } catch (Exception e) {
+      throw new AmbariException("Could not get jaxb string for command", e);
+    }
+    switch (ac.getCommandType()) {
+      case BACKGROUND_EXECUTION_COMMAND:
+      case EXECUTION_COMMAND: {
+        ExecutionCommand ec = (ExecutionCommand) ac;
+        LOG.info("AgentCommandsPublisher.sendCommands: sending ExecutionCommand for host {}, role {}, roleCommand {}, and command ID {}, task ID {}",
+            ec.getHostname(), ec.getRole(), ec.getRoleCommand(), ec.getCommandId(), ec.getTaskId());
+        Map<String, String> hlp = ec.getCommandParams();
+        if (hlp != null) {
+          String customCommand = hlp.get("custom_command");
+          if ("SET_KEYTAB".equalsIgnoreCase(customCommand) || "REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
+            LOG.info(String.format("%s called", customCommand));
+            try {
+              injectKeytab(ec, customCommand, hostName);
+            } catch (IOException e) {
+              throw new AmbariException("Could not inject keytab into command", e);
+            }
+          }
+        }
+        String clusterName = ec.getClusterName();
+        String clusterId = "-1";
+        if (clusterName != null) {
+          clusterId = Long.toString(clusters.getCluster(clusterName).getClusterId());
+        }
+        ec.setClusterId(clusterId);
+        prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
+        executionCommandsClusters.get(hostName).get(clusterId).getExecutionCommands().add((ExecutionCommand) ac);
+        break;
+      }
+      case CANCEL_COMMAND: {
+        CancelCommand cc = (CancelCommand) ac;
+        String clusterId = Long.toString(hostRoleCommandDAO.findByPK(cc.getTargetTaskId()).getStage().getClusterId());
+        prepareExecutionCommandsClusters(executionCommandsClusters, hostName, clusterId);
+        executionCommandsClusters.get(hostName).get(clusterId).getCancelCommands().add(cc);
+        break;
+      }
+      default:
+        LOG.error("There is no action for agent command ="
+            + ac.getCommandType().name());
+    }
+  }
+
+  /**
+   * Makes sure the collection holds an (initially empty) entry for the given
+   * host and cluster id.
+   *
+   * @param executionCommandsClusters collection to prepare: host name -> cluster id -> commands
+   * @param hostName                  name of the target host
+   * @param clusterId                 id of the cluster, as a string
+   */
+  private void prepareExecutionCommandsClusters(Map<String, TreeMap<String, ExecutionCommandsCluster>> executionCommandsClusters,
+                                                String hostName, String clusterId) {
+    if (!executionCommandsClusters.containsKey(hostName)) {
+      executionCommandsClusters.put(hostName, new TreeMap<>());
+    }
+    if (!executionCommandsClusters.get(hostName).containsKey(clusterId)) {
+      executionCommandsClusters.get(hostName).put(clusterId, new ExecutionCommandsCluster(new ArrayList<>(),
+          new ArrayList<>()));
+    }
+  }
+
+  /**
+   * Insert Kerberos keytab details into the ExecutionCommand for the SET_KEYTAB custom command if
+   * any keytab details and associated data exists for the target host.
+   *
+   * @param ec the ExecutionCommand to update
+   * @param command a name of the relevant keytab command
+   * @param targetHost a name of the host the relevant command is destined for
+   * @throws AmbariException if the identity data file or a keytab file cannot be read
+   */
+  void injectKeytab(ExecutionCommand ec, String command, String targetHost) throws AmbariException {
+    String dataDir = ec.getCommandParams().get(KerberosServerAction.DATA_DIRECTORY);
+
+    if (dataDir != null) {
+      KerberosIdentityDataFileReader reader = null;
+      List<Map<String, String>> kcp = ec.getKerberosCommandParams();
+
+      try {
+        reader = kerberosIdentityDataFileReaderFactory.createKerberosIdentityDataFileReader(new File(dataDir, KerberosIdentityDataFileReader.DATA_FILE_NAME));
+
+        for (Map<String, String> record : reader) {
+          String hostName = record.get(KerberosIdentityDataFileReader.HOSTNAME);
+
+          if (targetHost.equalsIgnoreCase(hostName)) {
+
+            if ("SET_KEYTAB".equalsIgnoreCase(command)) {
+              String keytabFilePath = record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH);
+
+              if (keytabFilePath != null) {
+
+                String sha1Keytab = DigestUtils.sha1Hex(keytabFilePath);
+                File keytabFile = new File(dataDir + File.separator + hostName + File.separator + sha1Keytab);
+
+                if (keytabFile.canRead()) {
+                  Map<String, String> keytabMap = new HashMap<>();
+                  String principal = record.get(KerberosIdentityDataFileReader.PRINCIPAL);
+                  String isService = record.get(KerberosIdentityDataFileReader.SERVICE);
+
+                  keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+                  keytabMap.put(KerberosIdentityDataFileReader.SERVICE, isService);
+                  keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
+                  keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, principal);
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, keytabFilePath);
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_NAME));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_OWNER_ACCESS));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_NAME));
+                  keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_GROUP_ACCESS));
+
+                  // try-with-resources closes the stream on every path
+                  byte[] keytabContent;
+                  try (BufferedInputStream bufferedIn = new BufferedInputStream(new FileInputStream(keytabFile))) {
+                    keytabContent = IOUtils.toByteArray(bufferedIn);
+                  }
+                  String keytabContentBase64 = Base64.encodeBase64String(keytabContent);
+                  keytabMap.put(KerberosServerAction.KEYTAB_CONTENT_BASE64, keytabContentBase64);
+
+                  kcp.add(keytabMap);
+                }
+              }
+            } else if ("REMOVE_KEYTAB".equalsIgnoreCase(command)) {
+              Map<String, String> keytabMap = new HashMap<>();
+
+              keytabMap.put(KerberosIdentityDataFileReader.HOSTNAME, hostName);
+              keytabMap.put(KerberosIdentityDataFileReader.SERVICE, record.get(KerberosIdentityDataFileReader.SERVICE));
+              keytabMap.put(KerberosIdentityDataFileReader.COMPONENT, record.get(KerberosIdentityDataFileReader.COMPONENT));
+              keytabMap.put(KerberosIdentityDataFileReader.PRINCIPAL, record.get(KerberosIdentityDataFileReader.PRINCIPAL));
+              keytabMap.put(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH, record.get(KerberosIdentityDataFileReader.KEYTAB_FILE_PATH));
+
+              kcp.add(keytabMap);
+            }
+          }
+        }
+      } catch (IOException e) {
+        // keep the cause so the failing file and reason show up in the logs
+        throw new AmbariException("Could not inject keytabs to enable kerberos", e);
+      } finally {
+        if (reader != null) {
+          try {
+            reader.close();
+          } catch (Throwable t) {
+            // ignored
+          }
+        }
+      }
+
+      ec.setKerberosCommandParams(kcp);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
new file mode 100644
index 0000000..e32e715
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/HostComponentUpdateEventPublisher.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ambari.server.events.publishers; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.ambari.server.events.HostComponentUpdate; +import org.apache.ambari.server.events.HostComponentsUpdateEvent; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Singleton; + +@Singleton +public class HostComponentUpdateEventPublisher { + + private final Long TIMEOUT = 1000L; + private AtomicLong previousTime = new AtomicLong(0); + private AtomicBoolean collecting = new AtomicBoolean(false); + private ConcurrentLinkedQueue<HostComponentUpdate> buffer = new ConcurrentLinkedQueue<>(); + private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + + public void publish(HostComponentsUpdateEvent event, EventBus m_eventBus) { + Long eventTime = System.currentTimeMillis(); + if (eventTime - previousTime.get() <= TIMEOUT && !collecting.get()) { + buffer.addAll(event.getHostComponentUpdates()); + collecting.set(true); + scheduledExecutorService.schedule(new HostComponentsEventRunnable(m_eventBus), + TIMEOUT, TimeUnit.MILLISECONDS); + } else if (collecting.get()) { + buffer.addAll(event.getHostComponentUpdates()); + } else { + //TODO add logging and metrics posting + previousTime.set(eventTime); + m_eventBus.post(event); + } + } + + private class HostComponentsEventRunnable implements Runnable { + + private final EventBus eventBus; + + public HostComponentsEventRunnable(EventBus eventBus) { + this.eventBus = eventBus; + } + + @Override + public void run() { + List<HostComponentUpdate> hostComponentUpdates = new ArrayList<>(); + while (!buffer.isEmpty()) { + hostComponentUpdates.add(buffer.poll()); + } + 
HostComponentsUpdateEvent resultEvents = new HostComponentsUpdateEvent(hostComponentUpdates); + //TODO add logging and metrics posting + eventBus.post(resultEvents); + previousTime.set(System.currentTimeMillis()); + collecting.set(false); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java new file mode 100644 index 0000000..3ddf00c --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/RequestUpdateEventPublisher.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.server.events.publishers; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.server.controller.internal.CalculatedStatus; +import org.apache.ambari.server.events.RequestUpdateEvent; +import org.apache.ambari.server.orm.dao.ClusterDAO; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; +import org.apache.ambari.server.orm.entities.RequestEntity; +import org.apache.ambari.server.topology.TopologyManager; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +@Singleton +public class RequestUpdateEventPublisher { + + private final Long TIMEOUT = 1000L; + private ConcurrentHashMap<Long, Long> previousTime = new ConcurrentHashMap<>(); + private ConcurrentHashMap<Long, RequestUpdateEvent> buffer = new ConcurrentHashMap<>(); + + @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private TopologyManager topologyManager; + + @Inject + private RequestDAO requestDAO; + + @Inject + private ClusterDAO clusterDAO; + + public void publish(RequestUpdateEvent event, EventBus m_eventBus) { + Long eventTime = System.currentTimeMillis(); + Long requestId = event.getRequestId(); + if (!previousTime.containsKey(requestId)) { + previousTime.put(requestId, 0L); + } + if (eventTime - previousTime.get(requestId) <= TIMEOUT && !buffer.containsKey(requestId)) { + buffer.put(event.getRequestId(), event); + Executors.newScheduledThreadPool(1).schedule(new RequestEventRunnable(requestId, m_eventBus), + TIMEOUT, TimeUnit.MILLISECONDS); + } else if (buffer.containsKey(requestId)) { + //merge available buffer content with arrived + buffer.get(requestId).setEndTime(event.getEndTime()); + buffer.get(requestId).setRequestStatus(event.getRequestStatus()); + 
buffer.get(requestId).setRequestContext(event.getRequestContext()); + buffer.get(requestId).getHostRoleCommands().removeAll(event.getHostRoleCommands()); + buffer.get(requestId).getHostRoleCommands().addAll(event.getHostRoleCommands()); + } else { + previousTime.put(requestId, eventTime); + //TODO add logging and metrics posting + m_eventBus.post(fillRequest(event)); + } + } + + private RequestUpdateEvent fillRequest(RequestUpdateEvent event) { + event.setProgressPercent( + CalculatedStatus.statusFromRequest(hostRoleCommandDAO, topologyManager, event.getRequestId()).getPercent()); + if (event.getEndTime() == null || event.getStartTime() == null || event.getClusterName() == null + || event.getRequestContext() == null) { + RequestEntity requestEntity = requestDAO.findByPK(event.getRequestId()); + event.setStartTime(requestEntity.getStartTime()); + event.setEndTime(requestEntity.getEndTime()); + if (requestEntity.getClusterId() != -1) { + event.setClusterName(clusterDAO.findById(requestEntity.getClusterId()).getClusterName()); + } + event.setRequestContext(requestEntity.getRequestContext()); + event.setRequestStatus(requestEntity.getStatus()); + } + return event; + } + + private class RequestEventRunnable implements Runnable { + + private final long requestId; + private final EventBus eventBus; + + public RequestEventRunnable(long requestId, EventBus eventBus) { + this.requestId = requestId; + this.eventBus = eventBus; + } + + @Override + public void run() { + RequestUpdateEvent resultEvent = buffer.get(requestId); + //TODO add logging and metrics posting + eventBus.post(fillRequest(resultEvent)); + buffer.remove(requestId); + previousTime.remove(requestId); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java index 62fe44c..53738f4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/StateUpdateEventPublisher.java @@ -20,9 +20,12 @@ package org.apache.ambari.server.events.publishers; import java.util.concurrent.Executors; import org.apache.ambari.server.events.AmbariUpdateEvent; +import org.apache.ambari.server.events.HostComponentsUpdateEvent; +import org.apache.ambari.server.events.RequestUpdateEvent; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; +import com.google.inject.Inject; import com.google.inject.Singleton; @Singleton @@ -30,13 +33,25 @@ public final class StateUpdateEventPublisher { private final EventBus m_eventBus; + @Inject + private RequestUpdateEventPublisher requestUpdateEventPublisher; + + @Inject + private HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher; + public StateUpdateEventPublisher() { m_eventBus = new AsyncEventBus("ambari-update-bus", Executors.newSingleThreadExecutor()); } public void publish(AmbariUpdateEvent event) { - m_eventBus.post(event); + if (event.getType().equals(AmbariUpdateEvent.Type.REQUEST)) { + requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus); + } else if (event.getType().equals(AmbariUpdateEvent.Type.HOSTCOMPONENT)) { + hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus); + } else { + m_eventBus.post(event); + } } public void register(Object object) { http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java index c3e3a9f..25bc813 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAO.java @@ -30,7 +30,10 @@ import org.apache.ambari.server.controller.internal.AlertDefinitionResourceProvi import org.apache.ambari.server.events.AlertDefinitionChangedEvent; import org.apache.ambari.server.events.AlertDefinitionDeleteEvent; import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent; +import org.apache.ambari.server.events.AlertDefinitionUpdateHolder; +import org.apache.ambari.server.events.AlertDefinitionsUpdateEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.AlertDefinitionEntity; import org.apache.ambari.server.orm.entities.AlertGroupEntity; @@ -99,6 +102,12 @@ public class AlertDefinitionDAO { @Inject private AlertDefinitionFactory alertDefinitionFactory; + @Inject + private AlertDefinitionUpdateHolder alertDefinitionUpdateHolder; + + @Inject + private StateUpdateEventPublisher stateUpdateEventPublisher; + /** * Gets an alert definition with the specified ID. 
* @@ -341,6 +350,9 @@ public class AlertDefinitionDAO { AlertDefinitionRegistrationEvent event = new AlertDefinitionRegistrationEvent( alertDefinition.getClusterId(), coerced); + stateUpdateEventPublisher.publish(new AlertDefinitionsUpdateEvent(coerced, + alertDefinition.getRepeatTolerance(), Boolean.valueOf(alertDefinition.isRepeatToleranceEnabled()))); + eventPublisher.publish(event); } else { LOG.warn("Unable to broadcast alert registration event for {}", @@ -380,6 +392,9 @@ public class AlertDefinitionDAO { eventPublisher.publish(event); + alertDefinitionUpdateHolder.updateIfNeeded(new AlertDefinitionsUpdateEvent(definition, + alertDefinition.getRepeatTolerance(), Boolean.valueOf(alertDefinition.isRepeatToleranceEnabled()))); + return entity; } @@ -426,6 +441,8 @@ public class AlertDefinitionDAO { alertDefinition.getClusterId(), coerced); eventPublisher.publish(event); + + stateUpdateEventPublisher.publish(new AlertDefinitionsUpdateEvent(alertDefinition.getDefinitionId())); } else { LOG.warn("Unable to broadcast alert removal event for {}", alertDefinition.getDefinitionName()); http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java index 0023def..5758c1a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java @@ -18,6 +18,10 @@ package org.apache.ambari.server.orm.dao; import org.apache.ambari.server.state.AlertState; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; + +import com.fasterxml.jackson.annotation.JsonProperty; /** * Used to return 
alert summary data out of the database. Alerts that are in @@ -25,10 +29,19 @@ import org.apache.ambari.server.state.AlertState; */ public class AlertSummaryDTO { + @JsonProperty("OK") private int okCount; + + @JsonProperty("WARNING") private int warningCount; + + @JsonProperty("CRITICAL") private int criticalCount; + + @JsonProperty("UNKNOWN") private int unknownCount; + + @JsonProperty("MAINTENANCE") private int maintenanceCount; /** @@ -128,4 +141,32 @@ public class AlertSummaryDTO { public void setMaintenanceCount(int maintenanceCount) { this.maintenanceCount = maintenanceCount; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + + if (!(o instanceof AlertSummaryDTO)) return false; + + AlertSummaryDTO that = (AlertSummaryDTO) o; + + return new EqualsBuilder() + .append(okCount, that.okCount) + .append(warningCount, that.warningCount) + .append(criticalCount, that.criticalCount) + .append(unknownCount, that.unknownCount) + .append(maintenanceCount, that.maintenanceCount) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(okCount) + .append(warningCount) + .append(criticalCount) + .append(unknownCount) + .append(maintenanceCount) + .toHashCode(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/44c1cb51/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java index 9d1c992..059becd 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java @@ -114,8 +114,8 @@ public abstract class AbstractServerAction implements ServerAction { 
report = new CommandReport(); report.setActionId(StageUtils.getActionId(hostRoleCommand.getRequestId(), hostRoleCommand.getStageId())); - report.setClusterName(executionCommand.getClusterName()); - report.setConfigurationTags(executionCommand.getConfigurationTags()); + report.setClusterId(executionCommand.getClusterId()); + //report.setConfigurationTags(executionCommand.getConfigurationTags()); report.setRole(executionCommand.getRole()); report.setRoleCommand((roleCommand == null) ? null : roleCommand.toString()); report.setServiceName(executionCommand.getServiceName());
