Repository: ambari
Updated Branches:
  refs/heads/branch-1.6.0 7e39d88d4 -> 5c27e9eb5
Revert "AMBARI-5761. 2000-node cluster testing: during install phase of cluster deployment, install tasks were stuck in PENDING state. (mpapirkovskyy)"

This reverts commit 7e39d88d43eac5fdec15eed239464dd1c0524ed9.

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5c27e9eb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5c27e9eb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5c27e9eb

Branch: refs/heads/branch-1.6.0
Commit: 5c27e9eb548a2bbe74e205ee22ba2d6ea9e5f4dc
Parents: 7e39d88
Author: Myroslav Papirkovskyy <[email protected]>
Authored: Thu May 15 20:28:09 2014 +0300
Committer: Myroslav Papirkovskyy <[email protected]>
Committed: Thu May 15 20:28:09 2014 +0300

----------------------------------------------------------------------
 .../apache/ambari/server/agent/ActionQueue.java | 108 +++++++++----------
 1 file changed, 52 insertions(+), 56 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/5c27e9eb/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
index 225c7df..e99062a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
@@ -25,9 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,76 +36,73 @@ public class ActionQueue {
 
   private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class);
 
-  final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;
+  Map<String, Queue<AgentCommand>> hostQueues;
 
   public ActionQueue() {
-    hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>();
+    hostQueues = new HashMap<String, Queue<AgentCommand>>();
   }
 
-  private Queue<AgentCommand> getQueue(String hostname) {
+  private synchronized Queue<AgentCommand> getQueue(String hostname) {
     return hostQueues.get(hostname);
   }
 
-  /**
-   * Adds command to queue for given hostname
-   * @param hostname - hostname of node
-   * @param cmd - command to add to queue
-   */
-  public void enqueue(String hostname, AgentCommand cmd) {
-    Queue<AgentCommand> q = getQueue(hostname);
+  private synchronized void addQueue(String hostname, Queue<AgentCommand> q) {
+    hostQueues.put(hostname, q);
+  }
 
-    if (q == null) {
-      //try to add new queue to map if not found
-      q = hostQueues.putIfAbsent(hostname, new ConcurrentLinkedQueue<AgentCommand>());
+  public void enqueue(String hostname, AgentCommand cmd) {
+    Queue<AgentCommand> q;
+    synchronized (this) {
+      q = getQueue(hostname);
       if (q == null) {
-        //null means that new queue was added to map, get it
+        addQueue(hostname, new LinkedList<AgentCommand>());
         q = getQueue(hostname);
       }
-      //otherwise we got existing queue (and put nothing!)
     }
-
-    q.add(cmd);
+    synchronized (q) {
+      if (q.contains(cmd)) {
+        LOG.warn("cmd already exists in the queue, not adding again");
+        return;
+      }
+      q.add(cmd);
+    }
   }
 
-  /**
-   * Get command from queue for given hostname
-   * @param hostname
-   * @return
-   */
   public AgentCommand dequeue(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
     if (q == null) {
       return null;
     }
-
-    return q.poll();
+    synchronized (q) {
+      if (q.isEmpty()) {
+        return null;
+      } else {
+        return q.remove();
+      }
+    }
   }
 
-  /**
-   * Try to dequeue command with provided id.
-   * @param hostname
-   * @param commandId
-   * @return
-   */
   public AgentCommand dequeue(String hostname, String commandId) {
     Queue<AgentCommand> q = getQueue(hostname);
     if (q == null) {
       return null;
     }
-    if (q.isEmpty()) {
-      return null;
-    } else {
-      AgentCommand c = null;
-      for (Iterator it = q.iterator(); it.hasNext(); ) {
-        AgentCommand ac = (AgentCommand) it.next();
-        if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
-          .getCommandId().equals(commandId)) {
-          c = ac;
-          it.remove();
-          break;
+    synchronized (q) {
+      if (q.isEmpty()) {
+        return null;
+      } else {
+        AgentCommand c = null;
+        for (Iterator it = q.iterator(); it.hasNext();) {
+          AgentCommand ac = (AgentCommand) it.next();
+          if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
+            .getCommandId().equals(commandId)) {
+            c = ac;
+            it.remove();
+            break;
+          }
         }
+        return c;
       }
-      return c;
     }
   }
 
@@ -117,7 +111,9 @@ public class ActionQueue {
     if (q == null) {
       return 0;
     }
+    synchronized(q) {
       return q.size();
+    }
   }
 
   public List<AgentCommand> dequeueAll(String hostname) {
@@ -126,17 +122,17 @@ public class ActionQueue {
       return null;
     }
     List<AgentCommand> l = new ArrayList<AgentCommand>();
-
-    AgentCommand command;
-    do {
-      //get commands from queue until empty
-      command = q.poll();
-      if (command != null) {
-        l.add(command);
+    synchronized (q) {
+      while (true) {
+        try {
+          AgentCommand cmd = q.remove();
+          if (cmd != null) {
+            l.add(cmd);
+          }
+        } catch (NoSuchElementException ex) {
+          return l;
+        }
       }
-    } while (command != null);
-
-    return l;
-
+    }
   }
 }
