AMBARI-19530. Deploy Job failed saying "Command aborted. Reason: 'Stage timeout'". (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3283d9f2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3283d9f2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3283d9f2

Branch: refs/heads/trunk
Commit: 3283d9f20c35686d41edf7f9bb0430c8dd77eb24
Parents: fb50d88
Author: Myroslav Papirkovskyi <[email protected]>
Authored: Fri Jan 13 17:45:32 2017 +0200
Committer: Myroslav Papirkovskyi <[email protected]>
Committed: Fri Jan 13 18:15:03 2017 +0200

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   | 18 ++++++++----
 .../apache/ambari/server/agent/ActionQueue.java | 31 +++++++++++++++++++-
 .../actionmanager/TestActionScheduler.java      |  5 +++-
 3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e80b020..dabcb98 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.CancelCommand;
 import
org.apache.ambari.server.agent.CommandReport; @@ -78,6 +79,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Multimap; import com.google.common.eventbus.Subscribe; import com.google.common.reflect.TypeToken; import com.google.inject.Inject; @@ -403,8 +405,10 @@ class ActionScheduler implements Runnable { // Commands that will be scheduled in current scheduler wakeup List<ExecutionCommand> commandsToSchedule = new ArrayList<>(); + Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); - Map<String, RoleStats> roleStats = processInProgressStage(stage, commandsToSchedule); + Map<String, RoleStats> roleStats = + processInProgressStage(stage, commandsToSchedule, commandsToEnqueue); // Check if stage is failed boolean failed = false; @@ -503,9 +507,10 @@ class ActionScheduler implements Runnable { if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) { serverActionExecutor.awake(); } else { - actionQueue.enqueue(cmd.getHostname(), cmd); + commandsToEnqueue.put(cmd.getHostname(), cmd); } } + actionQueue.enqueueAll(commandsToEnqueue.asMap()); LOG.debug("==> Finished."); if (!configuration.getParallelStageExecution()) { // If disabled @@ -698,7 +703,8 @@ class ActionScheduler implements Runnable { * @return the stats for the roles in the stage which are used to determine * whether stage has succeeded or failed */ - protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule) throws AmbariException { + protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule, + Multimap<String, AgentCommand> commandsToEnqueue) throws AmbariException { LOG.debug("==> Collecting commands to schedule..."); // Map to track role status Map<String, RoleStats> roleStats = initRoleStats(s); @@ -830,7 +836,7 
@@ class ActionScheduler implements Runnable { LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId()); actionQueue.dequeue(host, c.getCommandId()); } else { - cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr))); + cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue); LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host, roleStr, s.getActionId()); @@ -1238,7 +1244,7 @@ class ActionScheduler implements Runnable { } } - void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) { + void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, Multimap<String, AgentCommand> commandsToEnqueue) { for (HostRoleCommand hostRoleCommand : hostRoleCommands) { // There are no server actions in actionQueue if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) { @@ -1247,7 +1253,7 @@ class ActionScheduler implements Runnable { CancelCommand cancelCommand = new CancelCommand(); cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId()); cancelCommand.setReason("Stage timeout"); - actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand); + commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java index 6c7803e..d6d5a35 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java @@ -18,10 +18,12 @@ package org.apache.ambari.server.agent; import java.util.ArrayList; +import 
java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -59,6 +61,24 @@ public class ActionQueue { * @throws NullPointerException - if hostname is {@code}null{@code} */ public void enqueue(String hostname, AgentCommand cmd) { + Queue<AgentCommand> q = getHostQueue(hostname); + + q.add(cmd); + } + + /** + * Adds commands to queue (atomically) for given hostname + * @param hostname - hostname of node + * @param commands - list of commands to add to queue + * @throws NullPointerException - if hostname is {@code}null{@code} + */ + public void enqueue(String hostname, Collection<AgentCommand> commands) { + Queue<AgentCommand> q = getHostQueue(hostname); + + q.addAll(commands); + } + + private Queue<AgentCommand> getHostQueue(String hostname) { Queue<AgentCommand> q = getQueue(hostname); if (q == null) { @@ -70,8 +90,17 @@ public class ActionQueue { } //otherwise we got existing queue (and put nothing!) 
} + return q; + } - q.add(cmd); + /** + * Adds command map to queue + * @param commandMap - map with hostname as key and command list as value + */ + public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) { + for (Map.Entry<String, Collection<AgentCommand>> entry : commandMap.entrySet()) { + enqueue(entry.getKey(), entry.getValue()); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/3283d9f2/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java index d73a3db..6cc511e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java @@ -106,6 +106,8 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; import com.google.common.reflect.TypeToken; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -935,6 +937,7 @@ public class TestActionScheduler { aq.enqueue(Stage.INTERNAL_HOSTNAME, s.getExecutionCommands(null).get(0).getExecutionCommand()); List<ExecutionCommand> commandsToSchedule = new ArrayList<ExecutionCommand>(); + Multimap<String, AgentCommand> commandsToEnqueue = ArrayListMultimap.create(); boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask; db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), EasyMock.anyLong(), @@ -953,7 +956,7 @@ public class TestActionScheduler { EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost); - 
scheduler.processInProgressStage(s, commandsToSchedule); + scheduler.processInProgressStage(s, commandsToSchedule, commandsToEnqueue); EasyMock.verify(scheduler, fsm, host, db, cluster, ambariEventPublisher, service, serviceComponent, serviceComponentHost);
