AMBARI-18868. Stage and Request status should be persisted in the database. (jaimin)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0fc7a667 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0fc7a667 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0fc7a667 Branch: refs/heads/trunk Commit: 0fc7a6671feb10dc0e8475dc4878942cf19f46cc Parents: dd174f4 Author: Jaimin Jetly <[email protected]> Authored: Fri Feb 17 09:31:10 2017 -0800 Committer: Jaimin Jetly <[email protected]> Committed: Fri Feb 17 09:31:10 2017 -0800 ---------------------------------------------------------------------- .../actionmanager/ActionDBAccessorImpl.java | 108 ++-- .../server/actionmanager/ActionScheduler.java | 31 + .../ambari/server/actionmanager/Request.java | 8 +- .../ambari/server/actionmanager/Stage.java | 25 + .../controller/internal/CalculatedStatus.java | 390 +++++++++++- .../ambari/server/events/TaskCreateEvent.java | 48 ++ .../apache/ambari/server/events/TaskEvent.java | 66 ++ .../ambari/server/events/TaskUpdateEvent.java | 35 ++ .../listeners/tasks/TaskStatusListener.java | 609 +++++++++++++++++++ .../events/publishers/TaskEventPublisher.java | 62 ++ .../server/orm/dao/HostRoleCommandDAO.java | 67 +- .../ambari/server/orm/dao/RequestDAO.java | 8 + .../apache/ambari/server/orm/dao/StageDAO.java | 32 +- .../orm/entities/HostRoleCommandEntity.java | 4 +- .../server/orm/entities/RequestEntity.java | 49 +- .../ambari/server/orm/entities/StageEntity.java | 70 ++- .../server/orm/entities/StageEntityPK.java | 12 + .../server/upgrade/UpgradeCatalog300.java | 70 +++ .../main/resources/Ambari-DDL-Derby-CREATE.sql | 7 +- .../main/resources/Ambari-DDL-MySQL-CREATE.sql | 7 +- .../main/resources/Ambari-DDL-Oracle-CREATE.sql | 7 +- .../resources/Ambari-DDL-Postgres-CREATE.sql | 7 +- .../resources/Ambari-DDL-SQLAnywhere-CREATE.sql | 7 +- .../resources/Ambari-DDL-SQLServer-CREATE.sql | 7 +- .../actionmanager/TestActionDBAccessorImpl.java | 3 +- .../actionmanager/TestActionScheduler.java | 71 ++- 
.../alerts/AmbariPerformanceRunnableTest.java | 7 +- .../internal/UpgradeResourceProviderTest.java | 1 - .../UpgradeSummaryResourceProviderTest.java | 1 - .../listeners/tasks/TaskStatusListenerTest.java | 164 +++++ .../ambari/server/state/ConfigHelperTest.java | 2 + .../cluster/ClusterEffectiveVersionTest.java | 5 +- .../services/RetryUpgradeActionServiceTest.java | 1 - .../server/upgrade/UpgradeCatalog300Test.java | 20 + 34 files changed, 1892 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 7881a4b..b813fe6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -45,7 +45,9 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.events.HostsRemovedEvent; import org.apache.ambari.server.events.RequestFinishedEvent; +import org.apache.ambari.server.events.TaskCreateEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.apache.ambari.server.events.publishers.TaskEventPublisher; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostDAO; @@ -130,6 +132,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { AmbariEventPublisher ambariEventPublisher; @Inject + TaskEventPublisher 
taskEventPublisher; + + @Inject AuditLogger auditLogger; /** @@ -205,8 +210,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { public Collection<HostRoleCommandEntity> abortOperation(long requestId) { long now = System.currentTimeMillis(); - endRequest(requestId); - // only request commands which actually need to be aborted; requesting all // commands here can cause OOM problems during large requests like upgrades List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId, @@ -228,7 +231,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { if (!commands.isEmpty()) { return hostRoleCommandDAO.mergeAll(commands); } - + endRequest(requestId); return Collections.emptyList(); } @@ -283,7 +286,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { @Override @Experimental(feature = ExperimentalFeature.PARALLEL_PROCESSING) public List<Stage> getStagesInProgress() { - List<StageEntity> stageEntities = stageDAO.findByCommandStatuses( + List<StageEntity> stageEntities = stageDAO.findByStatuses( HostRoleStatus.IN_PROGRESS_STATUSES); return getStagesForEntities(stageEntities); } @@ -343,6 +346,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { RequestEntity requestEntity = request.constructNewPersistenceEntity(); Long clusterId = -1L; + Long requestId = requestEntity.getRequestId(); ClusterEntity clusterEntity = clusterDAO.findById(request.getClusterId()); if (clusterEntity != null) { clusterId = clusterEntity.getClusterId(); @@ -356,8 +360,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { addRequestToAuditlogCache(request); + List<HostRoleCommand> hostRoleCommands = new ArrayList<>(); + for (Stage stage : request.getStages()) { StageEntity stageEntity = stage.constructNewPersistenceEntity(); + Long stageId = stageEntity.getStageId(); stageEntities.add(stageEntity); stageEntity.setClusterId(clusterId); stageEntity.setRequest(requestEntity); @@ -366,6 +373,8 
@@ public class ActionDBAccessorImpl implements ActionDBAccessor { List<HostRoleCommand> orderedHostRoleCommands = stage.getOrderedHostRoleCommands(); for (HostRoleCommand hostRoleCommand : orderedHostRoleCommands) { + hostRoleCommand.setRequestId(requestId); + hostRoleCommand.setStageId(stageId); HostRoleCommandEntity hostRoleCommandEntity = hostRoleCommand.constructNewPersistenceEntity(); hostRoleCommandEntity.setStage(stageEntity); hostRoleCommandDAO.create(hostRoleCommandEntity); @@ -415,11 +424,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { hostRoleCommandEntity.setExecutionCommand(executionCommandEntity); executionCommandDAO.create(hostRoleCommandEntity.getExecutionCommand()); - hostRoleCommandEntity = hostRoleCommandDAO.merge(hostRoleCommandEntity); + hostRoleCommandEntity = hostRoleCommandDAO.mergeWithoutPublishEvent(hostRoleCommandEntity); if (null != hostEntity) { hostEntity = hostDAO.merge(hostEntity); } + hostRoleCommands.add(hostRoleCommand); } for (RoleSuccessCriteriaEntity roleSuccessCriteriaEntity : stageEntity.getRoleSuccessCriterias()) { @@ -431,6 +441,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { requestEntity.setStages(stageEntities); requestDAO.merge(requestEntity); + + TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands); + taskEventPublisher.publish(taskCreateEvent); } @Override @@ -497,66 +510,55 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { long now = System.currentTimeMillis(); List<Long> requestsToCheck = new ArrayList<Long>(); - List<Long> abortedCommandUpdates = new ArrayList<Long>(); List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet()); + List<HostRoleCommandEntity> commandEntitiesToMerge = new ArrayList<HostRoleCommandEntity>(); for (HostRoleCommandEntity commandEntity : commandEntities) { CommandReport report = taskReports.get(commandEntity.getTaskId()); - - boolean statusChanged = false; - - switch 
(commandEntity.getStatus()) { - case ABORTED: - // We don't want to overwrite statuses for ABORTED tasks with - // statuses that have been received from the agent after aborting task - abortedCommandUpdates.add(commandEntity.getTaskId()); - break; - default: - HostRoleStatus status = HostRoleStatus.valueOf(report.getStatus()); - // if FAILED and marked for holding then set status = HOLDING_FAILED - if (status == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) { - status = HostRoleStatus.HOLDING_FAILED; - - // tasks can be marked as skipped when they fail - if (commandEntity.isFailureAutoSkipped()) { - status = HostRoleStatus.SKIPPED_FAILED; - } + HostRoleStatus existingTaskStatus = commandEntity.getStatus(); + HostRoleStatus reportedTaskStatus = HostRoleStatus.valueOf(report.getStatus()); + if (!existingTaskStatus.isCompletedState() || existingTaskStatus == HostRoleStatus.ABORTED) { + // if FAILED and marked for holding then set reportedTaskStatus = HOLDING_FAILED + if (reportedTaskStatus == HostRoleStatus.FAILED && commandEntity.isRetryAllowed()) { + reportedTaskStatus = HostRoleStatus.HOLDING_FAILED; + + // tasks can be marked as skipped when they fail + if (commandEntity.isFailureAutoSkipped()) { + reportedTaskStatus = HostRoleStatus.SKIPPED_FAILED; } - - commandEntity.setStatus(status); - statusChanged = true; - break; - } - - commandEntity.setStdOut(report.getStdOut().getBytes()); - commandEntity.setStdError(report.getStdErr().getBytes()); - commandEntity.setStructuredOut(report.getStructuredOut() == null ? 
null : - report.getStructuredOut().getBytes()); - commandEntity.setExitcode(report.getExitCode()); - - if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) { - commandEntity.setEndTime(now); - - String actionId = report.getActionId(); - long[] requestStageIds = StageUtils.getRequestStage(actionId); - long requestId = requestStageIds[0]; - long stageId = requestStageIds[1]; - if(statusChanged) { - auditLog(commandEntity, requestId); } - if (requestDAO.getLastStageId(requestId).equals(stageId)) { - requestsToCheck.add(requestId); + if (!existingTaskStatus.isCompletedState()) { + commandEntity.setStatus(reportedTaskStatus); } + commandEntity.setStdOut(report.getStdOut().getBytes()); + commandEntity.setStdError(report.getStdErr().getBytes()); + commandEntity.setStructuredOut(report.getStructuredOut() == null ? null : + report.getStructuredOut().getBytes()); + commandEntity.setExitcode(report.getExitCode()); + if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) { + commandEntity.setEndTime(now); + + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + long requestId = requestStageIds[0]; + long stageId = requestStageIds[1]; + auditLog(commandEntity, requestId); + if (requestDAO.getLastStageId(requestId).equals(stageId)) { + requestsToCheck.add(requestId); + } + } + commandEntitiesToMerge.add(commandEntity); + } else { + LOG.warn(String.format("Request for invalid transition of host role command status received for task id %d from " + + "agent: %s -> %s",commandEntity.getTaskId(),existingTaskStatus,reportedTaskStatus)); } } // no need to merge if there's nothing to merge - if (!commandEntities.isEmpty()) { - hostRoleCommandDAO.mergeAll(commandEntities); + if (!commandEntitiesToMerge.isEmpty()) { + hostRoleCommandDAO.mergeAll(commandEntitiesToMerge); } - // Invalidate cache because of updates to ABORTED commands - 
hostRoleCommandCache.invalidateAll(abortedCommandUpdates); for (Long requestId : requestsToCheck) { endRequestIfCompleted(requestId); @@ -923,7 +925,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { return HostRoleStatus.QUEUED; } Collection<HostRoleStatus> taskStatuses = details.getTaskStatuses(); - return CalculatedStatus.calculateSummaryStatusOfStage(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false); + return CalculatedStatus.calculateSummaryStatus(CalculatedStatus.calculateStatusCounts(taskStatuses), numberOfTasks, false); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 680c0a6..a92c03c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.persistence.EntityManager; @@ -49,6 +50,7 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.events.ActionFinalReportReceivedEvent; import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent; +import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.events.publishers.JPAEventPublisher; import 
org.apache.ambari.server.metadata.RoleCommandOrder; @@ -75,10 +77,13 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.eventbus.Subscribe; import com.google.common.reflect.TypeToken; @@ -179,6 +184,9 @@ class ActionScheduler implements Runnable { * we receive awake() request during running a scheduler iteration. */ private boolean activeAwakeRequest = false; + + private AtomicBoolean taskStatusLoaded = new AtomicBoolean(); + //Cache for clusterHostinfo, key - stageId-requestId private Cache<String, Map<String, Set<String>>> clusterHostInfoCache; private Cache<String, Map<String, String>> commandParamsStageCache; @@ -353,6 +361,8 @@ class ActionScheduler implements Runnable { LOG.debug("Processing {} in progress stages ", stages.size()); } + publishInProgressTasks(stages); + if (stages.isEmpty()) { // Nothing to do if (LOG.isDebugEnabled()) { @@ -532,6 +542,27 @@ class ActionScheduler implements Runnable { } } + /** + * publish event to load {@link TaskStatusListener#activeTasksMap} {@link TaskStatusListener#activeStageMap} + * and {@link TaskStatusListener#activeRequestMap} for all running request once during server startup. 
+ * This is required as some tasks may have been in progress when server was last stopped + * @param stages list of stages + */ + private void publishInProgressTasks(List<Stage> stages) { + if (taskStatusLoaded.compareAndSet(false, true)) { + if (!stages.isEmpty()) { + Function<Stage, Long> transform = new Function<Stage, Long>() { + @Override + public Long apply(Stage stage) { + return stage.getRequestId(); + } + }; + Set<Long> runningRequestID = ImmutableSet.copyOf(Lists.transform(stages, transform)); + List<HostRoleCommand> hostRoleCommands = db.getAllTasksByRequestIds(runningRequestID); + hostRoleCommandDAO.publishTaskCreateEvent(hostRoleCommands); + } + } + } /** * Returns the list of hosts that have a task assigned http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java index 31e11c1..502c016 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java @@ -71,7 +71,8 @@ public class Request { * As of now, this field is not used. Request status is * calculated at RequestResourceProvider on the fly. 
*/ - private HostRoleStatus status; // not persisted yet + private HostRoleStatus status = HostRoleStatus.PENDING; + private HostRoleStatus displayStatus = HostRoleStatus.PENDING; private String inputs; private List<RequestResourceFilter> resourceFilters; private RequestOperationLevel operationLevel; @@ -186,6 +187,7 @@ public class Request { this.requestType = entity.getRequestType(); this.commandName = entity.getCommandName(); this.status = entity.getStatus(); + this.displayStatus = entity.getDisplayStatus(); if (entity.getRequestScheduleEntity() != null) { this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId(); } @@ -241,6 +243,8 @@ public class Request { requestEntity.setInputs(inputs); requestEntity.setRequestType(requestType); requestEntity.setRequestScheduleId(requestScheduleId); + requestEntity.setStatus(status); + requestEntity.setDisplayStatus(displayStatus); //TODO set all fields if (resourceFilters != null) { @@ -381,6 +385,8 @@ public class Request { ", startTime=" + startTime + ", endTime=" + endTime + ", inputs='" + inputs + '\'' + + ", status='" + status + '\'' + + ", displayStatus='" + displayStatus + '\'' + ", resourceFilters='" + resourceFilters + '\'' + ", operationLevel='" + operationLevel + '\'' + ", requestType=" + requestType + http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index 4a05b32..f7ceca2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -74,6 +74,8 @@ public class Stage { private long stageId = -1; private final String logDir; private final String 
requestContext; + private HostRoleStatus status = HostRoleStatus.PENDING; + private HostRoleStatus displayStatus = HostRoleStatus.PENDING; private String clusterHostInfo; private String commandParamsStage; private String hostParamsStage; @@ -157,6 +159,8 @@ public class Stage { commandParamsStage = stageEntity.getCommandParamsStage(); hostParamsStage = stageEntity.getHostParamsStage(); commandExecutionType = stageEntity.getCommandExecutionType(); + status = stageEntity.getStatus(); + displayStatus = stageEntity.getDisplayStatus(); List<Long> taskIds = hostRoleCommandDAO.findTaskIdsByStage(requestId, stageId); Collection<HostRoleCommand> commands = dbAccessor.getTasks(taskIds); @@ -197,6 +201,8 @@ public class Stage { stageEntity.setCommandParamsStage(commandParamsStage); stageEntity.setHostParamsStage(hostParamsStage); stageEntity.setCommandExecutionType(commandExecutionType); + stageEntity.setStatus(status); + stageEntity.setDisplayStatus(displayStatus); for (Role role : successFactors.keySet()) { RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity(); @@ -290,6 +296,23 @@ public class Stage { this.commandExecutionType = commandExecutionType; } + /** + * get current status of the stage + * @return {@link HostRoleStatus} + */ + public HostRoleStatus getStatus() { + return status; + } + + /** + * sets status of the stage + * @param status {@link HostRoleStatus} + */ + public void setStatus(HostRoleStatus status) { + this.status = status; + } + + public synchronized void setStageId(long stageId) { if (this.stageId != -1) { throw new RuntimeException("Attempt to set stageId again! 
Not allowed."); @@ -915,6 +938,8 @@ public class Stage { builder.append("clusterHostInfo="+clusterHostInfo+"\n"); builder.append("commandParamsStage="+commandParamsStage+"\n"); builder.append("hostParamsStage="+hostParamsStage+"\n"); + builder.append("status="+status+"\n"); + builder.append("displayStatus="+displayStatus+"\n"); builder.append("Success Factors:\n"); for (Role r : successFactors.keySet()) { builder.append(" role: "+r+", factor: "+successFactors.get(r)+"\n"); http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java index 3c415df..32dd03d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java @@ -26,12 +26,20 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.ambari.server.Role; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.actionmanager.Request; import org.apache.ambari.server.actionmanager.Stage; +import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener; import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.orm.entities.StageEntityPK; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; /** * Status of a request 
resource, calculated from a set of tasks or stages. @@ -142,7 +150,7 @@ public class CalculatedStatus { Map<HostRoleStatus, Integer> taskStatusCounts = CalculatedStatus.calculateTaskEntityStatusCounts(tasks); - HostRoleStatus status = calculateSummaryStatusOfStage(taskStatusCounts, size, skippable); + HostRoleStatus status = calculateSummaryStatus(taskStatusCounts, size, skippable); double progressPercent = calculateProgressPercent(taskStatusCounts, size); @@ -167,7 +175,7 @@ public class CalculatedStatus { // calculate the stage status from the task status counts HostRoleStatus stageStatus = - calculateSummaryStatusOfStage(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable()); + calculateSummaryStatus(calculateTaskEntityStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable()); stageStatuses.add(stageStatus); @@ -203,7 +211,7 @@ public class CalculatedStatus { // calculate the stage status from the task status counts HostRoleStatus stageStatus = - calculateSummaryStatusOfStage(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable()); + calculateSummaryStatus(calculateTaskStatusCounts(stageTasks), stageTasks.size(), stage.isSkippable()); stageStatuses.add(stageStatus); @@ -256,6 +264,126 @@ public class CalculatedStatus { } /** + * Returns counts of tasks that are in various states. 
+ * + * @param hostRoleCommands collection of beans {@link HostRoleCommand} + * + * @return a map of counts of tasks keyed by the task status + */ + public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands) { + Map<HostRoleStatus, Integer> counters = new HashMap<>(); + // initialize + for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) { + counters.put(hostRoleStatus, 0); + } + // calculate counts + for (HostRoleCommand hrc : hostRoleCommands) { + // count tasks where isCompletedState() == true as COMPLETED + // but don't count tasks with COMPLETED status twice + if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) { + // Increase total number of completed tasks; + counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1); + } + // Increment counter for particular status + counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1); + } + + // We overwrite the value to have the sum converged + counters.put(HostRoleStatus.IN_PROGRESS, + hostRoleCommands.size() - + counters.get(HostRoleStatus.COMPLETED) - + counters.get(HostRoleStatus.QUEUED) - + counters.get(HostRoleStatus.PENDING)); + + return counters; + } + + /** + * Returns map for counts of stages that are in various states. 
+ * + * @param stages collection of beans {@link org.apache.ambari.server.events.listeners.tasks.TaskStatusListener.ActiveStage} + * + * @return a map of counts of tasks keyed by the task status + */ + public static Map<StatusType,Map<HostRoleStatus, Integer>> calculateStatusCountsForStage(Collection<TaskStatusListener.ActiveStage> stages) { + + Map<StatusType,Map<HostRoleStatus, Integer>> counters = new HashMap<>(); + for (StatusType statusType : StatusType.values()) { + Map <HostRoleStatus, Integer> statusMap = new HashMap<HostRoleStatus, Integer>(); + counters.put(statusType,statusMap); + // initialize + for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) { + statusMap.put(hostRoleStatus, 0); + } + for (TaskStatusListener.ActiveStage stage : stages) { + // count tasks where isCompletedState() == true as COMPLETED + // but don't count tasks with COMPLETED status twice + HostRoleStatus status; + if (statusType == StatusType.DISPLAY_STATUS) { + status = stage.getDisplayStatus(); + } else { + status = stage.getStatus(); + } + if (status.isCompletedState() && status != HostRoleStatus.COMPLETED) { + // Increase total number of completed tasks; + statusMap.put(HostRoleStatus.COMPLETED, statusMap.get(HostRoleStatus.COMPLETED) + 1); + } + + // Increment counter for particular status + statusMap.put(status, statusMap.get(status) + 1); + } + statusMap.put(HostRoleStatus.IN_PROGRESS, + stages.size() - + statusMap.get(HostRoleStatus.COMPLETED) - + statusMap.get(HostRoleStatus.QUEUED) - + statusMap.get(HostRoleStatus.PENDING)); + } + return counters; + } + + + /** + * Returns counts of tasks that are in various states. 
+ * + * @param hostRoleCommands collection of beans {@link HostRoleCommand} + * + * @return a map of counts of tasks keyed by the task status + */ + public static Map<HostRoleStatus, Integer> calculateStatusCountsForTasks(Collection<HostRoleCommand> hostRoleCommands, StageEntityPK stage) { + Map<HostRoleStatus, Integer> counters = new HashMap<>(); + List<HostRoleCommand> hostRoleCommandsOfStage = new ArrayList<>(); + // initialize + for (HostRoleStatus hostRoleStatus : HostRoleStatus.values()) { + counters.put(hostRoleStatus, 0); + } + // calculate counts + for (HostRoleCommand hrc : hostRoleCommands) { + if (stage.getStageId() == hrc.getStageId() && stage.getRequestId() == hrc.getRequestId()) { + // count tasks where isCompletedState() == true as COMPLETED + // but don't count tasks with COMPLETED status twice + if (hrc.getStatus().isCompletedState() && hrc.getStatus() != HostRoleStatus.COMPLETED) { + // Increase total number of completed tasks; + counters.put(HostRoleStatus.COMPLETED, counters.get(HostRoleStatus.COMPLETED) + 1); + } + + // Increment counter for particular status + counters.put(hrc.getStatus(), counters.get(hrc.getStatus()) + 1); + + hostRoleCommandsOfStage.add(hrc); + } + } + + // We overwrite the value to have the sum converged + counters.put(HostRoleStatus.IN_PROGRESS, + hostRoleCommandsOfStage.size() - + counters.get(HostRoleStatus.COMPLETED) - + counters.get(HostRoleStatus.QUEUED) - + counters.get(HostRoleStatus.PENDING)); + + return counters; + } + + /** * Returns counts of task entities that are in various states. 
* * @param tasks the collection of task entities @@ -329,7 +457,7 @@ public class CalculatedStatus { int total = summary.getTaskTotal(); boolean skip = summary.isStageSkippable(); Map<HostRoleStatus, Integer> counts = calculateStatusCounts(summary.getTaskStatuses()); - HostRoleStatus stageStatus = calculateSummaryStatusOfStage(counts, total, skip); + HostRoleStatus stageStatus = calculateSummaryStatus(counts, total, skip); HostRoleStatus stageDisplayStatus = calculateSummaryDisplayStatus(counts, total, skip); stageStatuses.add(stageStatus); @@ -392,7 +520,7 @@ public class CalculatedStatus { * * @return summary request status based on statuses of tasks in different states. */ - public static HostRoleStatus calculateSummaryStatusOfStage(Map<HostRoleStatus, Integer> counters, + public static HostRoleStatus calculateSummaryStatus(Map<HostRoleStatus, Integer> counters, int total, boolean skippable) { // when there are 0 tasks, return COMPLETED @@ -435,6 +563,230 @@ public class CalculatedStatus { } /** + * + * @param counters counts of resources that are in various states + * @param skippable {Boolean} <code>TRUE<code/> if failure of any of the task should not fail the stage + * @return {@link HostRoleStatus} + */ + public static HostRoleStatus calculateSummaryStatusFromPartialSet(Map<HostRoleStatus, Integer> counters, + boolean skippable) { + + HostRoleStatus status = HostRoleStatus.PENDING; + // By definition, any tasks in a future stage must be held in a PENDING status. + if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) { + status = counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING : + counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? 
HostRoleStatus.HOLDING_FAILED :
+            HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    // Because tasks are not skippable, guaranteed to be FAILED
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      status = HostRoleStatus.FAILED;
+    }
+
+    // Because tasks are not skippable, guaranteed to be TIMEDOUT
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+      status = HostRoleStatus.TIMEDOUT;
+    }
+
+    int inProgressTasks = counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+    if (inProgressTasks > 0) {
+      status = HostRoleStatus.IN_PROGRESS;
+    }
+
+    return status;
+  }
+
+
+  /**
+   * Calculates the status of a stage from the statuses of all of its tasks, taking the
+   * per-role success factors of the stage into account.
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param counters counts of resources that are in various states
+   * @param successFactors Map of roles to their success factor for a stage
+   * @param skippable <code>true</code> if failure of any of the task should not fail the stage
+   * @return {@link HostRoleStatus} based on success factor
+   */
+  public static HostRoleStatus calculateStageStatus(List<HostRoleCommand> hostRoleCommands, Map<HostRoleStatus, Integer> counters,
+                                                    Map<Role, Float> successFactors, boolean skippable) {
+
+    // when there are 0 tasks, return COMPLETED
+    int total = hostRoleCommands.size();
+    if (total == 0) {
+      return HostRoleStatus.COMPLETED;
+    }
+
+    if (counters.get(HostRoleStatus.PENDING) == total) {
+      return HostRoleStatus.PENDING;
+    }
+
+    // By definition, any tasks in a future stage must be held in a PENDING status.
+    if (counters.get(HostRoleStatus.HOLDING) > 0 || counters.get(HostRoleStatus.HOLDING_FAILED) > 0 || counters.get(HostRoleStatus.HOLDING_TIMEDOUT) > 0) {
+      return counters.get(HostRoleStatus.HOLDING) > 0 ? HostRoleStatus.HOLDING :
+             counters.get(HostRoleStatus.HOLDING_FAILED) > 0 ? HostRoleStatus.HOLDING_FAILED :
+             HostRoleStatus.HOLDING_TIMEDOUT;
+    }
+
+    // A failed/timed-out task only fails the stage when its role drops below the role's success factor
+    if (counters.get(HostRoleStatus.FAILED) > 0 && !skippable) {
+      Set<Role> rolesWithFailedTasks = getRolesOfFailedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithFailedTasks, successFactors)) {
+        return HostRoleStatus.FAILED;
+      }
+    }
+
+    if (counters.get(HostRoleStatus.TIMEDOUT) > 0 && !skippable) {
+      Set<Role> rolesWithTimedOutTasks = getRolesOfTimedOutTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithTimedOutTasks, successFactors)) {
+        return HostRoleStatus.TIMEDOUT;
+      }
+    }
+
+    int numActiveTasks = counters.get(HostRoleStatus.PENDING) + counters.get(HostRoleStatus.QUEUED) + counters.get(HostRoleStatus.IN_PROGRESS);
+
+    if (numActiveTasks > 0) {
+      return HostRoleStatus.IN_PROGRESS;
+    } else if (counters.get(HostRoleStatus.ABORTED) > 0) {
+      Set<Role> rolesWithAbortedTasks = getRolesOfAbortedTasks(hostRoleCommands);
+      if (didStageFailed(hostRoleCommands, rolesWithAbortedTasks, successFactors)) {
+        return HostRoleStatus.ABORTED;
+      }
+    }
+
+    return HostRoleStatus.COMPLETED;
+  }
+
+  /**
+   * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#FAILED}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfFailedTasks(List<HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.FAILED);
+  }
+
+  /**
+   * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#TIMEDOUT}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfTimedOutTasks(List<HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.TIMEDOUT);
+  }
+
+  /**
+   * Get all {@link Role} any of whose tasks is in {@link HostRoleStatus#ABORTED}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfAbortedTasks(List<HostRoleCommand> hostRoleCommands) {
+    return getRolesOfTasks(hostRoleCommands, HostRoleStatus.ABORTED);
+  }
+
+  /**
+   * Get all {@link Role} any of whose tasks are in given {@code status}
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param status {@link HostRoleStatus}
+   * @return Set of {@link Role}
+   */
+  protected static Set<Role> getRolesOfTasks(List<HostRoleCommand> hostRoleCommands, final HostRoleStatus status) {
+
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getStatus() == status;
+      }
+    };
+
+    Function<HostRoleCommand, Role> transform = new Function<HostRoleCommand, Role>() {
+      @Override
+      public Role apply(HostRoleCommand hrc) {
+        return hrc.getRole();
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .transform(transform)
+        .toSet();
+  }
+
+  /**
+   * Checks whether any of the given roles fails to meet its success criteria in a stage.
+   * The success ratio of a role is (tasks of the role - failed tasks of the role) / tasks of the role;
+   * a role without an entry in {@code successFactors} must fully succeed (factor 1.0).
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand} for a stage
+   * @param roles set of roles to be checked for meeting success criteria
+   * @param successFactors map of role to its success factor
+   * @return {Boolean} <code>TRUE</code> if stage failed due to hostRoleCommands of any role not meeting success criteria
+   */
+  protected static Boolean didStageFailed(List<HostRoleCommand> hostRoleCommands, Set<Role> roles, Map<Role, Float> successFactors) {
+    for (Role role : roles) {
+      List<HostRoleCommand> hostRoleCommandsOfRole = getHostRoleCommandsOfRole(hostRoleCommands, role);
+      int totalCount = hostRoleCommandsOfRole.size();
+      if (totalCount == 0) {
+        // no tasks of this role in the stage, so it cannot fail the stage (and would divide by zero)
+        continue;
+      }
+      List<HostRoleCommand> failedHostRoleCommands = getFailedHostRoleCommands(hostRoleCommandsOfRole);
+      // float division: integer division would truncate every partial success ratio to 0
+      float successRatioForRole = (float) (totalCount - failedHostRoleCommands.size()) / totalCount;
+      Float successFactor = successFactors.get(role);
+      float successFactorForRole = successFactor == null ? 1.0f : successFactor;
+      if (successRatioForRole < successFactorForRole) {
+        return Boolean.TRUE;
+      }
+    }
+    return Boolean.FALSE;
+  }
+
+  /**
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @param role {@link Role}
+   * @return list of {@link HostRoleCommand} that belongs to {@link Role}
+   */
+  protected static List<HostRoleCommand> getHostRoleCommandsOfRole(List<HostRoleCommand> hostRoleCommands, final Role role) {
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getRole() == role;
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .toList();
+  }
+
+  /**
+   *
+   * @param hostRoleCommands list of {@link HostRoleCommand}
+   * @return list of {@link HostRoleCommand} whose status reports {@code isFailedAndNotSkippableState()}
+   */
+  protected static List<HostRoleCommand> getFailedHostRoleCommands(List<HostRoleCommand> hostRoleCommands) {
+    Predicate<HostRoleCommand> predicate = new Predicate<HostRoleCommand>() {
+      @Override
+      public boolean apply(HostRoleCommand hrc) {
+        return hrc.getStatus().isFailedAndNotSkippableState();
+      }
+    };
+    return FluentIterable.from(hostRoleCommands)
+        .filter(predicate)
+        .toList();
+  }
+
+
+  /**
+   * Calculate overall status from collection of statuses
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall status of a request
+   */
+  public static HostRoleStatus getOverallStatusForRequest(Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+  /**
+   * Calculate overall display status from collection of statuses
+   * @param hostRoleStatuses list of all stage's {@link HostRoleStatus}
+   * @return overall display status of a request
+   */
+  public static HostRoleStatus getOverallDisplayStatusForRequest(Collection<HostRoleStatus> hostRoleStatuses) {
+    Map<HostRoleStatus, Integer> statusCount = calculateStatusCounts(hostRoleStatuses);
+    return calculateSummaryDisplayStatus(statusCount, hostRoleStatuses.size(), false);
+  }
+
+
+  /**
   * Calculate overall status of an upgrade.
   *
   * @param counters counts of resources that are in various states
@@ -444,7 +796,7 @@ public class CalculatedStatus {
   */
  protected static HostRoleStatus calculateSummaryStatusOfUpgrade(
      Map<HostRoleStatus, Integer> counters, int total) {
-    return calculateSummaryStatusOfStage(counters, total, false);
+    return calculateSummaryStatus(counters, total, false);
  }
 
  /**
@@ -456,10 +808,28 @@ public class CalculatedStatus {
   *
   * @return summary request status based on statuses of tasks in different states.
   */
-  protected static HostRoleStatus calculateSummaryDisplayStatus(
+  public static HostRoleStatus calculateSummaryDisplayStatus(
      Map<HostRoleStatus, Integer> counters, int total, boolean skippable) {
-    return counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
-           counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED:
-           calculateSummaryStatusOfStage(counters, total, skippable);
+    return counters.get(HostRoleStatus.FAILED) > 0 ? HostRoleStatus.FAILED :
+           counters.get(HostRoleStatus.TIMEDOUT) > 0 ? HostRoleStatus.TIMEDOUT :
+           counters.get(HostRoleStatus.SKIPPED_FAILED) > 0 ? HostRoleStatus.SKIPPED_FAILED :
+           calculateSummaryStatus(counters, total, skippable);
+  }
+
+  /**
+   * kind of {@link HostRoleStatus} persisted by {@link Stage} and {@link Request}
+   */
+  public enum StatusType {
+    STATUS("status"),
+    DISPLAY_STATUS("display_status");
+    private final String value;
+
+    StatusType(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
new file mode 100644
index 0000000..9d73122
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskCreateEvent.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+
+/**
+ * The {@link TaskCreateEvent} is to be fired every time a request is to be
+ * tracked as a running request in {@link TaskStatusListener}.
+ * This usually happens when a new request is created by a user action or
+ * when ambari-server starts with some stages in a non-completed state.
+ */
+public class TaskCreateEvent extends TaskEvent {
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommandList
+   *          all hostRoleCommands for all requests
+   */
+  public TaskCreateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
new file mode 100644
index 0000000..ca351d7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskEvent.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * {@link TaskEvent} is the base for all events related to create/update of tasks
+ * that might result in update of stage/request status
+ */
+public class TaskEvent {
+  /**
+   * List of {@link HostRoleCommand} carried by this event; assigned once at construction.
+   */
+  private final List<HostRoleCommand> hostRoleCommands;
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommands
+   *          list of HRCs this event refers to: newly created commands for a
+   *          {@link TaskCreateEvent}, commands merged to the database for a
+   *          {@link TaskUpdateEvent}.
+   */
+  public TaskEvent(List<HostRoleCommand> hostRoleCommands) {
+    this.hostRoleCommands = hostRoleCommands;
+  }
+
+  /**
+   * Gets hostRoleCommands that created event
+   * @return List of {@link HostRoleCommand}
+   */
+  public List<HostRoleCommand> getHostRoleCommands() {
+    return hostRoleCommands;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("TaskEvent{");
+    buffer.append("hostRoleCommands=").append(StringUtils.join(hostRoleCommands, ", "));
+    buffer.append("}");
+    return buffer.toString();
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
new file mode 100644
index 0000000..84f67f5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/TaskUpdateEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.List;
+
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+
+/**
+ * The {@link TaskUpdateEvent} is to be fired every time
+ * when host role commands are merged to the database
+ */
+public class TaskUpdateEvent extends TaskEvent {
+
+  /**
+   * Constructor.
+   *
+   * @param hostRoleCommandList
+   *          host role commands that have been merged to the database
+   */
+  public TaskUpdateEvent(List<HostRoleCommand> hostRoleCommandList) {
+    super(hostRoleCommandList);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
new file mode 100644
index 0000000..bc146ef
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/tasks/TaskStatusListener.java
@@ -0,0 +1,609 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.listeners.tasks;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.actionmanager.Request;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.events.TaskCreateEvent;
+import org.apache.ambari.server.events.TaskUpdateEvent;
+import org.apache.ambari.server.events.publishers.TaskEventPublisher;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.StageDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
+import org.apache.ambari.server.orm.entities.StageEntity;
+import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Sets;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * The {@link TaskStatusListener} is used to constantly update status of running Stages and Requests.
+ * It subscribes to {@link TaskCreateEvent} and {@link TaskUpdateEvent}, which are fired when
+ * host role commands are created or updated.
+ * This listener maintains maps of all running tasks, stages and requests.
+ */
+@Singleton
+@EagerSingleton
+public class TaskStatusListener {
+  /**
+   * Logger.
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStatusListener.class);
+
+  /**
+   * Maps task id to its {@link HostRoleCommand} Object.
+   * Map has entries of all tasks of all active(ongoing) requests
+   * NOTE: Partial loading of tasks for any request may lead to incorrect update of the request status
+   */
+  private final Map<Long, HostRoleCommand> activeTasksMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps all ongoing request id to its {@link ActiveRequest}
+   */
+  private final Map<Long, ActiveRequest> activeRequestMap = new ConcurrentHashMap<>();
+
+  /**
+   * Maps {@link StageEntityPK} of all ongoing requests to its {@link ActiveStage}
+   * with updated {@link ActiveStage#status} and {@link ActiveStage#displayStatus}.
+   */
+  private final Map<StageEntityPK, ActiveStage> activeStageMap = new ConcurrentHashMap<>();
+
+  private final StageDAO stageDAO;
+
+  private final RequestDAO requestDAO;
+
+
+  @Inject
+  public TaskStatusListener(TaskEventPublisher taskEventPublisher, StageDAO stageDAO, RequestDAO requestDAO) {
+    this.stageDAO = stageDAO;
+    this.requestDAO = requestDAO;
+    taskEventPublisher.register(this);
+  }
+
+  public Map<Long, HostRoleCommand> getActiveTasksMap() {
+    return activeTasksMap;
+  }
+
+  public Map<Long, ActiveRequest> getActiveRequestMap() {
+    return activeRequestMap;
+  }
+
+  public Map<StageEntityPK, ActiveStage> getActiveStageMap() {
+    return activeStageMap;
+  }
+
+  /**
+   * On receiving task update event, update related entries of the running request, stage and task in the maps
+   * Event containing newly created tasks is expected to contain complete set of all tasks for a request
+   * @param event Consumes {@link TaskUpdateEvent}.
+   */
+  @Subscribe
+  public void onTaskUpdateEvent(TaskUpdateEvent event) {
+    LOG.debug("Received task update event {}", event);
+    List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
+    List<HostRoleCommand> hostRoleCommandWithReceivedStatus = new ArrayList<>();
+    Set<StageEntityPK> stagesWithReceivedTaskStatus = new HashSet<>();
+    Set<Long> requestIdsWithReceivedTaskStatus = new HashSet<>();
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
+      Long reportedTaskId = hostRoleCommand.getTaskId();
+      HostRoleCommand activeTask = activeTasksMap.get(reportedTaskId);
+      if (activeTask == null) {
+        LOG.error("Received update for a task {} which is not being tracked as running task", reportedTaskId);
+      } else {
+        hostRoleCommandWithReceivedStatus.add(hostRoleCommand);
+        stagesWithReceivedTaskStatus.add(toStageEntityPK(hostRoleCommand));
+        requestIdsWithReceivedTaskStatus.add(hostRoleCommand.getRequestId());
+      }
+    }
+
+    updateActiveTasksMap(hostRoleCommandWithReceivedStatus);
+    Boolean didAnyStageStatusUpdated = updateActiveStagesStatus(stagesWithReceivedTaskStatus, hostRoleCommandListAll);
+    // Presumption: If there is no update in any of the running stage's status
+    // then none of the running request status needs to be updated
+    if (didAnyStageStatusUpdated) {
+      updateActiveRequestsStatus(requestIdsWithReceivedTaskStatus, stagesWithReceivedTaskStatus);
+    }
+
+  }
+
+  /**
+   * On receiving task create event, create entries in the running request, stage and task in the maps
+   * @param event Consumes {@link TaskCreateEvent}.
+   */
+  @Subscribe
+  public void onTaskCreateEvent(TaskCreateEvent event) {
+    LOG.debug("Received task create event {}", event);
+    List<HostRoleCommand> hostRoleCommandListAll = event.getHostRoleCommands();
+
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandListAll) {
+      activeTasksMap.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+      addStagePK(hostRoleCommand);
+      addRequestId(hostRoleCommand);
+    }
+  }
+
+  /**
+   * Builds the {@link StageEntityPK} of the stage a host role command belongs to.
+   * @param hostRoleCommand host role command
+   * @return primary key made of the command's request id and stage id
+   */
+  private static StageEntityPK toStageEntityPK(HostRoleCommand hostRoleCommand) {
+    StageEntityPK stageEntityPK = new StageEntityPK();
+    stageEntityPK.setRequestId(hostRoleCommand.getRequestId());
+    stageEntityPK.setStageId(hostRoleCommand.getStageId());
+    return stageEntityPK;
+  }
+
+
+  /**
+   * update changed host role command status
+   * @param hostRoleCommandWithReceivedStatus list of host role commands reported
+   */
+  private void updateActiveTasksMap(List<HostRoleCommand> hostRoleCommandWithReceivedStatus) {
+    for (HostRoleCommand hostRoleCommand : hostRoleCommandWithReceivedStatus) {
+      activeTasksMap.put(hostRoleCommand.getTaskId(), hostRoleCommand);
+    }
+  }
+
+
+  /**
+   * Adds new {@link StageEntityPK} to be tracked as running stage in {@link #activeStageMap}
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   * @throws IllegalStateException if the stage of the command has not been persisted
+   */
+  private void addStagePK(HostRoleCommand hostRoleCommand) {
+    StageEntityPK stageEntityPK = toStageEntityPK(hostRoleCommand);
+    if (activeStageMap.containsKey(stageEntityPK)) {
+      activeStageMap.get(stageEntityPK).addTaskId(hostRoleCommand.getTaskId());
+    } else {
+      StageEntity stageEntity = stageDAO.findByPK(stageEntityPK);
+      // Stage entity of the hostrolecommand should be persisted before publishing task create event.
+      // An explicit check is used because 'assert' is disabled unless the JVM runs with -ea.
+      if (stageEntity == null) {
+        throw new IllegalStateException("Stage " + stageEntityPK
+            + " must be persisted before publishing a task create event");
+      }
+      Map<Role, Float> successFactors = new HashMap<>();
+      Collection<RoleSuccessCriteriaEntity> roleSuccessCriteriaEntities = stageEntity.getRoleSuccessCriterias();
+      for (RoleSuccessCriteriaEntity successCriteriaEntity : roleSuccessCriteriaEntities) {
+        successFactors.put(successCriteriaEntity.getRole(), successCriteriaEntity.getSuccessFactor().floatValue());
+      }
+      Set<Long> taskIdSet = Sets.newHashSet(hostRoleCommand.getTaskId());
+
+      ActiveStage reportedStage = new ActiveStage(stageEntity.getStatus(), stageEntity.getDisplayStatus(),
+          successFactors, stageEntity.isSkippable(), taskIdSet);
+      activeStageMap.put(stageEntityPK, reportedStage);
+    }
+  }
+
+  /**
+   * update and persist all changed stage status
+   * @param stagesWithReceivedTaskStatus set of stages that has received task status
+   * @param hostRoleCommandListAll list of all task updates received from agent
+   * @return <code>true</code> if any of the stage has changed its existing status;
+   * <code>false</code> otherwise
+   */
+  private Boolean updateActiveStagesStatus(final Set<StageEntityPK> stagesWithReceivedTaskStatus, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStageStatusUpdated = Boolean.FALSE;
+    for (StageEntityPK reportedStagePK : stagesWithReceivedTaskStatus) {
+      if (activeStageMap.containsKey(reportedStagePK)) {
+        Boolean didStatusChange = updateStageStatus(reportedStagePK, hostRoleCommandListAll);
+        if (didStatusChange) {
+          ActiveStage reportedStage = activeStageMap.get(reportedStagePK);
+          stageDAO.updateStatus(reportedStagePK, reportedStage.getStatus(), reportedStage.getDisplayStatus());
+          didAnyStageStatusUpdated = Boolean.TRUE;
+        }
+      } else {
+        LOG.error("Received update for a task whose stage is not being tracked as running stage: {}", reportedStagePK);
+      }
+
+    }
+    return didAnyStageStatusUpdated;
+  }
+
+  /**
+   * Adds new request id to be tracked as running request in {@link #activeRequestMap}
+   * @param hostRoleCommand newly created {@link HostRoleCommand} in {@link #activeTasksMap}
+   * @throws IllegalStateException if the request of the command has not been persisted
+   */
+  private void addRequestId(HostRoleCommand hostRoleCommand) {
+    Long requestId = hostRoleCommand.getRequestId();
+    StageEntityPK stageEntityPK = toStageEntityPK(hostRoleCommand);
+    if (activeRequestMap.containsKey(requestId)) {
+      activeRequestMap.get(requestId).addStageEntityPK(stageEntityPK);
+    } else {
+      RequestEntity requestEntity = requestDAO.findByPK(requestId);
+      // Request entity of the hostrolecommand should be persisted before publishing task create event.
+      // An explicit check is used because 'assert' is disabled unless the JVM runs with -ea.
+      if (requestEntity == null) {
+        throw new IllegalStateException("Request " + requestId
+            + " must be persisted before publishing a task create event");
+      }
+      Set<StageEntityPK> stageEntityPKs = Sets.newHashSet(stageEntityPK);
+      ActiveRequest request = new ActiveRequest(requestEntity.getStatus(), requestEntity.getDisplayStatus(), stageEntityPKs);
+      activeRequestMap.put(requestId, request);
+    }
+  }
+
+
+  /**
+   * update and persist changed request status
+   * @param requestIdsWithReceivedTaskStatus set of request ids that has received tasks status
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   */
+  private void updateActiveRequestsStatus(final Set<Long> requestIdsWithReceivedTaskStatus, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    for (Long reportedRequestId : requestIdsWithReceivedTaskStatus) {
+      if (activeRequestMap.containsKey(reportedRequestId)) {
+        ActiveRequest request = activeRequestMap.get(reportedRequestId);
+        Boolean didStatusChange = updateRequestStatus(reportedRequestId, stagesWithChangedTaskStatus);
+        if (didStatusChange) {
+          requestDAO.updateStatus(reportedRequestId, request.getStatus(), request.getDisplayStatus());
+        }
+        if (request.isCompleted() && isAllTasksCompleted(reportedRequestId)) {
+          // A request is considered finished when its status and all of its tasks' statuses are completed;
+          // in that case the request, its stages and its tasks are no longer tracked as active (running)
+          removeRequestStageAndTasks(reportedRequestId);
+        }
+      } else {
+        LOG.error("Received update for a task whose request {} is not being tracked as running request", reportedRequestId);
+      }
+
+    }
+  }
+
+  /**
+   *
+   * @param requestId request Id
+   * @return <code>false</code> if any of the task belonging to requestId has incomplete status
+   *         <code>true</code> otherwise
+   */
+  private Boolean isAllTasksCompleted(Long requestId) {
+    for (HostRoleCommand hostRoleCommand : activeTasksMap.values()) {
+      // compare numerically; never compare boxed ids by reference
+      if (hostRoleCommand.getRequestId() == requestId.longValue() && !hostRoleCommand.getStatus().isCompletedState()) {
+        return Boolean.FALSE;
+      }
+    }
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Removes entries from {@link #activeTasksMap},{@link #activeStageMap} and {@link #activeRequestMap}
+   * @param requestId request id whose entry and its stage and task entries is to be removed
+   */
+  private void removeRequestStageAndTasks(Long requestId) {
+    removeTasks(requestId);
+    removeStages(requestId);
+    removeRequest(requestId);
+  }
+
+
+  /**
+   * Collects the {@link StageEntityPK} keys of {@link #activeStageMap} that belong to a request
+   * @param requestID requestId
+   * @return list of StageEntityPK
+   */
+  private List<StageEntityPK> getAllStageEntityPKForRequest(final Long requestID) {
+    Predicate<StageEntityPK> predicate = new Predicate<StageEntityPK>() {
+      @Override
+      public boolean apply(StageEntityPK stageEntityPK) {
+        return stageEntityPK.getRequestId().equals(requestID);
+      }
+    };
+    return FluentIterable.from(activeStageMap.keySet())
+        .filter(predicate)
+        .toList();
+  }
+
+
+
+  /**
+   * Recomputes the status and display status of a stage from the status of its host role commands
+   * and stores them in its {@link ActiveStage}.
+   * @param stagePK {@link StageEntityPK} primary key for the stage entity
+   * @param hostRoleCommandListAll list of all hrc whose status has been received from agent
+   * @return {@link Boolean} <code>TRUE</code> if status of the given stage changed.
+   */
+  private Boolean updateStageStatus(final StageEntityPK stagePK, List<HostRoleCommand> hostRoleCommandListAll) {
+    Boolean didAnyStatusChanged = Boolean.FALSE;
+    ActiveStage reportedStage = activeStageMap.get(stagePK);
+    HostRoleStatus stageCurrentStatus = reportedStage.getStatus();
+    HostRoleStatus stageCurrentDisplayStatus = reportedStage.getDisplayStatus();
+
+
+    // if stage is already marked to be completed then do not calculate reported status from host role commands
+    // Presumption: There will be no status transition of the host role command from one completed state to another
+    if (!stageCurrentDisplayStatus.isCompletedState() || !stageCurrentStatus.isCompletedState()) {
+      Map<HostRoleStatus, Integer> receivedTaskStatusCount = CalculatedStatus.calculateStatusCountsForTasks(hostRoleCommandListAll, stagePK);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, reportedStage.getSkippable());
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(receivedTaskStatusCount, Boolean.FALSE);
+      // PENDING from the partial-set calculation means the received tasks alone do not decide the
+      // status, so it is recomputed over all tracked tasks of the stage
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        Function<Long, HostRoleCommand> transform = new Function<Long, HostRoleCommand>() {
+          @Override
+          public HostRoleCommand apply(Long taskId) {
+            return activeTasksMap.get(taskId);
+          }
+        };
+
+        List<HostRoleCommand> activeHostRoleCommandsOfStage = FluentIterable.from(reportedStage.getTaskIds())
+            .transform(transform).toList();
+        Map<HostRoleStatus, Integer> statusCount = CalculatedStatus.calculateStatusCountsForTasks(activeHostRoleCommandsOfStage);
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the stage as per the new status of received host role commands
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(statusCount, activeHostRoleCommandsOfStage.size(), reportedStage.getSkippable());
+          if (displayStatus != stageCurrentDisplayStatus) {
+            reportedStage.setDisplayStatus(displayStatus);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+
+        } else {
+          reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the stage as per the new status of received host role commands
+          HostRoleStatus status = CalculatedStatus.calculateStageStatus(activeHostRoleCommandsOfStage, statusCount, reportedStage.getSuccessFactors(), reportedStage.getSkippable());
+          if (status != stageCurrentStatus) {
+            reportedStage.setStatus(status);
+            didAnyStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the received tasks already decide the status; the display status was handled above
+          reportedStage.setStatus(statusFromPartialSet);
+          didAnyStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        reportedStage.setStatus(statusFromPartialSet);
+        reportedStage.setDisplayStatus(displayStatusFromPartialSet);
+        didAnyStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didAnyStatusChanged;
+  }
+
+  /**
+   * Recomputes the status and display status of a request from the statuses of its stages
+   * and stores them in its {@link ActiveRequest}.
+   * @param requestId {@link Request} whose status is to be updated
+   * @param stagesWithChangedTaskStatus set of stages that have received tasks with changed status
+   * @return {Boolean} <code>TRUE</code> if request status has changed from existing
+   */
+  private Boolean updateRequestStatus(final Long requestId, Set<StageEntityPK> stagesWithChangedTaskStatus) {
+    Boolean didStatusChanged = Boolean.FALSE;
+    ActiveRequest request = activeRequestMap.get(requestId);
+    HostRoleStatus requestCurrentStatus = request.getStatus();
+    HostRoleStatus requestCurrentDisplayStatus = request.getDisplayStatus();
+
+    if (!requestCurrentDisplayStatus.isCompletedState() || !requestCurrentStatus.isCompletedState()) {
+      List<ActiveStage> activeStagesWithChangesTaskStatus = new ArrayList<>();
+      for (StageEntityPK stageEntityPK : stagesWithChangedTaskStatus) {
+        if (requestId.equals(stageEntityPK.getRequestId())) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          activeStagesWithChangesTaskStatus.add(activeStage);
+        }
+      }
+
+
+      Map<CalculatedStatus.StatusType, Map<HostRoleStatus, Integer>> stageStatusCountFromPartialSet = CalculatedStatus.calculateStatusCountsForStage(activeStagesWithChangesTaskStatus);
+      HostRoleStatus statusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.STATUS), Boolean.FALSE);
+      HostRoleStatus displayStatusFromPartialSet = CalculatedStatus.calculateSummaryStatusFromPartialSet(stageStatusCountFromPartialSet.get(CalculatedStatus.StatusType.DISPLAY_STATUS), Boolean.FALSE);
+
+      // PENDING from the partial-set calculation means the changed stages alone do not decide the
+      // status, so it is recomputed over all tracked stages of the request
+      if (statusFromPartialSet == HostRoleStatus.PENDING || displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+        List<ActiveStage> allActiveStages = new ArrayList<>();
+        for (StageEntityPK stageEntityPK : request.getStageEntityPks()) {
+          ActiveStage activeStage = activeStageMap.get(stageEntityPK);
+          allActiveStages.add(activeStage);
+        }
+        Map<CalculatedStatus.StatusType, Map<HostRoleStatus, Integer>> stageStatusCount = CalculatedStatus.calculateStatusCountsForStage(allActiveStages);
+
+        if (displayStatusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate and get new display status of the request as per the new status of its stages
+          HostRoleStatus displayStatus = CalculatedStatus.calculateSummaryDisplayStatus(stageStatusCount.get(CalculatedStatus.StatusType.DISPLAY_STATUS), allActiveStages.size(), false);
+          if (displayStatus != requestCurrentDisplayStatus) {
+            request.setDisplayStatus(displayStatus);
+            didStatusChanged = Boolean.TRUE;
+          }
+
+        } else {
+          request.setDisplayStatus(displayStatusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+
+        if (statusFromPartialSet == HostRoleStatus.PENDING) {
+          // calculate status of the request as per the new status of its stages
+          HostRoleStatus status = CalculatedStatus.calculateSummaryStatus(stageStatusCount.get(CalculatedStatus.StatusType.STATUS), allActiveStages.size(), false);
+          if (status != requestCurrentStatus) {
+            request.setStatus(status);
+            didStatusChanged = Boolean.TRUE;
+          }
+        } else {
+          // the changed stages already decide the status; the display status was handled above
+          request.setStatus(statusFromPartialSet);
+          didStatusChanged = Boolean.TRUE;
+        }
+      } else {
+        request.setStatus(statusFromPartialSet);
+        request.setDisplayStatus(displayStatusFromPartialSet);
+        didStatusChanged = Boolean.TRUE;
+      }
+    }
+
+    return didStatusChanged;
+  }
+
+
+  /**
+   * Removes list of {@link HostRoleCommand} entries from {@link #activeTasksMap}
+   * @param requestId request id
+   */
+  private void removeTasks(Long requestId) {
+    Iterator<Map.Entry<Long, HostRoleCommand>> iter = activeTasksMap.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<Long, HostRoleCommand> entry = iter.next();
+      HostRoleCommand hrc = entry.getValue();
+      // compare numerically; never compare boxed ids by reference
+      if (hrc.getRequestId() == requestId.longValue()) {
+        if (!hrc.getStatus().isCompletedState()) {
+          LOG.error("Task {} should have been completed before being removed from running task cache(activeTasksMap)", hrc.getTaskId());
+        }
+        iter.remove();
+      }
+    }
+  }
+
+
+  /**
+   * Removes list of {@link StageEntityPK} entries from {@link #activeStageMap}
+   * @param requestId request Id
+   */
+  private void removeStages(Long requestId) {
+    List<StageEntityPK> stageEntityPKs = getAllStageEntityPKForRequest(requestId);
+    for (StageEntityPK stageEntityPK : stageEntityPKs) {
+      activeStageMap.remove(stageEntityPK);
+    }
+  }
+
+
+  /**
+   * Removes request id from {@link #activeRequestMap}
+   * @param requestId request Id
+   */
+  private void removeRequest(Long requestId) {
+    activeRequestMap.remove(requestId);
+  }
+
+
+  /**
+   * This class stores {@link Request#status} and {@link Request#displayStatus} information
+   * This information is cached for all running {@link Request} at {@link #activeRequestMap}
+   */
+  protected class ActiveRequest {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private final Set<StageEntityPK> stageEntityPks;
+
+    public ActiveRequest(HostRoleStatus status, HostRoleStatus displayStatus, Set<StageEntityPK> stageEntityPks) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.stageEntityPks = stageEntityPks;
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    public Boolean isCompleted() {
+      return status.isCompletedState() && displayStatus.isCompletedState();
+    }
+
+    public Set<StageEntityPK> getStageEntityPks() {
+      return stageEntityPks;
+    }
+
+    public void addStageEntityPK(StageEntityPK stageEntityPK) {
+      stageEntityPks.add(stageEntityPK);
+    }
+
+  }
+
+  /**
+   * This class stores information needed to determine {@link Stage#status} and {@link Stage#displayStatus}
+   * This information is cached for all {@link Stage} of all running {@link Request} at {@link #activeStageMap}
+   */
+  public class ActiveStage {
+    private HostRoleStatus status;
+    private HostRoleStatus displayStatus;
+    private Boolean skippable;
+    private final Set<Long> taskIds;
+
+    // Map of roles to successFactors for this stage; a role without an entry is treated as 1 i.e. 100%
+    private Map<Role, Float> successFactors;
+
+    public ActiveStage(HostRoleStatus status, HostRoleStatus displayStatus,
+                       Map<Role, Float> successFactors, Boolean skippable, Set<Long> taskIds) {
+      this.status = status;
+      this.displayStatus = displayStatus;
+      this.successFactors = successFactors;
+      this.skippable = skippable;
+      this.taskIds = taskIds;
+    }
+
+    public HostRoleStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(HostRoleStatus status) {
+      this.status = status;
+    }
+
+    public HostRoleStatus getDisplayStatus() {
+      return displayStatus;
+    }
+
+    public void setDisplayStatus(HostRoleStatus displayStatus) {
+      this.displayStatus = displayStatus;
+    }
+
+    public Boolean getSkippable() {
+      return skippable;
+    }
+
+    public void setSkippable(Boolean skippable) {
+      this.skippable = skippable;
+    }
+
+    public Map<Role, Float> getSuccessFactors() {
+      return successFactors;
+    }
+
+    public void setSuccessFactors(Map<Role, Float> successFactors) {
+      this.successFactors = successFactors;
+    }
+
+    public Set<Long> getTaskIds() {
+      return taskIds;
+    }
+
+    public void addTaskId(Long taskId) {
+      taskIds.add(taskId);
+    }
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
new file mode 100644
index 0000000..fdc41e5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/TaskEventPublisher.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events.publishers; + +import org.apache.ambari.server.events.TaskEvent; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import com.google.inject.Singleton; + +/** + * The {@link TaskEventPublisher} is used to publish instances of + * {@link TaskEvent} to any {@link com.google.common.eventbus.Subscribe} interested. + * It uses a single-threaded, serial {@link EventBus}. + */ +@Singleton +public class TaskEventPublisher { + + /** + * A single threaded, synchronous event bus for processing task events. + */ + private final EventBus m_eventBus = new EventBus("ambari-task-report-event-bus"); + + + /** + * Publishes the specified event to all registered listeners that + * {@link Subscribe} to {@link TaskEvent} instances. + * + * @param event {@link TaskEvent} + */ + public void publish(TaskEvent event) { + m_eventBus.post(event); + } + + /** + * Register a listener to receive events. The listener should use the + * {@link Subscribe} annotation. + * + * @param object + * the listener to receive events. 
+ */ + public void register(Object object) { + m_eventBus.register(object); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 02c4091..e834045 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -40,6 +40,8 @@ import org.apache.ambari.annotations.TransactionalLock; import org.apache.ambari.annotations.TransactionalLock.LockArea; import org.apache.ambari.annotations.TransactionalLock.LockType; import org.apache.ambari.server.RoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleCommandFactory; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.query.JpaPredicateVisitor; import org.apache.ambari.server.api.query.JpaSortBuilder; @@ -49,6 +51,9 @@ import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.SortRequest; import org.apache.ambari.server.controller.utilities.PredicateHelper; +import org.apache.ambari.server.events.TaskCreateEvent; +import org.apache.ambari.server.events.TaskUpdateEvent; +import org.apache.ambari.server.events.publishers.TaskEventPublisher; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.TransactionalLocks; import org.apache.ambari.server.orm.entities.HostEntity; @@ -58,9 +63,11 @@ import org.apache.ambari.server.orm.entities.StageEntity; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.LoadingCache; +import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import com.google.inject.Inject; import com.google.inject.Provider; @@ -144,6 +151,13 @@ public class HostRoleCommandDAO { @Inject private Configuration configuration; + + @Inject + HostRoleCommandFactory hostRoleCommandFactory; + + @Inject + private TaskEventPublisher taskEventPublisher; + /** * Used to ensure that methods which rely on the completion of * {@link Transactional} can detect when they are able to run. @@ -629,11 +643,17 @@ public class HostRoleCommandDAO { @Transactional @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE) public HostRoleCommandEntity merge(HostRoleCommandEntity entity) { + entity = mergeWithoutPublishEvent(entity); + publishTaskUpdateEvent(Collections.singletonList(hostRoleCommandFactory.createExisting(entity))); + return entity; + } + + @Transactional + @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE) + public HostRoleCommandEntity mergeWithoutPublishEvent(HostRoleCommandEntity entity) { EntityManager entityManager = entityManagerProvider.get(); entity = entityManager.merge(entity); - invalidateHostRoleCommandStatusSummaryCache(entity); - return entity; } @@ -667,10 +687,51 @@ public class HostRoleCommandDAO { } invalidateHostRoleCommandStatusSummaryCache(requestsToInvalidate); - + publishTaskUpdateEvent(getHostRoleCommands(entities)); return managedList; } + /** + * + * @param entities + */ + public List<HostRoleCommand> getHostRoleCommands(Collection<HostRoleCommandEntity> entities) { + Function<HostRoleCommandEntity, HostRoleCommand> transform = new Function<HostRoleCommandEntity, HostRoleCommand> () { + @Override + public HostRoleCommand apply(HostRoleCommandEntity entity) { + return 
hostRoleCommandFactory.createExisting(entity); + } + }; + return FluentIterable.from(entities) + .transform(transform) + .toList(); + + } + + /** + * + * @param hostRoleCommands + */ + public void publishTaskUpdateEvent(List<HostRoleCommand> hostRoleCommands) { + if (!hostRoleCommands.isEmpty()) { + TaskUpdateEvent taskUpdateEvent = new TaskUpdateEvent(hostRoleCommands); + taskEventPublisher.publish(taskUpdateEvent); + } + } + + /** + * + * @param hostRoleCommands + */ + public void publishTaskCreateEvent(List<HostRoleCommand> hostRoleCommands) { + if (!hostRoleCommands.isEmpty()) { + TaskCreateEvent taskCreateEvent = new TaskCreateEvent(hostRoleCommands); + taskEventPublisher.publish(taskCreateEvent); + } + } + + + @Transactional @TransactionalLock(lockArea = LockArea.HRC_STATUS_CACHE, lockType = LockType.WRITE) public void remove(HostRoleCommandEntity entity) { http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java index 1c4d0a3..2696f66 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java @@ -144,6 +144,14 @@ public class RequestDAO { } @Transactional + public void updateStatus(long requestId, HostRoleStatus status, HostRoleStatus displayStatus) { + RequestEntity requestEntity = findByPK(requestId); + requestEntity.setStatus(status); + requestEntity.setDisplayStatus(displayStatus); + merge(requestEntity); + } + + @Transactional public void create(RequestEntity requestEntity) { entityManagerProvider.get().persist(requestEntity); } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java index d2f899f..126468a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.orm.dao; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -173,11 +174,15 @@ public class StageDAO { return daoUtils.selectList(query); } + /** + * + * @param statuses {@link HostRoleStatus} + * @return list of stage entities + */ @RequiresSession - public List<StageEntity> findByCommandStatuses( - Collection<HostRoleStatus> statuses) { + public List<StageEntity> findByStatuses(Collection<HostRoleStatus> statuses) { TypedQuery<StageEntity> query = entityManagerProvider.get().createNamedQuery( - "StageEntity.findByCommandStatuses", StageEntity.class); + "StageEntity.findByStatuses", StageEntity.class); query.setParameter("statuses", statuses); return daoUtils.selectList(query); @@ -280,8 +285,8 @@ public class StageDAO { * the stage entity to update * @param desiredStatus * the desired stage status - * @param controller - * the ambari management controller + * @param actionManager + * the action manager * * @throws java.lang.IllegalArgumentException * if the transition to the desired status is not a legal transition @@ -301,9 +306,11 @@ public class StageDAO { if (desiredStatus == HostRoleStatus.ABORTED) { actionManager.cancelRequest(stage.getRequestId(), "User aborted."); } else { + List <HostRoleCommandEntity> hrcWithChangedStatus = new ArrayList<HostRoleCommandEntity>(); for (HostRoleCommandEntity 
hostRoleCommand : tasks) { HostRoleStatus hostRoleStatus = hostRoleCommand.getStatus(); if (hostRoleStatus.equals(currentStatus)) { + hrcWithChangedStatus.add(hostRoleCommand); hostRoleCommand.setStatus(desiredStatus); if (desiredStatus == HostRoleStatus.PENDING) { @@ -316,6 +323,21 @@ public class StageDAO { } /** + * + * @param stageEntityPK {@link StageEntityPK} + * @param status {@link HostRoleStatus} + * @param displayStatus {@link HostRoleStatus} + */ + @Transactional + public void updateStatus(StageEntityPK stageEntityPK, HostRoleStatus status, HostRoleStatus displayStatus) { + StageEntity stageEntity = findByPK(stageEntityPK); + stageEntity.setStatus(status); + stageEntity.setDisplayStatus(displayStatus); + merge(stageEntity); + } + + + /** * Determine whether or not it is valid to transition from this stage status * to the given status. * http://git-wip-us.apache.org/repos/asf/ambari/blob/0fc7a667/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java index 74271b9..a809295 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java @@ -105,9 +105,9 @@ public class HostRoleCommandEntity { @Basic private Integer exitcode = 0; - @Column(name = "status") + @Column(name = "status", nullable = false) @Enumerated(EnumType.STRING) - private HostRoleStatus status; + private HostRoleStatus status = HostRoleStatus.PENDING; @Column(name = "std_error") @Lob
