Repository: ambari Updated Branches: refs/heads/trunk 787e7eb4c -> b85af30d7
AMBARI-5761. 2000-node cluster testing: during install phase of cluster deployment, install tasks were stuck in PENDING state. (mpapirkovskyy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b85af30d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b85af30d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b85af30d Branch: refs/heads/trunk Commit: b85af30d7e18422ce6e23d8a2260707b41122159 Parents: 787e7eb Author: Myroslav Papirkovskyy <[email protected]> Authored: Thu May 15 20:22:13 2014 +0300 Committer: Myroslav Papirkovskyy <[email protected]> Committed: Thu May 15 20:25:40 2014 +0300 ---------------------------------------------------------------------- .../apache/ambari/server/agent/ActionQueue.java | 108 ++++++++++--------- 1 file changed, 56 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b85af30d/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java index e99062a..225c7df 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,73 +39,76 @@ public class ActionQueue { private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class); - Map<String, Queue<AgentCommand>> hostQueues; + final ConcurrentMap<String, Queue<AgentCommand>> hostQueues; public ActionQueue() { - hostQueues = new HashMap<String, Queue<AgentCommand>>(); + hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>(); } - private synchronized Queue<AgentCommand> getQueue(String hostname) { + private Queue<AgentCommand> getQueue(String hostname) { return hostQueues.get(hostname); } - private synchronized void addQueue(String hostname, Queue<AgentCommand> q) { - hostQueues.put(hostname, q); - } - + /** + * Adds command to queue for given hostname + * @param hostname - hostname of node + * @param cmd - command to add to queue + */ public void enqueue(String hostname, AgentCommand cmd) { - Queue<AgentCommand> q; - synchronized (this) { - q = getQueue(hostname); + Queue<AgentCommand> q = getQueue(hostname); + + if (q == null) { + //try to add new queue to map if not found + q = hostQueues.putIfAbsent(hostname, new ConcurrentLinkedQueue<AgentCommand>()); if (q == null) { - addQueue(hostname, new LinkedList<AgentCommand>()); + //null means that new queue was added to map, get it q = getQueue(hostname); } + //otherwise we got existing queue (and put nothing!) } - synchronized (q) { - if (q.contains(cmd)) { - LOG.warn("cmd already exists in the queue, not adding again"); - return; - } - q.add(cmd); - } + + q.add(cmd); } + /** + * Get command from queue for given hostname + * @param hostname + * @return + */ public AgentCommand dequeue(String hostname) { Queue<AgentCommand> q = getQueue(hostname); if (q == null) { return null; } - synchronized (q) { - if (q.isEmpty()) { - return null; - } else { - return q.remove(); - } - } + + return q.poll(); } + /** + * Try to dequeue command with provided id. + * @param hostname + * @param commandId + * @return + */ public AgentCommand dequeue(String hostname, String commandId) { Queue<AgentCommand> q = getQueue(hostname); if (q == null) { return null; } - synchronized (q) { - if (q.isEmpty()) { - return null; - } else { - AgentCommand c = null; - for (Iterator it = q.iterator(); it.hasNext();) { - AgentCommand ac = (AgentCommand) it.next(); - if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac) - .getCommandId().equals(commandId)) { - c = ac; - it.remove(); - break; - } + if (q.isEmpty()) { + return null; + } else { + AgentCommand c = null; + for (Iterator it = q.iterator(); it.hasNext(); ) { + AgentCommand ac = (AgentCommand) it.next(); + if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac) + .getCommandId().equals(commandId)) { + c = ac; + it.remove(); + break; } - return c; } + return c; } } @@ -111,9 +117,7 @@ public class ActionQueue { if (q == null) { return 0; } - synchronized(q) { return q.size(); - } } public List<AgentCommand> dequeueAll(String hostname) { @@ -122,17 +126,17 @@ public class ActionQueue { return null; } List<AgentCommand> l = new ArrayList<AgentCommand>(); - synchronized (q) { - while (true) { - try { - AgentCommand cmd = q.remove(); - if (cmd != null) { - l.add(cmd); - } - } catch (NoSuchElementException ex) { - return l; - } + + AgentCommand command; + do { + //get commands from queue until empty + command = q.poll(); + if (command != null) { + l.add(command); } - } + } while (command != null); + + return l; + } }
