Updated Branches: refs/heads/trunk 5363f8416 -> f792d3d3f
AMBARI-4403. Fill start and end time of request entity. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f792d3d3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f792d3d3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f792d3d3 Branch: refs/heads/trunk Commit: f792d3d3f6651d918ee5d201f1057f1c2bc65a3a Parents: 5363f84 Author: Myroslav Papirkovskyy <[email protected]> Authored: Thu Jan 23 02:19:10 2014 +0200 Committer: Myroslav Papirkovskyy <[email protected]> Committed: Fri Jan 24 00:55:53 2014 +0200 ---------------------------------------------------------------------- .../server/actionmanager/ActionDBAccessor.java | 14 +- .../actionmanager/ActionDBAccessorImpl.java | 139 ++++++++++++++----- .../server/actionmanager/ActionManager.java | 12 +- .../server/actionmanager/ActionScheduler.java | 11 +- .../server/actionmanager/HostRoleStatus.java | 45 +++--- .../ambari/server/actionmanager/Request.java | 10 ++ .../server/orm/dao/HostRoleCommandDAO.java | 27 +++- .../ambari/server/orm/dao/RequestDAO.java | 20 +++ .../server/orm/entities/RequestEntity.java | 21 ++- .../server/actionmanager/TestActionManager.java | 28 ++-- .../actionmanager/TestActionScheduler.java | 63 ++++++--- 11 files changed, 280 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index c29f6f1..3dfdf66 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -18,9 +18,7 @@ package org.apache.ambari.server.actionmanager; import com.google.inject.persist.Transactional; -import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; -import org.apache.ambari.server.controller.ExecuteActionRequest; import java.util.Collection; import java.util.List; @@ -40,6 +38,13 @@ public interface ActionDBAccessor { public List<Stage> getAllStages(long requestId); /** + * Get request object by id + * @param requestId + * @return + */ + Request getRequest(long requestId); + + /** * Abort all outstanding operations associated with the given request */ public void abortOperation(long requestId); @@ -75,6 +80,11 @@ public interface ActionDBAccessor { void setSourceScheduleForRequest(long requestId, long scheduleId); /** + * Update tasks according to command reports + */ + void updateHostRoleStates(Collection<CommandReport> reports); + + /** * For the given host, update all the tasks based on the command report */ public void updateHostRoleState(String hostname, long requestId, http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index bc7ea8f..96a3a0e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -20,21 +20,41 @@ package org.apache.ambari.server.actionmanager; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.inject.Inject; -import com.google.inject.Injector; import com.google.inject.Singleton; import com.google.inject.name.Named; import com.google.inject.persist.Transactional; -import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.agent.CommandReport; -import org.apache.ambari.server.controller.ExecuteActionRequest; -import org.apache.ambari.server.orm.dao.*; -import org.apache.ambari.server.orm.entities.*; -import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.orm.dao.ClusterDAO; +import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; +import org.apache.ambari.server.orm.dao.HostDAO; +import org.apache.ambari.server.orm.dao.HostRoleCommandDAO; +import org.apache.ambari.server.orm.dao.RequestDAO; +import org.apache.ambari.server.orm.dao.RequestScheduleDAO; +import org.apache.ambari.server.orm.dao.RoleSuccessCriteriaDAO; +import org.apache.ambari.server.orm.dao.StageDAO; +import org.apache.ambari.server.orm.entities.ClusterEntity; +import org.apache.ambari.server.orm.entities.ExecutionCommandEntity; +import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; +import org.apache.ambari.server.orm.entities.RequestEntity; +import org.apache.ambari.server.orm.entities.RequestScheduleEntity; +import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity; +import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; @Singleton @@ -107,20 +127,24 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { return stages; } + @Override + public Request getRequest(long requestId) { + RequestEntity requestEntity = requestDAO.findByPK(requestId); + if (requestEntity != null) { + return requestFactory.createExisting(requestEntity); + } else { + return null; + } + } + /* (non-Javadoc) * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long) */ @Override - @Transactional public void abortOperation(long requestId) { long now = System.currentTimeMillis(); - //mark request as ended - RequestEntity requestEntity = requestDAO.findByPK(requestId); - if (requestEntity != null && requestEntity.getEndTime() == -1L) { - requestEntity.setEndTime(now); - requestDAO.merge(requestEntity); - } + endRequest(requestId); List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByRequest(requestId); @@ -130,7 +154,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { command.getStatus() == HostRoleStatus.PENDING) { command.setStatus(HostRoleStatus.ABORTED); command.setEndTime(now); - hostRoleCommandDAO.merge(command); LOG.info("Aborting command. Hostname " + command.getHostName() + " role " + command.getRole() + " requestId " + command.getRequestId() @@ -138,13 +161,14 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { + " stageId " + command.getStageId()); } } + + hostRoleCommandDAO.mergeAll(commands); } /* (non-Javadoc) * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#timeoutHostRole(long, long, org.apache.ambari.server.Role) */ @Override - @Transactional public void timeoutHostRole(String host, long requestId, long stageId, String role) { long now = System.currentTimeMillis(); @@ -153,8 +177,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { for (HostRoleCommandEntity command : commands) { command.setStatus(HostRoleStatus.TIMEDOUT); command.setEndTime(now); - hostRoleCommandDAO.merge(command); } + hostRoleCommandDAO.mergeAll(commands); + endRequestIfCompleted(requestId); } /* (non-Javadoc) @@ -165,7 +190,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { List<Stage> stages = new ArrayList<Stage>(); List<HostRoleStatus> statuses = Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, - HostRoleStatus.PENDING); + HostRoleStatus.PENDING); for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) { stages.add(stageFactory.createExisting(stageEntity)); } @@ -234,27 +259,30 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } requestEntity.setStages(stageEntities); requestDAO.merge(requestEntity); -// requestDAO.create(requestEntity); } @Override - @Transactional public void startRequest(long requestId) { RequestEntity requestEntity = requestDAO.findByPK(requestId); if (requestEntity != null && requestEntity.getStartTime() == -1L) { requestEntity.setStartTime(System.currentTimeMillis()); + requestDAO.merge(requestEntity); } - requestDAO.merge(requestEntity); } @Override - @Transactional public void endRequest(long requestId) { RequestEntity requestEntity = requestDAO.findByPK(requestId); if (requestEntity != null && requestEntity.getEndTime() == -1L) { requestEntity.setEndTime(System.currentTimeMillis()); + requestDAO.merge(requestEntity); + } + } + + public void endRequestIfCompleted(long requestId) { + if (requestDAO.isAllTasksCompleted(requestId)) { + endRequest(requestId); } - requestDAO.merge(requestEntity); } @Override @@ -285,30 +313,76 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } @Override - @Transactional + public void updateHostRoleStates(Collection<CommandReport> reports) { + Map<Long, CommandReport> taskReports = new HashMap<Long, CommandReport>(); + for (CommandReport report : reports) { + taskReports.put(report.getTaskId(), report); + } + + long now = System.currentTimeMillis(); + + List<Long> requestsToCheck = new ArrayList<Long>(); + + List<HostRoleCommandEntity> commandEntities = hostRoleCommandDAO.findByPKs(taskReports.keySet()); + for (HostRoleCommandEntity commandEntity : commandEntities) { + CommandReport report = taskReports.get(commandEntity.getTaskId()); + commandEntity.setStatus(HostRoleStatus.valueOf(report.getStatus())); + commandEntity.setStdOut(report.getStdOut().getBytes()); + commandEntity.setStdError(report.getStdErr().getBytes()); + commandEntity.setStructuredOut(report.getStructuredOut() == null ? null : + report.getStructuredOut().getBytes()); + commandEntity.setExitcode(report.getExitCode()); + + if (HostRoleStatus.getCompletedStates().contains(commandEntity.getStatus())) { + commandEntity.setEndTime(now); + + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + long requestId = requestStageIds[0]; + long stageId = requestStageIds[1]; + if (requestDAO.getLastStageId(requestId).equals(stageId)) { + requestsToCheck.add(requestId); + } + } + } + + hostRoleCommandDAO.mergeAll(commandEntities); + + for (Long requestId : requestsToCheck) { + endRequestIfCompleted(requestId); + } + } + + @Override public void updateHostRoleState(String hostname, long requestId, long stageId, String role, CommandReport report) { + boolean checkRequest = false; if (LOG.isDebugEnabled()) { LOG.debug("Update HostRoleState: " - + "HostName " + hostname + " requestId " + requestId + " stageId " - + stageId + " role " + role + " report " + report); + + "HostName " + hostname + " requestId " + requestId + " stageId " + + stageId + " role " + role + " report " + report); } long now = System.currentTimeMillis(); List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole( - hostname, requestId, stageId, role); + hostname, requestId, stageId, role); for (HostRoleCommandEntity command : commands) { command.setStatus(HostRoleStatus.valueOf(report.getStatus())); command.setStdOut(report.getStdOut().getBytes()); command.setStdError(report.getStdErr().getBytes()); command.setStructuredOut(report.getStructuredOut() == null ? null : - report.getStructuredOut().getBytes()); // =================================== - if (command.getStatus() == HostRoleStatus.COMPLETED || - command.getStatus() == HostRoleStatus.ABORTED || - command.getStatus() == HostRoleStatus.FAILED) { + report.getStructuredOut().getBytes()); + if (HostRoleStatus.getCompletedStates().contains(command.getStatus())) { command.setEndTime(now); + if (requestDAO.getLastStageId(requestId).equals(stageId)) { + checkRequest = true; + } } command.setExitcode(report.getExitCode()); - hostRoleCommandDAO.merge(command); + } + hostRoleCommandDAO.mergeAll(commands); + + if (checkRequest) { + endRequestIfCompleted(requestId); } } @@ -473,7 +547,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { } @Override - @Transactional public List<Request> getRequests(Collection<Long> requestIds){ List<RequestEntity> requestEntities = requestDAO.findByPks(requestIds); List<Request> requests = new ArrayList<Request>(requestEntities.size()); http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java index 3788d75..789bbd6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java @@ -33,6 +33,7 @@ import org.apache.ambari.server.utils.StageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -123,15 +124,13 @@ public class ActionManager { if (reports == null) { return; } + + List<CommandReport> reportsToProcess = new ArrayList<CommandReport>(); //persist the action response into the db. for (CommandReport report : reports) { if (LOG.isDebugEnabled()) { LOG.debug("Processing command report : " + report.toString()); } - String actionId = report.getActionId(); - long[] requestStageIds = StageUtils.getRequestStage(actionId); - long requestId = requestStageIds[0]; - long stageId = requestStageIds[1]; HostRoleCommand command = db.getTask(report.getTaskId()); if (command == null) { LOG.warn("The task " + report.getTaskId() @@ -144,9 +143,10 @@ public class ActionManager { + " is not in progress, ignoring update"); continue; } - db.updateHostRoleState(hostname, requestId, stageId, report.getRole(), - report); + reportsToProcess.add(report); } + + db.updateHostRoleStates(reportsToProcess); } public void handleLostHost(String host) { http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 259b970..601930d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -79,6 +79,8 @@ class ActionScheduler implements Runnable { private final ServerActionManager serverActionManager; private final Configuration configuration; + private final Set<String> requestsInProgress = new HashSet<String>(); + /** * true if scheduler should run ASAP. * We need this flag to avoid sleep in situations, when @@ -145,8 +147,10 @@ class ActionScheduler implements Runnable { shouldRun = false; } catch (Exception ex) { LOG.warn("Exception received", ex); + requestsInProgress.clear(); } catch (Throwable t) { LOG.warn("ERROR", t); + requestsInProgress.clear(); } } } @@ -179,9 +183,12 @@ class ActionScheduler implements Runnable { continue; } else { runningRequestIds.add(requestIdStr); + if (!requestsInProgress.contains(requestIdStr)) { + requestsInProgress.add(requestIdStr); + db.startRequest(requestId); + } } - List<String> stageHosts = s.getHosts(); boolean conflict = false; for (String host : stageHosts) { @@ -251,6 +258,8 @@ class ActionScheduler implements Runnable { } } + requestsInProgress.retainAll(runningRequestIds); + } finally { unitOfWork.end(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java index 84b8f93..039579f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java @@ -17,6 +17,10 @@ */ package org.apache.ambari.server.actionmanager; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + public enum HostRoleStatus { PENDING(0), //Not queued for a host QUEUED(1), //Queued for a host @@ -27,6 +31,10 @@ public enum HostRoleStatus { ABORTED(6); //Operation was abandoned private final int status; + private static List<HostRoleStatus> COMPLETED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED, COMPLETED); + private static List<HostRoleStatus> FAILED_STATES = Arrays.asList(FAILED, TIMEDOUT, ABORTED); + + private HostRoleStatus(int status) { this.status = status; } @@ -37,14 +45,7 @@ public enum HostRoleStatus { * @return true if this is a valid failure state. */ public boolean isFailedState() { - switch (HostRoleStatus.values()[this.status]) { - case FAILED: - case TIMEDOUT: - case ABORTED: - return true; - default: - return false; - } + return FAILED_STATES.contains(this); } /** @@ -56,14 +57,24 @@ public enum HostRoleStatus { * @return true if this is a completed state. */ public boolean isCompletedState() { - switch (HostRoleStatus.values()[this.status]) { - case COMPLETED: - case FAILED: - case TIMEDOUT: - case ABORTED: - return true; - default: - return false; - } + return COMPLETED_STATES.contains(this); + } + + /** + * + * @return list of completed states + */ + public static List<HostRoleStatus> getCompletedStates() { + return Collections.unmodifiableList(COMPLETED_STATES); } + + /** + * + * @return list of failed states + */ + public static List<HostRoleStatus> getFailedStates() { + return Collections.unmodifiableList(FAILED_STATES); + } + + } http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java index 5116ea9..d1047a7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Request.java @@ -45,6 +45,7 @@ public class Request { private long createTime; private long startTime; private long endTime; + private HostRoleStatus status; // not persisted yet private String inputs; private String targetService; private String targetComponent; @@ -142,6 +143,7 @@ public class Request { this.targetHosts = entity.getTargetHosts(); this.requestType = entity.getRequestType(); this.commandName = entity.getCommandName(); + this.status = entity.getStatus(); if (entity.getRequestScheduleEntity() !=null) { this.requestScheduleId = entity.getRequestScheduleEntity().getScheduleId(); } @@ -303,4 +305,12 @@ public class Request { ", stages=" + stages + '}'; } + + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + this.status = status; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index e68b974..61e2fc2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -22,7 +22,6 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; -import org.apache.ambari.server.Role; import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.orm.entities.HostEntity; import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; @@ -31,10 +30,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.persistence.EntityManager; -import javax.persistence.Query; import javax.persistence.TypedQuery; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @Singleton public class HostRoleCommandDAO { @@ -53,10 +55,13 @@ public class HostRoleCommandDAO { @Transactional public List<HostRoleCommandEntity> findByPKs(Collection<Long> taskIds) { + if (taskIds == null || taskIds.isEmpty()) { + return Collections.emptyList(); + } TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery( - "SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " + - "ORDER BY task.taskId", - HostRoleCommandEntity.class); + "SELECT task FROM HostRoleCommandEntity task WHERE task.taskId IN ?1 " + + "ORDER BY task.taskId", + HostRoleCommandEntity.class); return daoUtils.selectList(query, taskIds); } @@ -204,7 +209,15 @@ public class HostRoleCommandDAO { public HostRoleCommandEntity merge(HostRoleCommandEntity stageEntity) { HostRoleCommandEntity entity = entityManagerProvider.get().merge(stageEntity); return entity; + } + @Transactional + public List<HostRoleCommandEntity> mergeAll(Collection<HostRoleCommandEntity> entities) { + List<HostRoleCommandEntity> managedList = new ArrayList<HostRoleCommandEntity>(entities.size()); + for (HostRoleCommandEntity entity : entities) { + managedList.add(entityManagerProvider.get().merge(entity)); + } + return managedList; } @Transactional http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java index 483550f..7a2c836 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java @@ -21,6 +21,7 @@ package org.apache.ambari.server.orm.dao; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.persist.Transactional; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.orm.entities.RequestEntity; import javax.persistence.EntityManager; @@ -47,6 +48,25 @@ public class RequestDAO { } @Transactional + public boolean isAllTasksCompleted(long requestId) { + TypedQuery<Long> query = entityManagerProvider.get().createQuery( + "SELECT task.taskId FROM HostRoleCommandEntity task WHERE task.requestId = ?1 AND " + + "task.stageId=(select max(stage.stageId) FROM StageEntity stage WHERE stage.requestId=?1) " + + "AND task.status NOT IN ?2", + Long.class + ); + query.setMaxResults(1); //we don't need all + return daoUtils.selectList(query, requestId, HostRoleStatus.getCompletedStates()).isEmpty(); + } + + @Transactional + public Long getLastStageId(long requestId) { + TypedQuery<Long> query = entityManagerProvider.get().createQuery("SELECT max(stage.stageId) " + + "FROM StageEntity stage WHERE stage.requestId=?1", Long.class); + return daoUtils.selectSingle(query, requestId); + } + + @Transactional public void create(RequestEntity requestEntity) { entityManagerProvider.get().persist(requestEntity); } http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java index 1fe763b..072b4ed 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java @@ -18,9 +18,21 @@ package org.apache.ambari.server.orm.entities; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.actionmanager.RequestType; -import javax.persistence.*; +import javax.persistence.Basic; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.Lob; +import javax.persistence.ManyToOne; +import javax.persistence.OneToMany; +import javax.persistence.Table; import java.util.Collection; @Table(name = "request") @@ -68,7 +80,8 @@ public class RequestEntity { private RequestType requestType; @Column(name = "status") - private String status; + @Enumerated(value = EnumType.STRING) + private HostRoleStatus status; @Basic @Column(name = "create_time", nullable = false) @@ -205,11 +218,11 @@ public class RequestEntity { this.commandName = commandName; } - public String getStatus() { + public HostRoleStatus getStatus() { return status; } - public void setStatus(String status) { + public void setStatus(HostRoleStatus status) { this.status = status; } http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java index b749ea0..9cb2199 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java @@ -17,21 +17,11 @@ */ package org.apache.ambari.server.actionmanager; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.createStrictMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.persist.PersistService; import com.google.inject.persist.UnitOfWork; import junit.framework.Assert; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -40,7 +30,6 @@ import org.apache.ambari.server.agent.CommandReport; import org.apache.ambari.server.controller.HostsMap; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; -import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent; import org.apache.ambari.server.utils.StageUtils; @@ -49,9 +38,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.persist.PersistService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.*; public class TestActionManager { @@ -119,6 +111,8 @@ public class TestActionManager { "STRUCTURED_OUTPUT", am.getAction(requestId, stageId) .getHostRoleCommand(hostname, "HBASE_MASTER").getStructuredOut()); + + assertFalse(db.getRequest(requestId).getEndTime() == -1); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/f792d3d3/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index 696d2a6..de6c98b 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -23,9 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.lang.reflect.Type; import java.util.ArrayList; @@ -216,6 +214,9 @@ public class TestActionScheduler { assertEquals(stages.get(0).getHostRoleStatus(hostname, "NAMENODE"), HostRoleStatus.TIMEDOUT); + verify(db, times(1)).startRequest(eq(1L)); + verify(db, times(1)).abortOperation(1L); + scheduler.stop(); } @@ -587,21 +588,29 @@ public class TestActionScheduler { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - String host = (String) invocation.getArguments()[0]; - Long requestId = (Long) invocation.getArguments()[1]; - Long stageId = (Long) invocation.getArguments()[2]; - String role = (String) invocation.getArguments()[3]; - CommandReport commandReport = (CommandReport) invocation.getArguments()[4]; - for (Stage stage : stages) { - if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { - HostRoleCommand command = stage.getHostRoleCommand(host, role); - command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus())); + List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0]; + for (CommandReport report : reports) { + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + Long requestId = requestStageIds[0]; + Long stageId = requestStageIds[1]; + String role = report.getRole(); + Long id = report.getTaskId(); + for (Stage stage : stages) { + if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { + for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) { + if (hostRoleCommand.getTaskId() == id) { + hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus())); + } + } + } } + } return null; } - }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class)); when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { @Override @@ -906,21 +915,29 @@ public class TestActionScheduler { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - String host = (String) invocation.getArguments()[0]; - Long requestId = (Long) invocation.getArguments()[1]; - Long stageId = (Long) invocation.getArguments()[2]; - String role = (String) invocation.getArguments()[3]; - CommandReport commandReport = (CommandReport) invocation.getArguments()[4]; - for (Stage stage : stages) { - if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { - HostRoleCommand command = stage.getHostRoleCommand(host, role); - command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus())); + List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0]; + for (CommandReport report : reports) { + String actionId = report.getActionId(); + long[] requestStageIds = StageUtils.getRequestStage(actionId); + Long requestId = requestStageIds[0]; + Long stageId = requestStageIds[1]; + String role = report.getRole(); + Long id = report.getTaskId(); + for (Stage stage : stages) { + if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) { + for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) { + if (hostRoleCommand.getTaskId() == id) { + hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus())); + } + } + } } + } return null; } - }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class)); + }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class)); when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() { @Override
