This is an automated email from the ASF dual-hosted git repository. mpapirkovskyy pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push: new 16a1ca4 AMBARI-25380. UI does not reflect/update task logs. (#3086) 16a1ca4 is described below commit 16a1ca4363e4237a087172b0bf52c00269c5ba18 Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org> AuthorDate: Thu Sep 26 16:14:17 2019 +0300 AMBARI-25380. UI does not reflect/update task logs. (#3086) * AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2745) * AMBARI-25380. UI does not reflect/update task logs. (AMBARI-24974. Sometimes Task Log is not refreshed in UI after operation completes.) (mpapirkovskyy) (#2747) --- .../api/stomp/NamedTasksSubscribeListener.java | 75 +++++++++ .../server/api/stomp/NamedTasksSubscriptions.java | 166 +++++++++++++++++++ .../configuration/spring/AgentStompConfig.java | 3 +- .../configuration/spring/ApiStompConfig.java | 6 + .../server/events/DefaultMessageEmitter.java | 4 +- .../ambari/server/events/NamedTaskUpdateEvent.java | 176 +++++++++++++++++++++ .../apache/ambari/server/events/STOMPEvent.java | 5 + .../events/listeners/tasks/TaskStatusListener.java | 23 ++- .../api/stomp/NamedTasksSubscriptionsTest.java | 150 ++++++++++++++++++ .../listeners/tasks/TaskStatusListenerTest.java | 92 ++++++++++- 10 files changed, 695 insertions(+), 5 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java new file mode 100644 index 0000000..6882236 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscribeListener.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.api.stomp; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.messaging.MessageHeaders; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.messaging.SessionDisconnectEvent; +import org.springframework.web.socket.messaging.SessionSubscribeEvent; +import org.springframework.web.socket.messaging.SessionUnsubscribeEvent; + +@Component +public class NamedTasksSubscribeListener { + private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscribeListener.class); + + @Autowired + private NamedTasksSubscriptions namedTasksSubscriptions; + + @EventListener + public void subscribe(SessionSubscribeEvent sse) + { + MessageHeaders msgHeaders = sse.getMessage().getHeaders(); + String sessionId = (String) msgHeaders.get("simpSessionId"); + String destination = (String) msgHeaders.get("simpDestination"); + String id = (String) msgHeaders.get("simpSubscriptionId"); + if (sessionId != null && destination != null && id != null) { + namedTasksSubscriptions.addDestination(sessionId, destination, id); + } + LOG.info(String.format("API subscribe was arrived with sessionId = %s, destination = %s and id = %s", + sessionId, destination, id)); + } + + @EventListener + public 
void unsubscribe(SessionUnsubscribeEvent suse) + { + MessageHeaders msgHeaders = suse.getMessage().getHeaders(); + String sessionId = (String) msgHeaders.get("simpSessionId"); + String id = (String) msgHeaders.get("simpSubscriptionId"); + if (sessionId != null && id != null) { + namedTasksSubscriptions.removeId(sessionId, id); + } + LOG.info(String.format("API unsubscribe was arrived with sessionId = %s and id = %s", + sessionId, id)); + } + + @EventListener + public void disconnect(SessionDisconnectEvent sde) + { + MessageHeaders msgHeaders = sde.getMessage().getHeaders(); + String sessionId = (String) msgHeaders.get("simpSessionId"); + if (sessionId != null) { + namedTasksSubscriptions.removeSession(sessionId); + } + LOG.info(String.format("API disconnect was arrived with sessionId = %s", + sessionId)); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java new file mode 100644 index 0000000..09acdf3 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptions.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.api.stomp; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; + +@Singleton +public class NamedTasksSubscriptions { + private static Logger LOG = LoggerFactory.getLogger(NamedTasksSubscriptions.class); + + private ConcurrentHashMap<String, List<SubscriptionId>> taskIds = new ConcurrentHashMap<>(); + private final String subscriptionPrefix = "/events/tasks/"; + private final Lock taskIdsLock = new ReentrantLock(); + + private Provider<TaskStatusListener> taskStatusListenerProvider; + + @Inject + public NamedTasksSubscriptions(Provider<TaskStatusListener> taskStatusListenerProvider) { + this.taskStatusListenerProvider = taskStatusListenerProvider; + } + + public void addTaskId(String sessionId, Long taskId, String id) { + try { + taskIdsLock.lock(); + taskIds.compute(sessionId, (sid, ids) -> { + if (ids == null) { + ids = new ArrayList<>(); + } + AtomicBoolean completed = new AtomicBoolean(false); + taskStatusListenerProvider.get().getActiveTasksMap().computeIfPresent(taskId, (tid, task) -> { + if (task.getStatus().isCompletedState()) { + completed.set(true); + } + return task; + }); + if (!completed.get()) { + ids.add(new SubscriptionId(taskId, id)); + } + return ids; + }); + LOG.info(String.format("Task subscription was added for sessionId = %s, taskId = %s, id = %s", + sessionId, taskId, 
id)); + } finally { + taskIdsLock.unlock(); + } + } + + public void removeId(String sessionId, String id) { + taskIds.computeIfPresent(sessionId, (sid, tasks) -> { + Iterator<SubscriptionId> iterator = tasks.iterator(); + while (iterator.hasNext()) { + if (iterator.next().getId().equals(id)) { + iterator.remove(); + LOG.info(String.format("Task subscription was removed for sessionId = %s, id = %s", sessionId, id)); + } + } + return tasks; + }); + } + + public void removeTaskId(Long taskId) { + try { + taskIdsLock.lock(); + for (String sessionId : taskIds.keySet()) { + taskIds.computeIfPresent(sessionId, (id, tasks) -> { + Iterator<SubscriptionId> iterator = tasks.iterator(); + while (iterator.hasNext()) { + if (iterator.next().getTaskId().equals(taskId)) { + iterator.remove(); + LOG.info(String.format("Task subscription was removed for sessionId = %s and taskId = %s", + sessionId, taskId)); + } + } + return tasks; + }); + } + } finally { + taskIdsLock.unlock(); + } + } + + public void removeSession(String sessionId) { + try { + taskIdsLock.lock(); + taskIds.remove(sessionId); + LOG.info(String.format("Task subscriptions were removed for sessionId = %s", sessionId)); + } finally { + taskIdsLock.unlock(); + } + } + + public Optional<Long> matchDestination(String destination) { + Optional<Long> taskIdOpt = Optional.of(StringUtils.substringAfter(destination, subscriptionPrefix)) + .filter(StringUtils::isNotEmpty) + .filter(StringUtils::isNumeric) + .map(Long::parseLong); + return taskIdOpt; + } + + public void addDestination(String sessionId, String destination, String id) { + Optional<Long> taskIdOpt = matchDestination(destination); + if (taskIdOpt.isPresent()) { + addTaskId(sessionId, taskIdOpt.get(), id); + } + } + + public boolean checkTaskId(Long taskId) { + for (List<SubscriptionId> ids: taskIds.values()) { + for (SubscriptionId subscriptionId : ids) { + if (subscriptionId.getTaskId().equals(taskId)) { + return true; + } + } + } + return false; + } + + public 
class SubscriptionId { + private final Long taskId; + private final String id; + + public SubscriptionId(Long taskId, String id) { + this.taskId = taskId; + this.id = id; + } + + public Long getTaskId() { + return taskId; + } + + public String getId() { + return id; + } + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java index e1251d9..87c36c9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/AgentStompConfig.java @@ -20,7 +20,6 @@ package org.apache.ambari.server.configuration.spring; import javax.servlet.ServletContext; import org.apache.ambari.server.agent.stomp.HeartbeatController; -import org.apache.ambari.server.api.stomp.TestController; import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener; import org.eclipse.jetty.websocket.server.WebSocketServerFactory; @@ -41,7 +40,7 @@ import com.google.inject.Injector; @Configuration @EnableWebSocketMessageBroker -@ComponentScan(basePackageClasses = {TestController.class, HeartbeatController.class}) +@ComponentScan(basePackageClasses = {HeartbeatController.class}) @Import({RootStompConfig.class,GuiceBeansConfig.class}) public class AgentStompConfig extends AbstractWebSocketMessageBrokerConfigurer { private org.apache.ambari.server.configuration.Configuration configuration; diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java index 44479ae..e147200 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java @@ -17,6 +17,7 @@ */ package org.apache.ambari.server.configuration.spring; +import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions; import org.apache.ambari.server.api.stomp.TestController; import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.events.listeners.requests.STOMPUpdateListener; @@ -50,6 +51,11 @@ public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer { return new STOMPUpdateListener(injector, DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES); } + @Bean + public NamedTasksSubscriptions namedTasksSubscribtions(Injector injector) { + return injector.getInstance(NamedTasksSubscriptions.class); + } + @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/v1") diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java index e9f5c93..48d4fbc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java @@ -43,6 +43,7 @@ public class DefaultMessageEmitter extends MessageEmitter { put(STOMPEvent.Type.AGENT_CONFIGS, "/configs"); put(STOMPEvent.Type.CONFIGS, "/events/configs"); put(STOMPEvent.Type.HOSTCOMPONENT, "/events/hostcomponents"); + put(STOMPEvent.Type.NAMEDTASK, "/events/tasks"); put(STOMPEvent.Type.REQUEST, "/events/requests"); put(STOMPEvent.Type.SERVICE, "/events/services"); put(STOMPEvent.Type.HOST, "/events/hosts"); @@ -70,6 +71,7 @@ public class DefaultMessageEmitter extends MessageEmitter { STOMPEvent.Type.UI_TOPOLOGY, STOMPEvent.Type.CONFIGS, STOMPEvent.Type.HOSTCOMPONENT, + STOMPEvent.Type.NAMEDTASK, STOMPEvent.Type.REQUEST, STOMPEvent.Type.SERVICE, STOMPEvent.Type.HOST, @@ -101,6 
+103,6 @@ public class DefaultMessageEmitter extends MessageEmitter { @Override protected String getDestination(STOMPEvent stompEvent) { - return DEFAULT_DESTINATIONS.get(stompEvent.getType()); + return stompEvent.completeDestination(DEFAULT_DESTINATIONS.get(stompEvent.getType())); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java new file mode 100644 index 0000000..04ee04e --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/NamedTaskUpdateEvent.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events; + +import java.util.Objects; + +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Single host role command update info. This update will be sent to all subscribed recipients. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class NamedTaskUpdateEvent extends STOMPEvent { + + private Long id; + private Long requestId; + private String hostName; + private Long endTime; + private HostRoleStatus status; + private String errorLog; + private String outLog; + private String stderr; + private String stdout; + + @JsonProperty("structured_out") + private String structuredOut; + + public NamedTaskUpdateEvent(Long id, Long requestId, String hostName, Long endTime, HostRoleStatus status, + String errorLog, String outLog, String stderr, String stdout, String structuredOut) { + super(Type.NAMEDTASK); + this.id = id; + this.requestId = requestId; + this.hostName = hostName; + this.endTime = endTime; + this.status = status; + this.errorLog = errorLog; + this.outLog = outLog; + this.stderr = stderr; + this.stdout = stdout; + this.structuredOut = structuredOut; + } + + public NamedTaskUpdateEvent(HostRoleCommand hostRoleCommand) { + this(hostRoleCommand.getTaskId(), hostRoleCommand.getRequestId(), hostRoleCommand.getHostName(), + hostRoleCommand.getEndTime(), hostRoleCommand.getStatus(), hostRoleCommand.getErrorLog(), + hostRoleCommand.getOutputLog(), hostRoleCommand.getStderr(), hostRoleCommand.getStdout(), + hostRoleCommand.getStructuredOut()); + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getRequestId() { + return requestId; + } + + public void setRequestId(Long requestId) { + this.requestId = requestId; + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public Long getEndTime() { + return endTime; + } + + public void setEndTime(Long endTime) { + this.endTime = endTime; + } + + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + this.status = status; + } + + public String getErrorLog() { + return errorLog; + } + + public void 
setErrorLog(String errorLog) { + this.errorLog = errorLog; + } + + public String getOutLog() { + return outLog; + } + + public void setOutLog(String outLog) { + this.outLog = outLog; + } + + public String getStderr() { + return stderr; + } + + public void setStderr(String stderr) { + this.stderr = stderr; + } + + public String getStdout() { + return stdout; + } + + public void setStdout(String stdout) { + this.stdout = stdout; + } + + public String getStructuredOut() { + return structuredOut; + } + + public void setStructuredOut(String structuredOut) { + this.structuredOut = structuredOut; + } + + @Override + public String completeDestination(String destination) { + return destination + "/" + id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NamedTaskUpdateEvent that = (NamedTaskUpdateEvent) o; + return Objects.equals(id, that.id) && + Objects.equals(requestId, that.requestId) && + Objects.equals(hostName, that.hostName) && + Objects.equals(endTime, that.endTime) && + status == that.status && + Objects.equals(errorLog, that.errorLog) && + Objects.equals(outLog, that.outLog) && + Objects.equals(stderr, that.stderr) && + Objects.equals(stdout, that.stdout) && + Objects.equals(structuredOut, that.structuredOut); + } + + @Override + public int hashCode() { + return Objects.hash(id, requestId, hostName, endTime, status, errorLog, outLog, stderr, stdout, structuredOut); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java index 15c3b1e..f3e119f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/STOMPEvent.java @@ -54,6 +54,7 @@ public abstract class STOMPEvent { CONFIGS("events.configs"), HOSTCOMPONENT("events.hostcomponents"), 
NAMEDHOSTCOMPONENT("events.hostrolecommands.named"), + NAMEDTASK("events.tasks.named"), REQUEST("events.requests"), SERVICE("events.services"), HOST("events.hosts"), @@ -76,4 +77,8 @@ public abstract class STOMPEvent { return metricName; } } + + public String completeDestination(String destination) { + return destination; + } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java index b188729..73aa533 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java @@ -34,7 +34,9 @@ import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.actionmanager.Request; import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions; import org.apache.ambari.server.controller.internal.CalculatedStatus; +import org.apache.ambari.server.events.NamedTaskUpdateEvent; import org.apache.ambari.server.events.RequestUpdateEvent; import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.TaskUpdateEvent; @@ -95,12 +97,15 @@ public class TaskStatusListener { private STOMPUpdatePublisher STOMPUpdatePublisher; + private NamedTasksSubscriptions namedTasksSubscriptions; + @Inject public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO, - STOMPUpdatePublisher STOMPUpdatePublisher) { + STOMPUpdatePublisher STOMPUpdatePublisher, NamedTasksSubscriptions namedTasksSubscriptions) { this.stageDAO = stageDAO; this.requestDAO = requestDAO; this.STOMPUpdatePublisher = STOMPUpdatePublisher; + this.namedTasksSubscriptions = namedTasksSubscriptions; 
taskEventPublisher.register(this); } @@ -129,6 +134,7 @@ public class TaskStatusListener { Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>(); Set<Long> requestIdsWithReceivedTaskStatus = new HashSet<>(); Set<RequestUpdateEvent> requestsToPublish = new HashSet<>(); + Set<NamedTaskUpdateEvent> namedTasksToPublish = new HashSet<>(); for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) { Long reportedTaskId = hostRoleCommand.getTaskId(); @@ -143,6 +149,17 @@ public class TaskStatusListener { stagesWithReceivedTaskStatus.add(stageEntityPK); requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId()); + NamedTaskUpdateEvent namedTaskUpdateEvent = new NamedTaskUpdateEvent(hostRoleCommand); + if (namedTasksSubscriptions.checkTaskId(reportedTaskId) + && !namedTaskUpdateEvent.equals(new NamedTaskUpdateEvent(activeTasksMap.get(reportedTaskId)))) { + namedTasksToPublish.add(namedTaskUpdateEvent); + } + + // unsubscribe on complete (no any update will be sent anyway) + if (hostRoleCommand.getStatus().equals(HostRoleStatus.COMPLETED)) { + namedTasksSubscriptions.removeTaskId(reportedTaskId); + } + if (!activeTasksMap.get(reportedTaskId).getStatus().equals(hostRoleCommand.getStatus())) { // Ignore requests not related to any cluster. "requests" topic is used for cluster requests only. 
Long clusterId = activeRequestMap.get(hostRoleCommand.getRequestId()).getClusterId(); @@ -178,6 +195,10 @@ public class TaskStatusListener { for (RequestUpdateEvent requestToPublish : requestsToPublish) { STOMPUpdatePublisher.publish(requestToPublish); } + for (NamedTaskUpdateEvent namedTaskUpdateEvent : namedTasksToPublish) { + LOG.info(String.format("NamedTaskUpdateEvent with id %s will be send", namedTaskUpdateEvent.getId())); + STOMPUpdatePublisher.publish(namedTaskUpdateEvent); + } } /** diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java new file mode 100644 index 0000000..2107c41 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/api/stomp/NamedTasksSubscriptionsTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.api.stomp; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener; +import org.junit.Before; +import org.junit.Test; + +import com.google.inject.Provider; + +public class NamedTasksSubscriptionsTest { + private static final String SESSION_ID_1 = "fdsg3"; + private static final String SESSION_ID_2 = "idfg6"; + + private NamedTasksSubscriptions tasksSubscriptions; + private Provider<TaskStatusListener> taskStatusListenerProvider; + private TaskStatusListener taskStatusListener; + + @Before + public void setupTest() { + taskStatusListenerProvider = createMock(Provider.class); + taskStatusListener = createMock(TaskStatusListener.class); + + Map<Long, HostRoleCommand> hostRoleCommands = new HashMap<>(); + HostRoleCommand hostRoleCommand1 = createMock(HostRoleCommand.class); + HostRoleCommand hostRoleCommand4 = createMock(HostRoleCommand.class); + HostRoleCommand hostRoleCommand5 = createMock(HostRoleCommand.class); + + expect(hostRoleCommand1.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes(); + expect(hostRoleCommand4.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes(); + expect(hostRoleCommand5.getStatus()).andReturn(HostRoleStatus.IN_PROGRESS).anyTimes(); + + hostRoleCommands.put(1L, hostRoleCommand1); + hostRoleCommands.put(4L, hostRoleCommand4); + hostRoleCommands.put(5L, hostRoleCommand5); + expect(taskStatusListener.getActiveTasksMap()).andReturn(hostRoleCommands).anyTimes(); + 
expect(taskStatusListenerProvider.get()).andReturn(taskStatusListener).anyTimes(); + + replay(taskStatusListenerProvider, taskStatusListener, hostRoleCommand1, hostRoleCommand4, hostRoleCommand5); + tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider); + tasksSubscriptions.addTaskId(SESSION_ID_1, 1L, "sub-1"); + tasksSubscriptions.addTaskId(SESSION_ID_1, 5L, "sub-5"); + tasksSubscriptions.addTaskId(SESSION_ID_2, 1L, "sub-1"); + tasksSubscriptions.addTaskId(SESSION_ID_2, 4L, "sub-4"); + } + + @Test + public void testMatching() { + Optional<Long> taskIdOpt = tasksSubscriptions.matchDestination("/events/tasks/1"); + assertTrue(taskIdOpt.isPresent()); + assertEquals(1L, taskIdOpt.get().longValue()); + assertFalse(tasksSubscriptions.matchDestination("/events/topologies").isPresent()); + } + + @Test + public void testCheckId() { + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertTrue(tasksSubscriptions.checkTaskId(5L)); + assertFalse(tasksSubscriptions.checkTaskId(2L)); + } + + @Test + public void testRemoveBySessionId() { + tasksSubscriptions.removeSession(SESSION_ID_1); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.removeSession(SESSION_ID_2); + assertFalse(tasksSubscriptions.checkTaskId(1L)); + assertFalse(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + } + + @Test + public void testRemoveById() { + tasksSubscriptions.removeId(SESSION_ID_1, "sub-1"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertTrue(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.removeId(SESSION_ID_1, "sub-5"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + + 
tasksSubscriptions.removeId(SESSION_ID_2, "sub-1"); + assertFalse(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.removeId(SESSION_ID_2, "sub-4"); + assertFalse(tasksSubscriptions.checkTaskId(1L)); + assertFalse(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + } + + @Test + public void testAddDestination() { + tasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider); + tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/1", "sub-1"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertFalse(tasksSubscriptions.checkTaskId(4L)); + assertFalse(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.addDestination(SESSION_ID_1, "/events/tasks/5", "sub-5"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertFalse(tasksSubscriptions.checkTaskId(4L)); + assertTrue(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/1", "sub-1"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertFalse(tasksSubscriptions.checkTaskId(4L)); + assertTrue(tasksSubscriptions.checkTaskId(5L)); + + tasksSubscriptions.addDestination(SESSION_ID_2, "/events/tasks/4", "sub-4"); + assertTrue(tasksSubscriptions.checkTaskId(1L)); + assertTrue(tasksSubscriptions.checkTaskId(4L)); + assertTrue(tasksSubscriptions.checkTaskId(5L)); + } +} diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java index 6e62ef4..03e0655 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListenerTest.java @@ -20,7 +20,10 @@ package 
org.apache.ambari.server.events.listeners.tasks; import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import java.util.ArrayList; import java.util.Collections; @@ -32,6 +35,8 @@ import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.actionmanager.ExecutionCommandWrapperFactory; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.api.stomp.NamedTasksSubscriptions; +import org.apache.ambari.server.events.NamedTaskUpdateEvent; import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.TaskUpdateEvent; import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher; @@ -44,12 +49,14 @@ import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.StageEntityPK; import org.apache.ambari.server.state.ServiceComponentHostEvent; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.junit.Assert; import org.junit.Test; import com.google.inject.Inject; +import com.google.inject.Provider; public class TaskStatusListenerTest extends EasyMockSupport { @@ -93,6 +100,7 @@ public class TaskStatusListenerTest extends EasyMockSupport { StageEntity stageEntity = createNiceMock(StageEntity.class); RequestEntity requestEntity = createNiceMock(RequestEntity.class); STOMPUpdatePublisher statePublisher = createNiceMock(STOMPUpdatePublisher.class); + NamedTasksSubscriptions namedTasksSubscriptions = createNiceMock(NamedTasksSubscriptions.class); EasyMock.expect(stageEntity.getStatus()).andReturn(hostRoleStatus).anyTimes();; 
EasyMock.expect(stageEntity.getDisplayStatus()).andReturn(hostRoleStatus).anyTimes(); EasyMock.expect(stageEntity.isSkippable()).andReturn(Boolean.FALSE).anyTimes();; @@ -110,9 +118,11 @@ public class TaskStatusListenerTest extends EasyMockSupport { EasyMock.replay(stageDAO); EasyMock.replay(requestDAO); EasyMock.replay(statePublisher); + EasyMock.replay(namedTasksSubscriptions); TaskCreateEvent event = new TaskCreateEvent(hostRoleCommands); - TaskStatusListener listener = new TaskStatusListener(publisher,stageDAO,requestDAO,statePublisher); + TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO, statePublisher, + namedTasksSubscriptions); Assert.assertTrue(listener.getActiveTasksMap().isEmpty()); Assert.assertTrue(listener.getActiveStageMap().isEmpty()); @@ -165,4 +175,84 @@ public class TaskStatusListenerTest extends EasyMockSupport { verifyAll(); } + /** Subscribes to one task id, feeds the listener a TaskUpdateEvent for that task, and checks that exactly one NamedTaskUpdateEvent is published whose id, request id, status, stderr, stdout, error log and output log equal the values set on the updating HostRoleCommand. */ @Test + public void testNamedTasksEnabled() { + final Long taskId = 1L; + final Long requestId = 2L; + final HostRoleStatus status = HostRoleStatus.COMPLETED; + final String stderr = "gW$%SGFbhzsdfHBzdffdfd"; + final String stdout = "gW$%gTESJ KHBjzdkfjbgv"; + final String errorLog = " wTHT J YHKtjgsjgbvklfj"; + final String outputLog = "546ky3kt%V$WYk4tgs5xzs"; + + Provider<TaskStatusListener> taskStatusListenerProvider = createMock(Provider.class); + + NamedTasksSubscriptions namedTasksSubscriptions = new NamedTasksSubscriptions(taskStatusListenerProvider); + + /* record one expected publish(...) call on the strict mock and capture its argument for the assertions at the end */ Capture<NamedTaskUpdateEvent> namedTaskUpdateEventCapture = Capture.newInstance(); + STOMPUpdatePublisher stompUpdatePublisher = createStrictMock(STOMPUpdatePublisher.class); + stompUpdatePublisher.publish(capture(namedTaskUpdateEventCapture)); + expectLastCall(); + + ServiceComponentHostEvent serviceComponentHostEvent = createNiceMock(ServiceComponentHostEvent.class); + HostDAO hostDAO = createNiceMock(HostDAO.class); + + EasyMock.replay(hostDAO); + EasyMock.replay(serviceComponentHostEvent); + + List<HostRoleCommand> 
updateHostRolesCommands = new ArrayList<>(); + /* the updating command carries the ids, status and log texts that the captured event is compared against below */ HostRoleCommand updateHostRoleCommand = new HostRoleCommand("hostName", Role.DATANODE, + serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory); + updateHostRoleCommand.setStatus(status); + updateHostRoleCommand.setRequestId(requestId); + updateHostRoleCommand.setStageId(3L); + updateHostRoleCommand.setTaskId(taskId); + updateHostRoleCommand.setStderr(stderr); + updateHostRoleCommand.setStdout(stdout); + updateHostRoleCommand.setErrorLog(errorLog); + updateHostRoleCommand.setOutputLog(outputLog); + updateHostRolesCommands.add(updateHostRoleCommand); + + StageDAO stageDAO = createNiceMock(StageDAO.class); + RequestDAO requestDAO = createNiceMock(RequestDAO.class); + + EasyMock.replay(stageDAO); + EasyMock.replay(requestDAO); + EasyMock.replay(stompUpdatePublisher); + + TaskStatusListener listener = new TaskStatusListener(publisher, stageDAO, requestDAO, stompUpdatePublisher, + namedTasksSubscriptions); + + /* the provider that was passed to NamedTasksSubscriptions above is set up to return this listener */ expect(taskStatusListenerProvider.get()).andReturn(listener); + + EasyMock.replay(taskStatusListenerProvider); + + // subscribe for task + namedTasksSubscriptions.addTaskId("", taskId, "sub-1"); + + // add dummy host role command as active + // status should be the same to avoid request update event firing + HostRoleCommand activeHostRoleCommand = new HostRoleCommand("hostName", Role.DATANODE, + serviceComponentHostEvent, RoleCommand.EXECUTE, hostDAO, executionCommandDAO, ecwFactory); + activeHostRoleCommand.setStatus(status); + listener.getActiveTasksMap().put(taskId, activeHostRoleCommand); + + listener.onTaskUpdateEvent(new TaskUpdateEvent(updateHostRolesCommands)); + + Assert.assertNotNull(namedTaskUpdateEventCapture.getValues()); + Assert.assertEquals(1L, namedTaskUpdateEventCapture.getValues().size()); + + NamedTaskUpdateEvent capturedEvent = namedTaskUpdateEventCapture.getValue(); + + Assert.assertEquals(taskId, capturedEvent.getId()); + Assert.assertEquals(requestId, 
capturedEvent.getRequestId()); + Assert.assertEquals(status, capturedEvent.getStatus()); + Assert.assertEquals(stderr, capturedEvent.getStderr()); + Assert.assertEquals(stdout, capturedEvent.getStdout()); + Assert.assertEquals(errorLog, capturedEvent.getErrorLog()); + /* the command setter is setOutputLog(...), while the event getter is getOutLog() */ Assert.assertEquals(outputLog, capturedEvent.getOutLog()); + + verifyAll(); + } + }