This is an automated email from the ASF dual-hosted git repository.
jluniya pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new ec968ed AMBARI-24518. Requests STOMP topic sent updates for host
check request. (#2142)
ec968ed is described below
commit ec968edf78a11921d0555e3e481c0ce9763d80f1
Author: Myroslav Papirkovskyi <[email protected]>
AuthorDate: Tue Aug 21 20:38:18 2018 +0300
AMBARI-24518. Requests STOMP topic sent updates for host check request.
(#2142)
* AMBARI-24518. Requests STOMP topic sent updates for host check request.
(mpapirkovskyy)
* AMBARI-24518. Requests STOMP topic sent updates for host check request.
(mpapirkovskyy)
---
.../server/actionmanager/ActionDBAccessorImpl.java | 14 ++++++-
.../events/listeners/tasks/TaskStatusListener.java | 46 ++++++++++++++++------
2 files changed, 47 insertions(+), 13 deletions(-)
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 3543486..1a055b3 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -436,8 +436,18 @@ public class ActionDBAccessorImpl implements
ActionDBAccessor {
TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands);
taskEventPublisher.publish(taskCreateEvent);
List<HostRoleCommandEntity> hostRoleCommandEntities =
hostRoleCommandDAO.findByRequest(requestEntity.getRequestId());
- STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
- hostRoleCommandDAO, topologyManager, clusterName,
hostRoleCommandEntities));
+
+ // "requests" STOMP topic is used for clusters related requests only.
+ // Requests without clusters (like host checks) should be posted to
divided topic.
+ if (clusterName != null) {
+ STOMPUpdatePublisher.publish(new RequestUpdateEvent(requestEntity,
+ hostRoleCommandDAO, topologyManager, clusterName,
hostRoleCommandEntities));
+ } else {
+ LOG.debug("No STOMP request update event was fired for new request due
no cluster related, " +
+ "request id: {}, command name: {}",
+ requestEntity.getRequestId(),
+ requestEntity.getCommandName());
+ }
}
@Override
diff --git
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
index 0570fdf..b188729 100644
---
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
+++
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ambari.server.ClusterNotFoundException;
import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -123,7 +122,7 @@ public class TaskStatusListener {
* @param event Consumes {@link TaskUpdateEvent}.
*/
@Subscribe
- public void onTaskUpdateEvent(TaskUpdateEvent event) throws
ClusterNotFoundException {
+ public void onTaskUpdateEvent(TaskUpdateEvent event) {
LOG.debug("Received task update event {}", event);
List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new
ArrayList<>();
@@ -145,13 +144,27 @@ public class TaskStatusListener {
requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
if
(!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus()))
{
- Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new
HashSet<>();
- hostRoleCommands.add(new
RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
- hostRoleCommand.getRequestId(),
- hostRoleCommand.getStatus(),
- hostRoleCommand.getHostName()));
- requestsToPublish.add(new
RequestUpdateEvent(hostRoleCommand.getRequestId(),
-
activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(),
hostRoleCommands));
+ // Ignore requests not related to any cluster. "requests" topic is
used for cluster requests only.
+ Long clusterId =
activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId();
+ if (clusterId != null && clusterId != -1) {
+ Set<RequestUpdateEvent.HostRoleCommand> hostRoleCommands = new
HashSet<>();
+ hostRoleCommands.add(new
RequestUpdateEvent.HostRoleCommand(hostRoleCommand.getTaskId(),
+ hostRoleCommand.getRequestId(),
+ hostRoleCommand.getStatus(),
+ hostRoleCommand.getHostName()));
+ requestsToPublish.add(new
RequestUpdateEvent(hostRoleCommand.getRequestId(),
+
activeRequestMap.get(hostRoleCommand.getRequestId()).getStatus(),
hostRoleCommands));
+ } else {
+ LOG.debug("No STOMP request update event was fired for host
component status change due no cluster related, " +
+ "request id: {}, role: {}, role command: {}, host: {},
task id: {}, old state: {}, new state: {}",
+ hostRoleCommand.getRequestId(),
+ hostRoleCommand.getRole(),
+ hostRoleCommand.getRoleCommand(),
+ hostRoleCommand.getHostName(),
+ hostRoleCommand.getTaskId(),
+ activeTasksMap.get(reportedTaskId).getStatus(),
+ hostRoleCommand.getStatus());
+ }
}
}
}
@@ -264,7 +277,8 @@ public class TaskStatusListener {
// Request entity of the hostrolecommand should be persisted before
publishing task create event
assert requestEntity != null;
Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stageEntityPK);
- ActiveRequest request = new
ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
stageEntityPKs);
+ ActiveRequest request = new
ActiveRequest(requestEntity.getStatus(),requestEntity.getDisplayStatus(),
+ stageEntityPKs, requestEntity.getClusterId());
activeRequestMap.put(requestId, request);
}
}
@@ -524,11 +538,14 @@ public class TaskStatusListener {
private HostRoleStatus status;
private HostRoleStatus displayStatus;
private Set <StageEntityPK> stageEntityPks;
+ private Long clusterId;
- public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus,
Set<StageEntityPK> stageEntityPks) {
+ public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus,
Set<StageEntityPK> stageEntityPks,
+ Long clusterId) {
this.status = status;
this.displayStatus = displayStatus;
this.stageEntityPks = stageEntityPks;
+ this.clusterId = clusterId;
}
public HostRoleStatus getStatus() {
@@ -559,6 +576,13 @@ public class TaskStatusListener {
stageEntityPks.add(stageEntityPK);
}
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
}
/**