Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 07b5cd2e5 -> 2c5680bc6


AMBARI-18052 - Starting a Component After Pausing An Upgrade Can Take 9 Minutes 
(jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2c5680bc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2c5680bc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2c5680bc

Branch: refs/heads/branch-2.4
Commit: 2c5680bc65ac6722235018fc8aad1f939c90296b
Parents: 07b5cd2
Author: Jonathan Hurley <[email protected]>
Authored: Fri Aug 5 16:35:04 2016 -0400
Committer: Jonathan Hurley <[email protected]>
Committed: Sat Aug 6 15:35:19 2016 -0400

----------------------------------------------------------------------
 .../server/actionmanager/ActionDBAccessor.java  |   6 +-
 .../actionmanager/ActionDBAccessorImpl.java     |  36 ++--
 .../server/actionmanager/ActionScheduler.java   |  57 +++++-
 .../internal/UpgradeResourceProvider.java       |  38 ++--
 .../apache/ambari/server/orm/dao/StageDAO.java  |  15 ++
 .../ambari/server/orm/entities/StageEntity.java |   8 +-
 .../state/services/MetricsRetrievalService.java |   2 +-
 .../actionmanager/TestActionScheduler.java      | 178 +++++++++++++------
 8 files changed, 243 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
index dcfe359..0e78cbc 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
@@ -55,7 +55,9 @@ public interface ActionDBAccessor {
   Request getRequest(long requestId);
 
   /**
-   * Abort all outstanding operations associated with the given request
+   * Abort all outstanding operations associated with the given request. This
+   * method uses the {@link HostRoleStatus#SCHEDULED_STATES} to determine which
+   * {@link HostRoleCommand} instances to abort.
    */
   public void abortOperation(long requestId);
 
@@ -92,7 +94,7 @@ public interface ActionDBAccessor {
 
   /**
    * Persists all tasks for a given request
-   * 
+   *
    * @param request
    *          request object
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index b44dc78..b7e7f2d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -38,14 +37,13 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
-import org.apache.ambari.server.audit.event.AuditEvent;
 import org.apache.ambari.server.audit.AuditLogger;
+import org.apache.ambari.server.audit.event.AuditEvent;
 import org.apache.ambari.server.audit.event.OperationStatusAuditEvent;
 import org.apache.ambari.server.audit.event.TaskStatusAuditEvent;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.HostRemovedEvent;
-import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
 import org.apache.ambari.server.events.RequestFinishedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
@@ -200,8 +198,8 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
     }
   }
 
-  /* (non-Javadoc)
-   * @see 
org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long)
+  /**
+   * {@inheritDoc}
    */
   @Override
   public void abortOperation(long requestId) {
@@ -209,23 +207,21 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
 
     endRequest(requestId);
 
-    List<HostRoleCommandEntity> commands =
-        hostRoleCommandDAO.findByRequest(requestId);
-    for (HostRoleCommandEntity command : commands) {
-      if (command.getStatus() == HostRoleStatus.QUEUED ||
-          command.getStatus() == HostRoleStatus.IN_PROGRESS ||
-          command.getStatus() == HostRoleStatus.PENDING) {
+    // only request commands which actually need to be aborted; requesting all
+    // commands here can cause OOM problems during large requests like upgrades
+    List<HostRoleCommandEntity> commands = 
hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
+        HostRoleStatus.SCHEDULED_STATES);
 
-        command.setStatus(HostRoleStatus.ABORTED);
-        command.setEndTime(now);
-        LOG.info("Aborting command. Hostname " + command.getHostName()
-            + " role " + command.getRole()
-            + " requestId " + command.getRequestId()
-            + " taskId " + command.getTaskId()
-            + " stageId " + command.getStageId());
+    for (HostRoleCommandEntity command : commands) {
+      command.setStatus(HostRoleStatus.ABORTED);
+      command.setEndTime(now);
+      LOG.info("Aborting command. Hostname " + command.getHostName()
+          + " role " + command.getRole()
+          + " requestId " + command.getRequestId()
+          + " taskId " + command.getTaskId()
+          + " stageId " + command.getStageId());
 
-        auditLog(command, requestId);
-      }
+      auditLog(command, requestId);
     }
 
     // no need to merge if there's nothing to merge

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index cdef06e..e380ae4 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -50,6 +50,8 @@ import 
org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
 import org.apache.ambari.server.events.jpa.EntityManagerCacheInvalidationEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.events.publishers.JPAEventPublisher;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.serveraction.ServerActionExecutor;
 import org.apache.ambari.server.state.Cluster;
@@ -115,6 +117,23 @@ class ActionScheduler implements Runnable {
   @Inject
   Provider<EntityManager> entityManagerProvider;
 
+  /**
+   * Used for turning instances of {@link HostRoleCommandEntity} into
+   * {@link HostRoleCommand}.
+   */
+  @Inject
+  private HostRoleCommandFactory hostRoleCommandFactory;
+
+  /**
+   * Used for retrieving {@link HostRoleCommandEntity} instances which need to
+   * be cancelled.
+   */
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  /**
+   * The current thread's reference to the {@link EntityManager}.
+   */
   volatile EntityManager threadEntityManager;
 
   private final long actionTimeout;
@@ -194,11 +213,14 @@ class ActionScheduler implements Runnable {
    * @param unitOfWork
    * @param ambariEventPublisher
    * @param configuration
+   * @param hostRoleCommandDAO
+   * @param hostRoleCommandFactory
    */
   protected ActionScheduler(long sleepTimeMilliSec, long 
actionTimeoutMilliSec, ActionDBAccessor db,
       ActionQueue actionQueue, Clusters fsmObject, int maxAttempts, HostsMap 
hostsMap,
       UnitOfWork unitOfWork, AmbariEventPublisher ambariEventPublisher,
-      Configuration configuration, Provider<EntityManager> 
entityManagerProvider) {
+      Configuration configuration, Provider<EntityManager> 
entityManagerProvider,
+      HostRoleCommandDAO hostRoleCommandDAO, HostRoleCommandFactory 
hostRoleCommandFactory) {
 
     sleepTime = sleepTimeMilliSec;
     actionTimeout = actionTimeoutMilliSec;
@@ -211,6 +233,8 @@ class ActionScheduler implements Runnable {
     this.ambariEventPublisher = ambariEventPublisher;
     this.configuration = configuration;
     this.entityManagerProvider = entityManagerProvider;
+    this.hostRoleCommandDAO = hostRoleCommandDAO;
+    this.hostRoleCommandFactory = hostRoleCommandFactory;
     jpaPublisher = null;
 
     serverActionExecutor = new ServerActionExecutor(db, sleepTime);
@@ -1098,14 +1122,32 @@ class ActionScheduler implements Runnable {
     synchronized (requestsToBeCancelled) {
       // Now, cancel stages completely
       for (Long requestId : requestsToBeCancelled) {
-        List<HostRoleCommand> tasksToDequeue = db.getRequestTasks(requestId);
-        String reason = requestCancelReasons.get(requestId);
-        cancelHostRoleCommands(tasksToDequeue, reason);
-        List<Stage> stages = db.getAllStages(requestId);
-        for (Stage stage : stages) {
-          abortOperationsForStage(stage);
+        // only pull back entities that have not completed; pulling back all
+        // entities for the request can cause OOM problems on large requests,
+        // like those for upgrades
+        List<HostRoleCommandEntity> entitiesToDequeue = 
hostRoleCommandDAO.findByRequestIdAndStatuses(
+            requestId, HostRoleStatus.NOT_COMPLETED_STATUSES);
+
+        if (!entitiesToDequeue.isEmpty()) {
+          List<HostRoleCommand> tasksToDequeue = new 
ArrayList<>(entitiesToDequeue.size());
+          for (HostRoleCommandEntity hrcEntity : entitiesToDequeue) {
+            HostRoleCommand task = 
hostRoleCommandFactory.createExisting(hrcEntity);
+            tasksToDequeue.add(task);
+          }
+
+          String reason = requestCancelReasons.get(requestId);
+          cancelHostRoleCommands(tasksToDequeue, reason);
+        }
+
+        // abort any stages in progress; don't execute this for all stages 
since
+        // that could lead to OOM errors on large requests, like those for
+        // upgrades
+        List<Stage> stagesInProgress = db.getStagesInProgress();
+        for (Stage stageInProgress : stagesInProgress) {
+          abortOperationsForStage(stageInProgress);
         }
       }
+
       requestsToBeCancelled.clear();
       requestCancelReasons.clear();
     }
@@ -1154,6 +1196,7 @@ class ActionScheduler implements Runnable {
       }
     }
   }
+
   void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
     for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
       // There are no server actions in actionQueue

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 205a03d..d37e32b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -71,6 +71,7 @@ import 
org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
 import org.apache.ambari.server.orm.dao.RequestDAO;
 import org.apache.ambari.server.orm.dao.UpgradeDAO;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.orm.entities.StackEntity;
@@ -117,6 +118,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
@@ -474,6 +476,7 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
       throw new IllegalArgumentException(String.format("%s is required", 
UPGRADE_REQUEST_ID));
     }
 
+    long clusterId = cluster.getClusterId();
     long requestId = Long.parseLong(requestIdProperty);
     UpgradeEntity upgradeEntity = 
s_upgradeDAO.findUpgradeByRequestId(requestId);
     if( null == upgradeEntity){
@@ -514,7 +517,7 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
         suspended = Boolean.valueOf((String) 
propertyMap.get(UPGRADE_SUSPENDED));
       }
 
-      setUpgradeRequestStatus(requestIdProperty, status, propertyMap);
+      setUpgradeRequestStatus(clusterId, requestId, status, propertyMap);
 
       // When the status of the upgrade's request is changing, we also update 
the suspended flag.
       upgradeEntity.setSuspended(suspended);
@@ -1747,6 +1750,8 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
    * <li>{@link HostRoleStatus#PENDING}</li>
    * </ul>
    *
+   * @param clusterId
+   *          the ID of the cluster
    * @param requestId
    *          the request to change the status for.
    * @param status
@@ -1755,7 +1760,7 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
    *          the map of request properties (needed for things like abort 
reason
    *          if present)
    */
-  private void setUpgradeRequestStatus(String requestId, HostRoleStatus status,
+  private void setUpgradeRequestStatus(long clusterId, long requestId, 
HostRoleStatus status,
       Map<String, Object> propertyMap) {
     if (status != HostRoleStatus.ABORTED && status != HostRoleStatus.PENDING) {
       throw new IllegalArgumentException(String.format("Cannot set status %s, 
only %s is allowed",
@@ -1767,14 +1772,14 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
       reason = String.format(DEFAULT_REASON_TEMPLATE, requestId);
     }
 
-    ActionManager actionManager = getManagementController().getActionManager();
-    List<org.apache.ambari.server.actionmanager.Request> requests = 
actionManager.getRequests(
-        Collections.singletonList(Long.valueOf(requestId)));
-
-    org.apache.ambari.server.actionmanager.Request internalRequest = 
requests.get(0);
+    // do not try to pull back the entire request here as they can be massive
+    // and cause OOM problems; instead, use the count of statuses to determine
+    // the state of the upgrade request
+    Map<Long, HostRoleCommandStatusSummaryDTO> aggregateCounts = 
s_hostRoleCommandDAO.findAggregateCounts(requestId);
+    CalculatedStatus calculatedStatus = 
CalculatedStatus.statusFromStageSummary(aggregateCounts,
+        aggregateCounts.keySet());
 
-    HostRoleStatus internalStatus = CalculatedStatus.statusFromStages(
-        internalRequest.getStages()).getStatus();
+    HostRoleStatus internalStatus = calculatedStatus.getStatus();
 
     if (HostRoleStatus.PENDING == status && !(internalStatus == 
HostRoleStatus.ABORTED || internalStatus == HostRoleStatus.IN_PROGRESS)) {
       throw new IllegalArgumentException(
@@ -1782,10 +1787,11 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
               HostRoleStatus.ABORTED, internalStatus));
     }
 
-    Long clusterId = internalRequest.getClusterId();
+    ActionManager actionManager = getManagementController().getActionManager();
+
     if (HostRoleStatus.ABORTED == status) {
       if (!internalStatus.isCompletedState()) {
-        actionManager.cancelRequest(internalRequest.getRequestId(), reason);
+        actionManager.cancelRequest(requestId, reason);
         // Remove relevant upgrade entity
         try {
           Cluster cluster = clusters.get().getClusterById(clusterId);
@@ -1801,11 +1807,11 @@ public class UpgradeResourceProvider extends 
AbstractControllerResourceProvider
     } else {
       // Status must be PENDING.
       List<Long> taskIds = new ArrayList<Long>();
-      for (HostRoleCommand hrc : internalRequest.getCommands()) {
-        if (HostRoleStatus.ABORTED == hrc.getStatus()
-            || HostRoleStatus.TIMEDOUT == hrc.getStatus()) {
-          taskIds.add(hrc.getTaskId());
-        }
+      List<HostRoleCommandEntity> hrcEntities = 
s_hostRoleCommandDAO.findByRequestIdAndStatuses(
+          requestId, Sets.newHashSet(HostRoleStatus.ABORTED, 
HostRoleStatus.TIMEDOUT));
+
+      for (HostRoleCommandEntity hrcEntity : hrcEntities) {
+        taskIds.add(hrcEntity.getTaskId());
       }
 
       actionManager.resubmitTasks(taskIds);

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
index 541b2e9..8ef4a1b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/StageDAO.java
@@ -129,6 +129,21 @@ public class StageDAO {
   }
 
   /**
+   * Gets all of the stage IDs associated with a request.
+   *
+   * @param requestId
+   * @return the list of stage IDs.
+   */
+  @RequiresSession
+  public List<Long> findIdsByRequestId(long requestId) {
+    TypedQuery<Long> query = entityManagerProvider.get().createNamedQuery(
+        "StageEntity.findIdsByRequestId", Long.class);
+
+    query.setParameter("requestId", requestId);
+    return daoUtils.selectList(query);
+  }
+
+  /**
    * Get the list of stage entities for the given request id and stage ids.
    *
    * @param requestId  the request ids

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
index 12ab568..7659a23 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java
@@ -39,7 +39,13 @@ import javax.persistence.Table;
 @Entity
 @Table(name = "stage")
 @IdClass(org.apache.ambari.server.orm.entities.StageEntityPK.class)
-@NamedQueries({ @NamedQuery(name = "StageEntity.findByCommandStatuses", query 
= "SELECT stage from StageEntity stage WHERE EXISTS (SELECT roleCommand.stageId 
from HostRoleCommandEntity roleCommand WHERE roleCommand.status IN :statuses 
AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = 
stage.requestId ) ORDER by stage.requestId, stage.stageId") })
+@NamedQueries({
+    @NamedQuery(
+        name = "StageEntity.findByCommandStatuses",
+        query = "SELECT stage from StageEntity stage WHERE EXISTS (SELECT 
roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE 
roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND 
roleCommand.requestId = stage.requestId ) ORDER by stage.requestId, 
stage.stageId"),
+    @NamedQuery(
+        name = "StageEntity.findIdsByRequestId",
+        query = "SELECT stage.stageId FROM StageEntity stage WHERE 
stage.requestId = :requestId ORDER BY stage.stageId ASC") })
 public class StageEntity {
 
   @Column(name = "cluster_id", updatable = false, nullable = false)

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
index 97816e8..fa36905 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java
@@ -89,7 +89,7 @@ public class MetricsRetrievalService extends AbstractService {
    *
    * @see #s_exceptionCache
    */
-  private static final int EXCEPTION_CACHE_TIMEOUT_MINUTES = 5;
+  private static final int EXCEPTION_CACHE_TIMEOUT_MINUTES = 20;
 
   /**
    * Exceptions from this service should not SPAM the logs; so cache exceptions

http://git-wip-us.apache.org/repos/asf/ambari/blob/2c5680bc/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index 22ac60f..9f12a94 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -74,7 +74,9 @@ import 
org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.HostDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
 import org.apache.ambari.server.orm.entities.RequestEntity;
 import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.serveraction.ServerActionExecutor;
@@ -215,7 +217,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock, null, null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -328,7 +330,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock, null, null);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -413,7 +415,8 @@ public class TestActionScheduler {
     AmbariEventPublisher aep = 
EasyMock.createNiceMock(AmbariEventPublisher.class);
     ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
-            new HostsMap((String) null), unitOfWork, aep, conf, 
entityManagerProviderMock).
+            new HostsMap((String) null), unitOfWork, aep, conf, 
entityManagerProviderMock,
+            mock(HostRoleCommandDAO.class), 
mock(HostRoleCommandFactory.class)).
         addMockedMethod("cancelHostRoleCommands").
         createMock();
     
scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class));
@@ -536,7 +539,8 @@ public class TestActionScheduler {
 
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -650,7 +654,8 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -727,7 +732,7 @@ public class TestActionScheduler {
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
         unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), conf,
-        entityManagerProviderMock);
+        entityManagerProviderMock, (HostRoleCommandDAO)null, 
(HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -807,7 +812,8 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, 
"AMBARI_SERVER_ACTION").isCompletedState()
@@ -827,8 +833,10 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class)
       .withConstructor(long.class, long.class, ActionDBAccessor.class, 
ActionQueue.class, Clusters.class, int.class,
-        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class, Provider.class)
-      .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, 
entityManagerProviderMock)
+            HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class,
+            Provider.class, HostRoleCommandDAO.class, 
HostRoleCommandFactory.class)
+      .withArgs(100L, 50L, null, null, null, -1, null, null, null, null, 
entityManagerProviderMock,
+            mock(HostRoleCommandDAO.class), mock(HostRoleCommandFactory.class))
       .createNiceMock();
 
     EasyMock.replay(scheduler);
@@ -935,9 +943,11 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = 
EasyMock.createMockBuilder(ActionScheduler.class)
       .withConstructor(long.class, long.class, ActionDBAccessor.class, 
ActionQueue.class, Clusters.class, int.class,
-        HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class, Provider.class)
+            HostsMap.class, UnitOfWork.class, AmbariEventPublisher.class, 
Configuration.class,
+            Provider.class, HostRoleCommandDAO.class, 
HostRoleCommandFactory.class)
         .withArgs(100L, 50L, db, aq, fsm, -1, null, null, 
ambariEventPublisher, null,
-            entityManagerProviderMock)
+            entityManagerProviderMock, mock(HostRoleCommandDAO.class),
+            mock(HostRoleCommandFactory.class))
       .createNiceMock();
 
     EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, 
service, serviceComponent, serviceComponentHost);
@@ -1015,7 +1025,8 @@ public class TestActionScheduler {
     }).when(db).getTasksByRoleAndStatus(anyString(), 
any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -1122,7 +1133,8 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock));
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
 
     
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
@@ -1213,7 +1225,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
             new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock));
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
 
 
     
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
@@ -1287,7 +1300,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
         new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock));
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
 
     
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
@@ -1409,7 +1423,8 @@ public class TestActionScheduler {
         withConstructor((long)100, (long)50, db, aq, fsm, 3,
           new HostsMap((String) null),
             unitOfWork, EasyMock.createNiceMock(AmbariEventPublisher.class), 
conf,
-            entityManagerProviderMock).
+            entityManagerProviderMock, mock(HostRoleCommandDAO.class),
+            mock(HostRoleCommandFactory.class)).
           addMockedMethod("cancelHostRoleCommands").
           createMock();
     scheduler.cancelHostRoleCommands(EasyMock.capture(cancelCommandList),
@@ -1584,7 +1599,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock);
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     scheduler.doWork();
 
@@ -1768,7 +1784,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock);
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     ActionManager am = new ActionManager(db, requestFactory, scheduler);
 
@@ -1952,7 +1969,8 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        10000, new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -2035,7 +2053,8 @@ public class TestActionScheduler {
     when(db.getStagesInProgress()).thenReturn(stages);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     final CountDownLatch abortCalls = new CountDownLatch(2);
 
@@ -2144,7 +2163,8 @@ public class TestActionScheduler {
 
     ServerActionExecutor.init(injector);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     int cycleCount = 0;
     while (!stages.get(0).getHostRoleStatus(null, "AMBARI_SERVER_ACTION")
@@ -2166,6 +2186,9 @@ public class TestActionScheduler {
     ServiceComponent scomp = mock(ServiceComponent.class);
     ServiceComponentHost sch = mock(ServiceComponentHost.class);
     UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    HostRoleCommandDAO hostRoleCommandDAO = mock(HostRoleCommandDAO.class);
+    HostRoleCommandFactory hostRoleCommandFactory = 
mock(HostRoleCommandFactory.class);
+
     when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
     when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
@@ -2176,23 +2199,63 @@ public class TestActionScheduler {
     hostEntity.setHostName(hostname);
     hostDAO.create(hostEntity);
 
-    HashMap<String, ServiceComponentHost> hosts =
-            new HashMap<String, ServiceComponentHost>();
+    HashMap<String, ServiceComponentHost> hosts = new HashMap<String, 
ServiceComponentHost>();
     hosts.put(hostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     long requestId = 1;
 
-    final List<Stage> stages = new ArrayList<Stage>();
-    int namenodeCmdTaskId = 1;
-    stages.add(
-            getStageWithSingleTask(
-                    hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
-                    Service.Type.HDFS, namenodeCmdTaskId, 1, (int)requestId));
-    stages.add(
-            getStageWithSingleTask(
-                    hostname, "cluster1", Role.DATANODE, RoleCommand.START,
-                    Service.Type.HDFS, 2, 2, (int)requestId));
+    // create 3 stages, each with a single task - the first stage will be 
completed and should not
+    // be included when cancelling the unfinished tasks of the request
+    final List<Stage> allStages = new ArrayList<Stage>();
+    final List<Stage> stagesInProgress = new ArrayList<Stage>();
+    final List<HostRoleCommand> tasksInProgress = new ArrayList<>();
+    final List<HostRoleCommandEntity> hrcEntitiesInProgress = new 
ArrayList<>();
+
+    int secondaryNamenodeCmdTaskId = 1;
+    int namenodeCmdTaskId = 2;
+    int datanodeCmdTaskId = 3;
+
+    Stage stageWithTask = getStageWithSingleTask(
+        hostname, "cluster1", Role.SECONDARY_NAMENODE, RoleCommand.START,
+        Service.Type.HDFS, secondaryNamenodeCmdTaskId, 1, (int)requestId);
+
+    // complete the first stage
+    
stageWithTask.getOrderedHostRoleCommands().get(0).setStatus(HostRoleStatus.COMPLETED);
+    allStages.add(stageWithTask);
+
+    stageWithTask = getStageWithSingleTask(
+        hostname, "cluster1", Role.NAMENODE, RoleCommand.START,
+        Service.Type.HDFS, namenodeCmdTaskId, 2, (int)requestId);
+
+    tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
+    stagesInProgress.add(stageWithTask);
+    allStages.add(stageWithTask);
+
+    stageWithTask = getStageWithSingleTask(
+        hostname, "cluster1", Role.DATANODE, RoleCommand.START,
+        Service.Type.HDFS, datanodeCmdTaskId, 3, (int)requestId);
+
+    tasksInProgress.addAll(stageWithTask.getOrderedHostRoleCommands());
+    stagesInProgress.add(stageWithTask);
+    allStages.add(stageWithTask);
+
+    // convert HRC to HRCEntity for the mock DAO to use
+    for (HostRoleCommand hostRoleCommand : tasksInProgress) {
+      HostRoleCommandEntity entity = mock(HostRoleCommandEntity.class);
+      when(entity.getTaskId()).thenReturn(hostRoleCommand.getTaskId());
+      when(entity.getStageId()).thenReturn(hostRoleCommand.getStageId());
+      when(entity.getRequestId()).thenReturn(hostRoleCommand.getRequestId());
+      when(entity.getHostId()).thenReturn(hostRoleCommand.getHostId());
+      when(entity.getHostName()).thenReturn(hostRoleCommand.getHostName());
+      when(entity.getRole()).thenReturn(hostRoleCommand.getRole());
+      when(entity.getStatus()).thenReturn(hostRoleCommand.getStatus());
+      
when(entity.getRoleCommand()).thenReturn(hostRoleCommand.getRoleCommand());
+
+      hrcEntitiesInProgress.add(entity);
+
+      
when(hostRoleCommandFactory.createExisting(entity)).thenReturn(hostRoleCommand);
+    }
 
     Host host = mock(Host.class);
     when(fsm.getHost(anyString())).thenReturn(host);
@@ -2205,15 +2268,18 @@ public class TestActionScheduler {
     when(request.isExclusive()).thenReturn(false);
     when(db.getRequestEntity(anyLong())).thenReturn(request);
 
-    when(db.getCommandsInProgressCount()).thenReturn(stages.size());
-    when(db.getStagesInProgress()).thenReturn(stages);
+    when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
+    when(db.getStagesInProgress()).thenReturn(stagesInProgress);
+    when(db.getAllStages(anyLong())).thenReturn(allStages);
 
     List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
-    for (Stage stage : stages) {
+
+    for (Stage stage : allStages) {
       requestTasks.addAll(stage.getOrderedHostRoleCommands());
     }
+
     when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
-    when(db.getAllStages(anyLong())).thenReturn(stages);
+
     doAnswer(new Answer<Void>() {
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -2224,7 +2290,7 @@ public class TestActionScheduler {
           Long requestId = requestStageIds[0];
           Long stageId = requestStageIds[1];
           Long id = report.getTaskId();
-          for (Stage stage : stages) {
+          for (Stage stage : stagesInProgress) {
             if (requestId.equals(stage.getRequestId()) && 
stageId.equals(stage.getStageId())) {
               for (HostRoleCommand hostRoleCommand : 
stage.getOrderedHostRoleCommands()) {
                 if (hostRoleCommand.getTaskId() == id) {
@@ -2244,7 +2310,7 @@ public class TestActionScheduler {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
         Long taskId = (Long) invocation.getArguments()[0];
-        for (Stage stage : stages) {
+        for (Stage stage : allStages) {
           for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
             if (taskId.equals(command.getTaskId())) {
               return command;
@@ -2254,11 +2320,12 @@ public class TestActionScheduler {
         return null;
       }
     });
+
     doAnswer(new Answer<Void>() {
       @Override
       public Void answer(InvocationOnMock invocation) throws Throwable {
         Long requestId = (Long) invocation.getArguments()[0];
-        for (Stage stage : stages) {
+        for (Stage stage : stagesInProgress) {
           if (requestId.equals(stage.getRequestId())) {
             for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) 
{
               if (command.getStatus() == HostRoleStatus.QUEUED ||
@@ -2277,8 +2344,12 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
 
+    when(hostRoleCommandDAO.findByRequestIdAndStatuses(requestId,
+        
HostRoleStatus.NOT_COMPLETED_STATUSES)).thenReturn(hrcEntitiesInProgress);
+
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock);
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        hostRoleCommandDAO, hostRoleCommandFactory);
 
     scheduler.doWork();
 
@@ -2287,8 +2358,10 @@ public class TestActionScheduler {
     scheduler.scheduleCancellingRequest(requestId, reason);
 
     scheduler.doWork();
-    Assert.assertEquals(HostRoleStatus.ABORTED, 
stages.get(0).getHostRoleStatus(hostname, "NAMENODE"));
-    Assert.assertEquals(HostRoleStatus.ABORTED, 
stages.get(1).getHostRoleStatus(hostname, "DATANODE"));
+    Assert.assertEquals(HostRoleStatus.COMPLETED, 
allStages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED, 
allStages.get(1).getHostRoleStatus(hostname, "NAMENODE"));
+    Assert.assertEquals(HostRoleStatus.ABORTED, 
allStages.get(2).getHostRoleStatus(hostname, "DATANODE"));
+
     Assert.assertEquals(aq.size(hostname), 1); // Cancel commands should be 
generated only for 1 stage
 
     CancelCommand cancelCommand = (CancelCommand) aq.dequeue(hostname);
@@ -2438,7 +2511,8 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock));
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
 
     
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
@@ -2498,7 +2572,8 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock);
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, 
null, RoleCommand.EXECUTE);
     hrc1.setStatus(HostRoleStatus.COMPLETED);
@@ -2530,7 +2605,8 @@ public class TestActionScheduler {
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        unitOfWork, null, conf, entityManagerProviderMock);
+        unitOfWork, null, conf, entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     HostRoleCommand hrc1 = hostRoleCommandFactory.create("h1", Role.NAMENODE, 
null, RoleCommand.EXECUTE);
     hrc1.setStatus(HostRoleStatus.COMPLETED);
@@ -2669,7 +2745,8 @@ public class TestActionScheduler {
     }).when(db).abortOperation(anyLong());
 
     ActionScheduler scheduler = spy(new ActionScheduler(100, 50, db, aq, fsm, 
3,
-        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock));
+        new HostsMap((String) null), unitOfWork, null, conf, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null));
 
     
doReturn(false).when(scheduler).wasAgentRestartedDuringOperation(any(Host.class),
 any(Stage.class), anyString());
 
@@ -2716,7 +2793,8 @@ public class TestActionScheduler {
     expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, 
null, null, 3,
-        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock);
+        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock,
+        (HostRoleCommandDAO)null, (HostRoleCommandFactory)null);
 
     replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand);
 
@@ -2735,7 +2813,7 @@ public class TestActionScheduler {
     expect(nextStage.getStageId()).andReturn(0L);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, null, null, null, 
3,
-        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock);
+        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock, null, null);
 
     replay(nextStage);
 
@@ -2773,7 +2851,7 @@ public class TestActionScheduler {
     expect(previousStage.getSuccessFactor(Role.DATANODE)).andReturn(0.5F);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, actionDBAccessor, 
null, null, 3,
-        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock);
+        new HostsMap((String) null), null, null, null, 
entityManagerProviderMock, null, null);
 
     replay(previousStage, nextStage, actionDBAccessor, hostRoleCommand);
 

Reply via email to