Repository: ambari Updated Branches: refs/heads/branch-2.4 07b5cd2e5 -> 2c5680bc6
AMBARI-18052 - Starting a Component After Pausing An Upgrade Can Take 9 Minutes (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2c5680bc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2c5680bc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2c5680bc Branch: refs/heads/branch-2.4 Commit: 2c5680bc65ac6722235018fc8aad1f939c90296b Parents: 07b5cd2 Author: Jonathan Hurley <[email protected]> Authored: Fri Aug 5 16:35:04 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Sat Aug 6 15:35:19 2016 -0400 ---------------------------------------------------------------------- .../server/actionmanager/ActionDBAccessor.java | 6 +- .../actionmanager/ActionDBAccessorImpl.java | 36 ++-- .../server/actionmanager/ActionScheduler.java | 57 +++++- .../internal/UpgradeResourceProvider.java | 38 ++-- .../apache/ambari/server/orm/dao/StageDAO.java | 15 ++ .../ambari/server/orm/entities/StageEntity.java | 8 +- .../state/services/MetricsRetrievalService.java | 2 +- .../actionmanager/TestActionScheduler.java | 178 +++++++++++++------ 8 files changed, 243 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index dcfe359..0e78cbc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -55,7 +55,9 @@ public interface ActionDBAccessor { Request getRequest(long requestId); /** - * Abort all outstanding operations associated with the given request + * Abort all outstanding operations associated with the given request. This + * method uses the {@link HostRoleStatus#SCHEDULED_STATES} to determine which + * {@link HostRoleCommand} instances to abort. */ public void abortOperation(long requestId); @@ -92,7 +94,7 @@ public interface ActionDBAccessor { /** * Persists all tasks for a given request - * + * * @param request * request object */ http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index b44dc78..b7e7f2d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,14 +37,13 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.agent.ExecutionCommand; -import org.apache.ambari.server.audit.event.AuditEvent; import org.apache.ambari.server.audit.AuditLogger; +import org.apache.ambari.server.audit.event.AuditEvent; import org.apache.ambari.server.audit.event.OperationStatusAuditEvent; import org.apache.ambari.server.audit.event.TaskStatusAuditEvent; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.events.HostRemovedEvent; -import org.apache.ambari.server.events.ServiceComponentUninstalledEvent; import org.apache.ambari.server.events.RequestFinishedEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.ClusterDAO; @@ -200,8 +198,8 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } } - /* (non-Javadoc) - * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long) + /** + * {@inheritDoc} */ @Override public void abortOperation(long requestId) { @@ -209,23 +207,21 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { endRequest(requestId); - List<HostRoleCommandEntity> commands = - hostRoleCommandDAO.findByRequest(requestId); - for (HostRoleCommandEntity command : commands) { - if (command.getStatus() == HostRoleStatus.QUEUED || - command.getStatus() == HostRoleStatus.IN_PROGRESS || - command.getStatus() == HostRoleStatus.PENDING) { + // only request commands which actually need to be aborted; requesting all + // commands here can cause OOM problems during large requests like upgrades + List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequestIdAndStatuses(requestId, + HostRoleStatus.SCHEDULED_STATES); - command.setStatus(HostRoleStatus.ABORTED); - command.setEndTime(now); - LOG.info("Aborting command. Hostname " + command.getHostName() - + " role " + command.getRole() - + " requestId " + command.getRequestId() - + " taskId " + command.getTaskId() - + " stageId " + command.getStageId()); + for (HostRoleCommandEntity command : commands) { + command.setStatus(HostRoleStatus.ABORTED); + command.setEndTime(now); + LOG.info("Aborting command. Hostname " + command.getHostName() + + " role " + command.getRole() + + " requestId " + command.getRequestId() + + " taskId " + command.getTaskId() + + " stageId " + command.getStageId()); - auditLog(command, requestId); - } + auditLog(command, requestId); } // no need to merge if there's nothing to merge http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index cdef06e..e380ae4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -50,6 +50,8 @@ import org.apache.ambari.server.events.ActionFinalReportReceivedEvent; import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.events.publishers.JPAEventPublisher; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.serveraction.ServerActionExecutor; import org.apache.ambari.server.state.Cluster; @@ -115,6 +117,23 @@ class ActionScheduler implements Runnable { @Inject Provider<EntityManager> entityManagerProvider; + /** + * Used for turning instances of {@link HostRoleCommandEntity} into + * {@link HostRoleCommand}. + */ + @Inject + private HostRoleCommandFactory hostRoleCommandFactory; + + /** + * Used for retrieving {@link HostRoleCommandEntity} instances which need to + * be cancelled. + */ + @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + /** + * The current thread's reference to the {@link EntityManager}. + */ volatile EntityManager threadEntityManager; private final long actionTimeout; @@ -194,11 +213,14 @@ class ActionScheduler implements Runnable { * @param unitOfWork * @param ambariEventPublisher * @param configuration + * @param hostRoleCommandDAO + * @param hostRoleCommandFactory */ protected ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec, ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap hostsMap, UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher, - Configuration configuration, Provider<EntityManager> entityManagerProvider) { + Configuration configuration, Provider<EntityManager> entityManagerProvider, + HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory hostRoleCommandFactory) { sleepTime = sleepTimeMilliSec; actionTimeout = actionTimeoutMilliSec; @@ -211,6 +233,8 @@ class ActionScheduler implements Runnable { this.ambariEventPublisher = ambariEventPublisher; this.configuration = configuration; this.entityManagerProvider = entityManagerProvider; + this.hostRoleCommandDAO = hostRoleCommandDAO; + this.hostRoleCommandFactory = hostRoleCommandFactory; jpaPublisher = null; serverActionExecutor = new ServerActionExecutor(db, sleepTime); @@ -1098,14 +1122,32 @@ class ActionScheduler implements Runnable { synchronized (requestsToBeCancelled) { // Now, cancel stages completely for (Long requestId : requestsToBeCancelled) { - List<HostRoleCommand> tasksToDequeue = db.getRequestTasks(requestId); - String reason = requestCancelReasons.get(requestId); - cancelHostRoleCommands(tasksToDequeue, reason); - List<Stage> stages = db.getAllStages(requestId); - for (Stage stage : stages) { - abortOperationsForStage(stage); + // only pull back entities that have not completed; pulling back all + // entities for the request can cause OOM problems on large requests, + // like those for upgrades + List<HostRoleCommandEntity> entitiesToDequeue = hostRoleCommandDAO.findByRequestIdAndStatuses( + requestId, HostRoleStatus.NOT_COMPLETED_STATUSES); + + if (!entitiesToDequeue.isEmpty()) { + List<HostRoleCommand> tasksToDequeue = new ArrayList<>(entitiesToDequeue.size()); + for (HostRoleCommandEntity hrcEntity : entitiesToDequeue) { + HostRoleCommand task = hostRoleCommandFactory.createExisting(hrcEntity); + tasksToDequeue.add(task); + } + + String reason = requestCancelReasons.get(requestId); + cancelHostRoleCommands(tasksToDequeue, reason); + } + + // abort any stages in progress; don't execute this for all stages since + // that could lead to OOM errors on large requests, like those for + // upgrades + List<Stage> stagesInProgress = db.getStagesInProgress(); + for (Stage stageInProgress : stagesInProgress) { + abortOperationsForStage(stageInProgress); } } + requestsToBeCancelled.clear(); requestCancelReasons.clear(); } @@ -1154,6 +1196,7 @@ class ActionScheduler implements Runnable { } } } + void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) { for (HostRoleCommand hostRoleCommand : hostRoleCommands) { // There are no server actions in actionQueue http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java index 205a03d..d37e32b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java @@ -71,6 +71,7 @@ import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.dao.RepositoryVersionDAO; import org.apache.ambari.server.orm.dao.RequestDAO; import org.apache.ambari.server.orm.dao.UpgradeDAO; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RepositoryVersionEntity; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.orm.entities.StackEntity; @@ -117,6 +118,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -474,6 +476,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider throw new IllegalArgumentException(String.format("%s is required", UPGRADE_REQUEST_ID)); } + long clusterId = cluster.getClusterId(); long requestId = Long.parseLong(requestIdProperty); UpgradeEntity upgradeEntity = s_upgradeDAO.findUpgradeByRequestId(requestId); if( null == upgradeEntity){ @@ -514,7 +517,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider suspended = Boolean.valueOf((String) propertyMap.get(UPGRADE_SUSPENDED)); } - setUpgradeRequestStatus(requestIdProperty, status, propertyMap); + setUpgradeRequestStatus(clusterId, requestId, status, propertyMap); // When the status of the upgrade's request is changing, we also update the suspended flag. upgradeEntity.setSuspended(suspended); @@ -1747,6 +1750,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider * <li>{@link HostRoleStatus#PENDING}</li> * </ul> * + * @param clusterId + * the ID of the cluster * @param requestId * the request to change the status for. * @param status @@ -1755,7 +1760,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider * the map of request properties (needed for things like abort reason * if present) */ - private void setUpgradeRequestStatus(String requestId, HostRoleStatus status, + private void setUpgradeRequestStatus(long clusterId, long requestId, HostRoleStatus status, Map<String, Object> propertyMap) { if (status != HostRoleStatus.ABORTED && status != HostRoleStatus.PENDING) { throw new IllegalArgumentException(String.format("Cannot set status %s, only %s is allowed", @@ -1767,14 +1772,14 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider reason = String.format(DEFAULT_REASON_TEMPLATE, requestId); } - ActionManager actionManager = getManagementController().getActionManager(); - List<org.apache.ambari.server.actionmanager.Request> requests = actionManager.getRequests( - Collections.singletonList(Long.valueOf(requestId))); - - org.apache.ambari.server.actionmanager.Request internalRequest = requests.get(0); + // do not try to pull back the entire request here as they can be massive + // and cause OOM problems; instead, use the count of statuses to determine + // the state of the upgrade request + Map<Long, HostRoleCommandStatusSummaryDTO> aggregateCounts = s_hostRoleCommandDAO.findAggregateCounts(requestId); + CalculatedStatus calculatedStatus = CalculatedStatus.statusFromStageSummary(aggregateCounts, + aggregateCounts.keySet()); - HostRoleStatus internalStatus = CalculatedStatus.statusFromStages( - internalRequest.getStages()).getStatus(); + HostRoleStatus internalStatus = calculatedStatus.getStatus(); if (HostRoleStatus.PENDING == status && !(internalStatus == HostRoleStatus.ABORTED || internalStatus == HostRoleStatus.IN_PROGRESS)) { throw new IllegalArgumentException( @@ -1782,10 +1787,11 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider HostRoleStatus.ABORTED, internalStatus)); } - Long clusterId = internalRequest.getClusterId(); + ActionManager actionManager = getManagementController().getActionManager(); + if (HostRoleStatus.ABORTED == status) { if (!internalStatus.isCompletedState()) { - actionManager.cancelRequest(internalRequest.getRequestId(), reason); + actionManager.cancelRequest(requestId, reason); // Remove relevant upgrade entity try { Cluster cluster = clusters.get().getClusterById(clusterId); @@ -1801,11 +1807,11 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider } else { // Status must be PENDING. List<Long> taskIds = new ArrayList<Long>(); - for (HostRoleCommand hrc : internalRequest.getCommands()) { - if (HostRoleStatus.ABORTED == hrc.getStatus() - || HostRoleStatus.TIMEDOUT == hrc.getStatus()) { - taskIds.add(hrc.getTaskId()); - } + List<HostRoleCommandEntity> hrcEntities = s_hostRoleCommandDAO.findByRequestIdAndStatuses( + requestId, Sets.newHashSet(HostRoleStatus.ABORTED, HostRoleStatus.TIMEDOUT)); + + for (HostRoleCommandEntity hrcEntity : hrcEntities) { + taskIds.add(hrcEntity.getTaskId()); } actionManager.resubmitTasks(taskIds); http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java index 541b2e9..8ef4a1b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java @@ -129,6 +129,21 @@ public class StageDAO { } /** + * Gets all of the stage IDs associated with a request. + * + * @param requestId + * @return the list of stage IDs. + */ + @RequiresSession + public List<Long> findIdsByRequestId(long requestId) { + TypedQuery<Long> query = entityManagerProvider.get().createNamedQuery( + "StageEntity.findIdsByRequestId", Long.class); + + query.setParameter("requestId", requestId); + return daoUtils.selectList(query); + } + + /** * Get the list of stage entities for the given request id and stage ids. * * @param requestId the request ids http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java index 12ab568..7659a23 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java @@ -39,7 +39,13 @@ import javax.persistence.Table; @Entity @Table(name = "stage") @IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class) -@NamedQueries({ @NamedQuery(name = "StageEntity.findByCommandStatuses", query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId") }) +@NamedQueries({ + @NamedQuery( + name = "StageEntity.findByCommandStatuses", + query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, stage.stageId"), + @NamedQuery( + name = "StageEntity.findIdsByRequestId", + query = "SELECT stage.stageId FROM StageEntity stage WHERE stage.requestId = :requestId ORDER BY stage.stageId ASC") }) public class StageEntity { @Column(name = "cluster_id", updatable = false, nullable = false) http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java index 97816e8..fa36905 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java @@ -89,7 +89,7 @@ public class MetricsRetrievalService extends AbstractService { * * @see #s_exceptionCache */ - private static final int EXCEPTION_CACHE_TIMEOUT_MINUTES = 5; + private static final int EXCEPTION_CACHE_TIMEOUT_MINUTES = 20; /** * Exceptions from this service should not SPAM the logs; so cache exceptions http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 22ac60f..9f12a94 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -74,7 +74,9 @@ import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; import org.apache.ambari.server.orm.dao.HostDAO; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RequestEntity; import org.apache.ambari.server.serveraction.MockServerAction; import org.apache.ambari.server.serveraction.ServerActionExecutor; @@ -215,7 +217,7 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm, - 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -328,7 +330,7 @@ public class TestActionScheduler { //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, null, null); scheduler.setTaskTimeoutAdjustment(false); // Start the thread @@ -413,7 +415,8 @@ public class TestActionScheduler { AmbariEventPublisher aep = EasyMock.createNiceMock(AmbariEventPublisher.class); ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class). withConstructor((long) 100, (long) 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, aep, conf, entityManagerProviderMock). + new HostsMap((String) null), unitOfWork, aep, conf, entityManagerProviderMock, + mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class)). addMockedMethod("cancelHostRoleCommands"). createMock(); scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class)); @@ -536,7 +539,8 @@ public class TestActionScheduler { // Make sure the NN install doesn't timeout ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); scheduler.setTaskTimeoutAdjustment(false); int cycleCount=0; @@ -650,7 +654,8 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -727,7 +732,7 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf, - entityManagerProviderMock); + entityManagerProviderMock, (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); scheduler.doWork(); @@ -807,7 +812,8 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION").isCompletedState() @@ -827,8 +833,10 @@ public class TestActionScheduler { ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class) .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class, - HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, Provider.class) - .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, entityManagerProviderMock) + HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, + Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class) + .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, entityManagerProviderMock, + mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class)) .createNiceMock(); EasyMock.replay(scheduler); @@ -935,9 +943,11 @@ public class TestActionScheduler { ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class) .withConstructor(long.class, long.class, ActionDBAccessor.class, ActionQueue.class, Clusters.class, int.class, - HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, Provider.class) + HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, Configuration.class, + Provider.class, HostRoleCommandDAO.class, HostRoleCommandFactory.class) .withArgs(100L, 50L, db, aq, fsm, -1, null, null, ambariEventPublisher, null, - entityManagerProviderMock) + entityManagerProviderMock, mock(HostRoleCommandDAO.class), + mock(HostRoleCommandFactory.class)) .createNiceMock(); EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost); @@ -1015,7 +1025,8 @@ public class TestActionScheduler { }).when(db).getTasksByRoleAndStatus(anyString(), any(HostRoleStatus.class)); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -1122,7 +1133,8 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock)); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1213,7 +1225,8 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock)); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1287,7 +1300,8 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock)); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -1409,7 +1423,8 @@ public class TestActionScheduler { withConstructor((long)100, (long)50, db, aq, fsm, 3, new HostsMap((String) null), unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf, - entityManagerProviderMock). + entityManagerProviderMock, mock(HostRoleCommandDAO.class), + mock(HostRoleCommandFactory.class)). addMockedMethod("cancelHostRoleCommands"). createMock(); scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList), @@ -1584,7 +1599,8 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); scheduler.doWork(); @@ -1768,7 +1784,8 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); ActionManager am = new ActionManager(db, requestFactory, scheduler); @@ -1952,7 +1969,8 @@ public class TestActionScheduler { //Keep large number of attempts so that the task is not expired finally //Small action timeout to test rescheduling ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, - 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + 10000, new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); scheduler.setTaskTimeoutAdjustment(false); List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler); @@ -2035,7 +2053,8 @@ public class TestActionScheduler { when(db.getStagesInProgress()).thenReturn(stages); ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); final CountDownLatch abortCalls = new CountDownLatch(2); @@ -2144,7 +2163,8 @@ public class TestActionScheduler { ServerActionExecutor.init(injector); ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); int cycleCount = 0; while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION") @@ -2166,6 +2186,9 @@ public class TestActionScheduler { ServiceComponent scomp = mock(ServiceComponent.class); ServiceComponentHost sch = mock(ServiceComponentHost.class); UnitOfWork unitOfWork = mock(UnitOfWork.class); + HostRoleCommandDAO hostRoleCommandDAO = mock(HostRoleCommandDAO.class); + HostRoleCommandFactory hostRoleCommandFactory = mock(HostRoleCommandFactory.class); + when(fsm.getCluster(anyString())).thenReturn(oneClusterMock); when(oneClusterMock.getService(anyString())).thenReturn(serviceObj); when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp); @@ -2176,23 +2199,63 @@ public class TestActionScheduler { hostEntity.setHostName(hostname); hostDAO.create(hostEntity); - HashMap<String, ServiceComponentHost> hosts = - new HashMap<String, ServiceComponentHost>(); + HashMap<String, ServiceComponentHost> hosts = new HashMap<String, ServiceComponentHost>(); hosts.put(hostname, sch); when(scomp.getServiceComponentHosts()).thenReturn(hosts); long requestId = 1; - final List<Stage> stages = new ArrayList<Stage>(); - int namenodeCmdTaskId = 1; - stages.add( - getStageWithSingleTask( - hostname, "cluster1", Role.NAMENODE, RoleCommand.START, - Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId)); - stages.add( - getStageWithSingleTask( - hostname, "cluster1", Role.DATANODE, RoleCommand.START, - Service.Type.HDFS, 2, 2, (int)requestId)); + // create 3 stages, each with a single task - the first stage will be completed and should not + // be included when cancelling the unfinished tasks of the request + final List<Stage> allStages = new ArrayList<Stage>(); + final List<Stage> stagesInProgress = new ArrayList<Stage>(); + final List<HostRoleCommand> tasksInProgress = new ArrayList<>(); + final List<HostRoleCommandEntity> hrcEntitiesInProgress = new ArrayList<>(); + + int secondaryNamenodeCmdTaskId = 1; + int namenodeCmdTaskId = 2; + int datanodeCmdTaskId = 3; + + Stage stageWithTask = getStageWithSingleTask( + hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START, + Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId); + + // complete the first stage + stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED); + allStages.add(stageWithTask); + + stageWithTask = getStageWithSingleTask( + hostname, "cluster1", Role.NAMENODE, RoleCommand.START, + Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId); + + tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands()); + stagesInProgress.add(stageWithTask); + allStages.add(stageWithTask); + + stageWithTask = getStageWithSingleTask( + hostname, "cluster1", Role.DATANODE, RoleCommand.START, + Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId); + + tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands()); + stagesInProgress.add(stageWithTask); + allStages.add(stageWithTask); + + // convert HRC to HRCEntity for the mock DAO to use + for (HostRoleCommand hostRoleCommand : tasksInProgress) { + HostRoleCommandEntity entity = mock(HostRoleCommandEntity.class); + when(entity.getTaskId()).thenReturn(hostRoleCommand.getTaskId()); + when(entity.getStageId()).thenReturn(hostRoleCommand.getStageId()); + when(entity.getRequestId()).thenReturn(hostRoleCommand.getRequestId()); + when(entity.getHostId()).thenReturn(hostRoleCommand.getHostId()); + when(entity.getHostName()).thenReturn(hostRoleCommand.getHostName()); + when(entity.getRole()).thenReturn(hostRoleCommand.getRole()); + when(entity.getStatus()).thenReturn(hostRoleCommand.getStatus()); + when(entity.getRoleCommand()).thenReturn(hostRoleCommand.getRoleCommand()); + + hrcEntitiesInProgress.add(entity); + + when(hostRoleCommandFactory.createExisting(entity)).thenReturn(hostRoleCommand); + } Host host = mock(Host.class); when(fsm.getHost(anyString())).thenReturn(host); @@ -2205,15 +2268,18 @@ public class TestActionScheduler { when(request.isExclusive()).thenReturn(false); when(db.getRequestEntity(anyLong())).thenReturn(request); - when(db.getCommandsInProgressCount()).thenReturn(stages.size()); - when(db.getStagesInProgress()).thenReturn(stages); + when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size()); + when(db.getStagesInProgress()).thenReturn(stagesInProgress); + when(db.getAllStages(anyLong())).thenReturn(allStages); List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>(); - for (Stage stage : stages) { + + for (Stage stage : allStages) { requestTasks.addAll(stage.getOrderedHostRoleCommands()); } + when(db.getRequestTasks(anyLong())).thenReturn(requestTasks); - when(db.getAllStages(anyLong())).thenReturn(stages); + doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { @@ -2224,7 +2290,7 @@ public class TestActionScheduler { Long requestId = requestStageIds[0]; Long stageId = requestStageIds[1]; Long id = report.getTaskId(); - for (Stage stage : stages) { + for (Stage stage : stagesInProgress) { if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) { if (hostRoleCommand.getTaskId() == id) { @@ -2244,7 +2310,7 @@ public class TestActionScheduler { @Override public Object answer(InvocationOnMock invocation) throws Throwable { Long taskId = (Long) invocation.getArguments()[0]; - for (Stage stage : stages) { + for (Stage stage : allStages) { for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { if (taskId.equals(command.getTaskId())) { return command; @@ -2254,11 +2320,12 @@ public class TestActionScheduler { return null; } }); + doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { Long requestId = (Long) invocation.getArguments()[0]; - for (Stage stage : stages) { + for (Stage stage : stagesInProgress) { if (requestId.equals(stage.getRequestId())) { for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) { if (command.getStatus() == HostRoleStatus.QUEUED || @@ -2277,8 +2344,12 @@ public class TestActionScheduler { Properties properties = new Properties(); Configuration conf = new Configuration(properties); + when(hostRoleCommandDAO.findByRequestIdAndStatuses(requestId, + HostRoleStatus.NOT_COMPLETED_STATUSES)).thenReturn(hrcEntitiesInProgress); + ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + hostRoleCommandDAO, hostRoleCommandFactory); scheduler.doWork(); @@ -2287,8 +2358,10 @@ public class TestActionScheduler { scheduler.scheduleCancellingRequest(requestId, reason); scheduler.doWork(); - Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(0).getHostRoleStatus(hostname, "NAMENODE")); - Assert.assertEquals(HostRoleStatus.ABORTED, stages.get(1).getHostRoleStatus(hostname, "DATANODE")); + Assert.assertEquals(HostRoleStatus.COMPLETED, allStages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE")); + Assert.assertEquals(HostRoleStatus.ABORTED, allStages.get(1).getHostRoleStatus(hostname, "NAMENODE")); + Assert.assertEquals(HostRoleStatus.ABORTED, allStages.get(2).getHostRoleStatus(hostname, "DATANODE")); + Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be generated only for 1 stage CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname); @@ -2438,7 +2511,8 @@ public class TestActionScheduler { Configuration conf = new Configuration(properties); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock)); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -2498,7 +2572,8 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE); hrc1.setStatus(HostRoleStatus.COMPLETED); @@ -2530,7 +2605,8 @@ public class TestActionScheduler { ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3, new HostsMap((String) null), - unitOfWork, null, conf, entityManagerProviderMock); + unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, null, RoleCommand.EXECUTE); hrc1.setStatus(HostRoleStatus.COMPLETED); @@ -2669,7 +2745,8 @@ public class TestActionScheduler { }).when(db).abortOperation(anyLong()); ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 3, - new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock)); + new HostsMap((String) null), unitOfWork, null, conf, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null)); doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class), any(Stage.class), anyString()); @@ -2716,7 +2793,8 @@ public class TestActionScheduler { expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F); ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3, - new HostsMap((String) null), null, null, null, entityManagerProviderMock); + new HostsMap((String) null), null, null, null, entityManagerProviderMock, + (HostRoleCommandDAO)null, (HostRoleCommandFactory)null); replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand); @@ -2735,7 +2813,7 @@ public class TestActionScheduler { expect(nextStage.getStageId()).andReturn(0L); ActionScheduler scheduler = new ActionScheduler(100, 50, null, null, null, 3, - new HostsMap((String) null), null, null, null, entityManagerProviderMock); + new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null); replay(nextStage); @@ -2773,7 +2851,7 @@ public class TestActionScheduler { expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F); ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, null, null, 3, - new HostsMap((String) null), null, null, null, entityManagerProviderMock); + new HostsMap((String) null), null, null, null, entityManagerProviderMock, null, null); replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand);
