Change queue implementation
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4fa6749f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4fa6749f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4fa6749f

Branch: refs/heads/master
Commit: 4fa6749f027ece9388725dcdeb875f4bddbf7f5f
Parents: 0cc225f
Author: Shawn Feldman <[email protected]>
Authored: Fri Oct 16 15:07:36 2015 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Fri Oct 16 15:07:36 2015 -0600

----------------------------------------------------------------------
 .../persistence/queue/DefaultQueueManager.java | 34 ++++++++++++++++----
 1 file changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4fa6749f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index 0ef2849..ae0b0aa 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -30,17 +30,29 @@ import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;

 /**
  * Default queue manager implementation, uses in memory linked queue
  */
 public class DefaultQueueManager implements QueueManager {
-    public LinkedBlockingQueue<QueueMessage> queue = new LinkedBlockingQueue<>();
+    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(1000);

     @Override
-    public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
-        queue.drainTo(returnQueue,1000);
+        try {
+            QueueMessage message=null;
+            int count = 10;
+            do {
+                message = queue.poll(100, TimeUnit.MILLISECONDS);
+                if (message != null) {
+                    returnQueue.add(message);
+                }
+            }while(message!=null && count-->0);
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
         return Observable.from( returnQueue);
     }

@@ -58,17 +70,25 @@ public class DefaultQueueManager implements QueueManager {
     }

     @Override
-    public synchronized void sendMessages(List bodies) throws IOException {
+    public void sendMessages(List bodies) throws IOException {
         for(Object body : bodies){
             String uuid = UUID.randomUUID().toString();
-            queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"putappriate type here"));
+            try {
+                queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+            }catch (InterruptedException ie){
+                throw new RuntimeException(ie);
+            }
         }
     }

     @Override
-    public synchronized void sendMessage(Object body) throws IOException {
+    public void sendMessage(Object body) throws IOException {
         String uuid = UUID.randomUUID().toString();
-        queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+        try {
+            queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+        }catch (InterruptedException ie){
+            throw new RuntimeException(ie);
+        }
     }

     @Override
