Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 b117a56b1 -> 63f175515
  refs/heads/trunk fb50d88f1 -> 3283d9f20


AMBARI-19530. Deploy Job failed saying "Command aborted. Reason: 'Stage 
timeout'". (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/63f17551
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/63f17551
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/63f17551

Branch: refs/heads/branch-2.5
Commit: 63f17551512b70f554484670dfcc6ca0ebc5e3fa
Parents: b117a56
Author: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Authored: Fri Jan 13 17:45:32 2017 +0200
Committer: Myroslav Papirkovskyi <mpapyrkovs...@hortonworks.com>
Committed: Fri Jan 13 17:46:44 2017 +0200

----------------------------------------------------------------------
 .../server/actionmanager/ActionScheduler.java   | 18 ++++++++----
 .../apache/ambari/server/agent/ActionQueue.java | 31 +++++++++++++++++++-
 .../actionmanager/TestActionScheduler.java      |  5 +++-
 3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
index e80b020..dabcb98 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
@@ -40,6 +40,7 @@ import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.apache.ambari.server.agent.CancelCommand;
 import org.apache.ambari.server.agent.CommandReport;
@@ -78,6 +79,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.reflect.TypeToken;
 import com.google.inject.Inject;
@@ -403,8 +405,10 @@ class ActionScheduler implements Runnable {
 
         // Commands that will be scheduled in current scheduler wakeup
         List<ExecutionCommand> commandsToSchedule = new ArrayList<>();
+        Multimap<String, AgentCommand> commandsToEnqueue = 
ArrayListMultimap.create();
 
-        Map<String, RoleStats> roleStats = processInProgressStage(stage, 
commandsToSchedule);
+        Map<String, RoleStats> roleStats =
+          processInProgressStage(stage, commandsToSchedule, commandsToEnqueue);
 
         // Check if stage is failed
         boolean failed = false;
@@ -503,9 +507,10 @@ class ActionScheduler implements Runnable {
           if (Role.AMBARI_SERVER_ACTION.name().equals(cmd.getRole())) {
             serverActionExecutor.awake();
           } else {
-            actionQueue.enqueue(cmd.getHostname(), cmd);
+            commandsToEnqueue.put(cmd.getHostname(), cmd);
           }
         }
+        actionQueue.enqueueAll(commandsToEnqueue.asMap());
         LOG.debug("==> Finished.");
 
         if (!configuration.getParallelStageExecution()) { // If disabled
@@ -698,7 +703,8 @@ class ActionScheduler implements Runnable {
    * @return the stats for the roles in the stage which are used to determine
    * whether stage has succeeded or failed
    */
-  protected Map<String, RoleStats> processInProgressStage(Stage s, 
List<ExecutionCommand> commandsToSchedule) throws AmbariException {
+  protected Map<String, RoleStats> processInProgressStage(Stage s, 
List<ExecutionCommand> commandsToSchedule,
+                                                          Multimap<String, 
AgentCommand> commandsToEnqueue) throws AmbariException {
     LOG.debug("==> Collecting commands to schedule...");
     // Map to track role status
     Map<String, RoleStats> roleStats = initRoleStats(s);
@@ -830,7 +836,7 @@ class ActionScheduler implements Runnable {
             LOG.info("Removing command from queue, host={}, commandId={} ", 
host, c.getCommandId());
             actionQueue.dequeue(host, c.getCommandId());
           } else {
-            
cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, 
roleStr)));
+            
cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, 
roleStr)), commandsToEnqueue);
 
             LOG.info("Host: {}, role: {}, actionId: {} timed out and will be 
rescheduled", host,
               roleStr, s.getActionId());
@@ -1238,7 +1244,7 @@ class ActionScheduler implements Runnable {
     }
   }
 
-  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
+  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands, 
Multimap<String, AgentCommand> commandsToEnqueue) {
     for (HostRoleCommand hostRoleCommand : hostRoleCommands) {
       // There are no server actions in actionQueue
       if (!Role.AMBARI_SERVER_ACTION.equals(hostRoleCommand.getRole())) {
@@ -1247,7 +1253,7 @@ class ActionScheduler implements Runnable {
           CancelCommand cancelCommand = new CancelCommand();
           cancelCommand.setTargetTaskId(hostRoleCommand.getTaskId());
           cancelCommand.setReason("Stage timeout");
-          actionQueue.enqueue(hostRoleCommand.getHostName(), cancelCommand);
+          commandsToEnqueue.put(hostRoleCommand.getHostName(), cancelCommand);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
index 6c7803e..d6d5a35 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
@@ -18,10 +18,12 @@
 package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -59,6 +61,24 @@ public class ActionQueue {
    * @throws NullPointerException - if hostname is {@code}null{@code}
    */
   public void enqueue(String hostname, AgentCommand cmd) {
+    Queue<AgentCommand> q = getHostQueue(hostname);
+
+    q.add(cmd);
+  }
+
+  /**
+   * Adds commands to queue (atomically) for given hostname
+   * @param hostname - hostname of node
+   * @param commands - list of commands to add to queue
+   * @throws NullPointerException - if hostname is {@code}null{@code}
+   */
+  public void enqueue(String hostname, Collection<AgentCommand> commands) {
+    Queue<AgentCommand> q = getHostQueue(hostname);
+
+    q.addAll(commands);
+  }
+
+  private Queue<AgentCommand> getHostQueue(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
 
     if (q == null) {
@@ -70,8 +90,17 @@ public class ActionQueue {
       }
       //otherwise we got existing queue (and put nothing!)
     }
+    return q;
+  }
 
-    q.add(cmd);
+  /**
+   * Adds command map to queue
+   * @param commandMap - map with hostname as key and command list as value
+   */
+  public void enqueueAll(Map<String, Collection<AgentCommand>> commandMap) {
+    for (Map.Entry<String, Collection<AgentCommand>> entry : 
commandMap.entrySet()) {
+      enqueue(entry.getKey(), entry.getValue());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/63f17551/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
index f86c02e..a613a3b 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
@@ -106,6 +106,8 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.reflect.TypeToken;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
@@ -935,6 +937,7 @@ public class TestActionScheduler {
 
     aq.enqueue(Stage.INTERNAL_HOSTNAME, 
s.getExecutionCommands(null).get(0).getExecutionCommand());
     List<ExecutionCommand> commandsToSchedule = new 
ArrayList<ExecutionCommand>();
+    Multimap<String, AgentCommand> commandsToEnqueue = 
ArrayListMultimap.create();
 
     boolean taskShouldBeSkipped = stageSupportsAutoSkip && autoSkipFailedTask;
     db.timeoutHostRole(EasyMock.anyString(), EasyMock.anyLong(), 
EasyMock.anyLong(),
@@ -953,7 +956,7 @@ public class TestActionScheduler {
 
     EasyMock.replay(scheduler, fsm, host, db, cluster, ambariEventPublisher, 
service, serviceComponent, serviceComponentHost);
 
-    scheduler.processInProgressStage(s, commandsToSchedule);
+    scheduler.processInProgressStage(s, commandsToSchedule, commandsToEnqueue);
 
     EasyMock.verify(scheduler, fsm, host, db, cluster, ambariEventPublisher, 
service, serviceComponent, serviceComponentHost);
 

Reply via email to