Repository: ambari Updated Branches: refs/heads/trunk a384a0e1c -> 949ecd21b
AMBARI-5761. 2000-node cluster testing: during install phase of cluster deployment, install tasks were stuck in PENDING state. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/949ecd21 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/949ecd21 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/949ecd21 Branch: refs/heads/trunk Commit: 949ecd21b4001ccd763ad82985ec6efd96eef6ff Parents: a384a0e Author: Myroslav Papirkovskyy <[email protected]> Authored: Thu May 29 18:13:52 2014 +0300 Committer: Myroslav Papirkovskyy <[email protected]> Committed: Thu May 29 18:15:40 2014 +0300 ---------------------------------------------------------------------- .../server/actionmanager/ActionDBAccessor.java | 11 ++ .../actionmanager/ActionDBAccessorImpl.java | 18 +++ .../server/actionmanager/ActionScheduler.java | 131 +++++++++++++++++-- .../AmbariManagementControllerImpl.java | 1 + .../internal/RequestResourceProvider.java | 7 + .../org/apache/ambari/server/state/Cluster.java | 11 ++ .../server/state/cluster/ClusterImpl.java | 33 +++++ 7 files changed, 200 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java index 86ebecf..fd3e039 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager; import com.google.inject.persist.Transactional; import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; import java.util.Collection; import java.util.List; @@ -111,6 +112,16 @@ public interface ActionDBAccessor { public long getLastPersistedRequestIdWhenInitialized(); /** + * Bulk update scheduled commands + */ + void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands); + + /** + * Bulk abort commands + */ + void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands); + + /** * Updates scheduled stage. */ public void hostRoleScheduled(Stage s, String hostname, String roleStr); http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 7089276..375794d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -24,6 +24,7 @@ import com.google.inject.Singleton; import com.google.inject.name.Named; import com.google.inject.persist.Transactional; import org.apache.ambari.server.agent.CommandReport; +import org.apache.ambari.server.agent.ExecutionCommand; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; import org.apache.ambari.server.orm.dao.HostDAO; @@ -411,6 +412,23 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { return requestId; } + + @Override + @Transactional + public void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands) { + for (ExecutionCommand command : commands) { + hostRoleScheduled(s, command.getHostname(), command.getRole()); + } + } + + @Override + @Transactional + public void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands) { + for (ExecutionCommand command : commands) { + abortHostRole(command.getHostname(), s.getRequestId(), s.getStageId(), command.getRole()); + } + } + @Override @Transactional public void hostRoleScheduled(Stage s, String hostname, String roleStr) { http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index bf9ec79..2030163 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -21,12 +21,20 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.reflect.TypeToken; +import com.google.inject.persist.UnitOfWork; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -46,16 +54,16 @@ import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostEvent; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent; import org.apache.ambari.server.utils.StageUtils; +import org.apache.commons.collections.MultiMap; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.reflect.TypeToken; -import com.google.inject.persist.UnitOfWork; + /** * This class encapsulates the action scheduler thread. @@ -164,6 +172,7 @@ class ActionScheduler implements Runnable { List<Stage> stages = db.getStagesInProgress(); if (LOG.isDebugEnabled()) { LOG.debug("Scheduler wakes up"); + LOG.debug("Processing {} in progress stages ", stages.size()); } if (stages == null || stages.isEmpty()) { //Nothing to do @@ -172,15 +181,18 @@ class ActionScheduler implements Runnable { } return; } - + int i_stage = 0; for (Stage s : stages) { // Check if we can process this stage in parallel with another stages + i_stage ++; long requestId = s.getRequestId(); // Convert to string to avoid glitches with boxing/unboxing String requestIdStr = String.valueOf(requestId); + LOG.debug("==> STAGE_i = " + i_stage + "(requestId=" + requestIdStr + ",StageId=" + s.getStageId() + ")"); if (runningRequestIds.contains(requestIdStr)) { // We don't want to process different stages from the same request in parallel + LOG.info("==> We don't want to process different stages from the same request in parallel" ); continue; } else { runningRequestIds.add(requestIdStr); @@ -233,7 +245,11 @@ class ActionScheduler implements Runnable { return; } + List<ExecutionCommand> commandsToStart = new ArrayList<ExecutionCommand>(); + List<ExecutionCommand> commandsToUpdate = new ArrayList<ExecutionCommand>(); + //Schedule what we have so far + for (ExecutionCommand cmd : commandsToSchedule) { if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) { /** @@ -245,15 +261,46 @@ class ActionScheduler implements Runnable { */ executeServerAction(s, cmd); } else { - try { - scheduleHostRole(s, cmd); - } catch (InvalidStateTransitionException e) { - LOG.warn("Could not schedule host role " + cmd.toString(), e); - db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole()); + processHostRole(s, cmd, commandsToStart, commandsToUpdate); + } + } + + LOG.debug("==> Commands to start: {}", commandsToStart.size()); + LOG.debug("==> Commands to update: {}", commandsToUpdate.size()); + + //Multimap is analog of Map<Object, List<Object>> but allows to avoid nested loop + ListMultimap<String, ServiceComponentHostEvent> eventMap = formEventMap(s, commandsToStart); + LOG.debug("==> processing {} serviceComponentHostEvents...", eventMap.size()); + List<ServiceComponentHostEvent> failedEvents = + fsmObject.getCluster(s.getClusterName()).processServiceComponentHostEvents(eventMap); + LOG.debug("==> {} events failed.", failedEvents.size()); + + List<ExecutionCommand> commandsToAbort = new ArrayList<ExecutionCommand>(); + + for (Iterator<ExecutionCommand> iterator = commandsToUpdate.iterator(); iterator.hasNext(); ) { + ExecutionCommand cmd = iterator.next(); + for (ServiceComponentHostEvent event : failedEvents) { + if (StringUtils.equals(event.getHostName(), cmd.getHostname()) && + StringUtils.equals(event.getServiceComponentName(), cmd.getRole())) { + iterator.remove(); + commandsToAbort.add(cmd); + break; } } } + LOG.debug("==> Scheduling {} tasks...", commandsToUpdate.size()); + db.bulkHostRoleScheduled(s, commandsToUpdate); + + LOG.debug("==> Aborting {} tasks...", commandsToAbort.size()); + db.bulkAbortHostRole(s, commandsToAbort); + + LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size()); + for (ExecutionCommand cmd : commandsToUpdate) { + actionQueue.enqueue(cmd.getHostname(), cmd); + } + LOG.debug("==> Finished."); + if (! configuration.getParallelStageExecution()) { // If disabled return; } @@ -262,6 +309,7 @@ class ActionScheduler implements Runnable { requestsInProgress.retainAll(runningRequestIds); } finally { + LOG.debug("Scheduler finished work."); unitOfWork.end(); } } @@ -272,6 +320,8 @@ class ActionScheduler implements Runnable { */ private void executeServerAction(Stage s, ExecutionCommand cmd) { try { + LOG.trace("Executing server action: request_id={}, stage_id={}, task_id={}", + s.getRequestId(), s.getStageId(), cmd.getTaskId()); long now = System.currentTimeMillis(); String hostName = cmd.getHostname(); String roleName = cmd.getRole(); @@ -368,6 +418,7 @@ class ActionScheduler implements Runnable { */ private Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule) throws AmbariException { + LOG.debug("==> Collecting commands to schedule..."); // Map to track role status Map<String, RoleStats> roleStats = initRoleStats(s); long now = System.currentTimeMillis(); @@ -384,12 +435,17 @@ class ActionScheduler implements Runnable { for (String host : s.getHosts()) { List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host); Host hostObj = fsmObject.getHost(host); - + int i_my = 0; + LOG.trace("===>host=" + host); for(ExecutionCommandWrapper wrapper : commandWrappers) { ExecutionCommand c = wrapper.getExecutionCommand(); String roleStr = c.getRole(); HostRoleStatus status = s.getHostRoleStatus(host, roleStr); - + i_my ++; + if (LOG.isTraceEnabled()) { + LOG.trace("Host task " + i_my + ") id = " + c.getTaskId() + " status = " + status.toString() + + " (role=" + roleStr + "), roleCommand = "+ c.getRoleCommand()); + } boolean hostDeleted = false; if (null != cluster) { Service svc = null; @@ -451,19 +507,23 @@ class ActionScheduler implements Runnable { } // Dequeue command + LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId()); actionQueue.dequeue(host, c.getCommandId()); } else { // reschedule command commandsToSchedule.add(c); + LOG.trace("===> commandsToSchedule(reschedule)=" + commandsToSchedule.size()); } } else if (status.equals(HostRoleStatus.PENDING)) { //Need to schedule first time commandsToSchedule.add(c); + LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size()); } this.updateRoleStats(status, roleStats.get(roleStr)); } } + LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size()); return roleStats; } @@ -581,6 +641,53 @@ class ActionScheduler implements Runnable { return false; } + private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage s, List<ExecutionCommand> commands) { + ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = ArrayListMultimap.create(); + for (ExecutionCommand cmd : commands) { + String hostname = cmd.getHostname(); + String roleStr = cmd.getRole(); + if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) { + serviceEventMap.put(cmd.getServiceName(), s.getFsmEvent(hostname, roleStr).getEvent()); + } + } + return serviceEventMap; + } + + private void processHostRole(Stage s, ExecutionCommand cmd, List<ExecutionCommand> commandsToStart, + List<ExecutionCommand> commandsToUpdate) + throws AmbariException { + long now = System.currentTimeMillis(); + String roleStr = cmd.getRole(); + String hostname = cmd.getHostname(); + + // start time is -1 if host role command is not started yet + if (s.getStartTime(hostname, roleStr) < 0) { + + commandsToStart.add(cmd); + s.setStartTime(hostname,roleStr, now); + s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED); + } + s.setLastAttemptTime(hostname, roleStr, now); + s.incrementAttemptCount(hostname, roleStr); + /** change the hostname in the command for the host itself **/ + cmd.setHostname(hostsMap.getHostMap(hostname)); + + + //Try to get clusterHostInfo from cache + String stagePk = s.getStageId() + "-" + s.getRequestId(); + Map<String, Set<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(stagePk); + + if (clusterHostInfo == null) { + Type type = new TypeToken<Map<String, Set<String>>>() {}.getType(); + clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type); + clusterHostInfoCache.put(stagePk, clusterHostInfo); + } + + cmd.setClusterHostInfo(clusterHostInfo); + + commandsToUpdate.add(cmd); + } + private void scheduleHostRole(Stage s, ExecutionCommand cmd) throws InvalidStateTransitionException, AmbariException { long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index a80327c..1297f6c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -1814,6 +1814,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle List<Stage> stages = doStageCreation(requestStages, cluster, changedServices, changedComponents, changedHosts, requestParameters, requestProperties, runSmokeTest, reconfigureClients); + LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0)); requestStages.addStages(stages); updateServiceStates(changedServices, changedComponents, changedHosts, ignoredHosts); http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java index fc1ec99..18f9d6b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java @@ -318,6 +318,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider List<Long> requestIds = actionManager.getRequestsByStatus(status, maxResults != null ? maxResults : BaseRequest.DEFAULT_PAGE_SIZE, ascOrder != null ? ascOrder : false); + LOG.debug("List<Long> requestIds = actionManager.getRequestsByStatus = {}", requestIds.size()); response.addAll(getRequestResources(clusterName, actionManager, requestIds, requestedPropertyIds)); @@ -340,6 +341,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider Set<String> requestedPropertyIds) { List<org.apache.ambari.server.actionmanager.Request> requests = actionManager.getRequests(requestIds); + LOG.debug("requests = actionManager.getRequests(requestIds)={}", requests.size()); Map<Long, Resource> resourceMap = new HashMap<Long, Resource>(); @@ -417,6 +419,11 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider int inProgressTaskCount = taskCount - completedTaskCount - queuedTaskCount - pendingTaskCount; + LOG.debug("taskCount={}, inProgressTaskCount={}, completedTaskCount={}, queuedTaskCount={}, " + + "pendingTaskCount={}, failedTaskCount={}, abortedTaskCount={}, timedOutTaskCount={}", + taskCount, inProgressTaskCount, completedTaskCount, queuedTaskCount, pendingTaskCount, + failedTaskCount, abortedTaskCount, timedOutTaskCount); + // determine request status HostRoleStatus requestStatus = failedTaskCount > 0 ? HostRoleStatus.FAILED : abortedTaskCount > 0 ? HostRoleStatus.ABORTED : http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java index c8460ef..e1a3e42 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReadWriteLock; +import com.google.common.collect.ListMultimap; +import com.google.inject.persist.Transactional; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ClusterResponse; import org.apache.ambari.server.state.configgroup.ConfigGroup; +import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.scheduler.RequestExecution; public interface Cluster { @@ -293,4 +296,12 @@ public interface Cluster { * @throws AmbariException */ public void deleteRequestExecution(Long id) throws AmbariException; + + /** + * Bulk handle service component host events + * + * @param eventMap serviceName - event mapping + * @return list of failed events + */ + List<ServiceComponentHostEvent> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap); } http://git-wip-us.apache.org/repos/asf/ambari/blob/949ecd21/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index f2020d1..706c375 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.persistence.RollbackException; +import com.google.common.collect.ListMultimap; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ServiceComponentHostNotFoundException; import org.apache.ambari.server.ServiceNotFoundException; @@ -65,12 +66,14 @@ import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostEvent; import org.apache.ambari.server.state.ServiceFactory; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.State; import org.apache.ambari.server.state.ClusterHealthReport; import org.apache.ambari.server.state.configgroup.ConfigGroup; import org.apache.ambari.server.state.configgroup.ConfigGroupFactory; +import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.scheduler.RequestExecution; import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; import org.slf4j.Logger; @@ -1426,6 +1429,36 @@ public class ClusterImpl implements Cluster { return getHostsDesiredConfigs(hostnames); } + @Transactional + @Override + public List<ServiceComponentHostEvent> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap) { + List<ServiceComponentHostEvent> failedEvents = new ArrayList<ServiceComponentHostEvent>(); + + clusterGlobalLock.readLock().lock(); + try { + for (Entry<String, ServiceComponentHostEvent> entry : eventMap.entries()) { + String serviceName = entry.getKey(); + ServiceComponentHostEvent event = entry.getValue(); + try { + Service service = getService(serviceName); + ServiceComponent serviceComponent = service.getServiceComponent(event.getServiceComponentName()); + ServiceComponentHost serviceComponentHost = serviceComponent.getServiceComponentHost(event.getHostName()); + serviceComponentHost.handleEvent(event); + } catch (AmbariException e) { + LOG.error("ServiceComponentHost lookup exception ", e); + failedEvents.add(event); + } catch (InvalidStateTransitionException e) { + LOG.error("Invalid transition ", e); + failedEvents.add(event); + } + } + } finally { + clusterGlobalLock.readLock().unlock(); + } + + return failedEvents; + } + private ClusterHealthReport getClusterHealthReport() throws AmbariException { int staleConfigsHosts = 0;
