Repository: usergrid Updated Branches: refs/heads/jackson-exclusion 5222b9a04 -> 9fd657b84
fix queue manager impl Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9fd657b8 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9fd657b8 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9fd657b8 Branch: refs/heads/jackson-exclusion Commit: 9fd657b84ee86af50ee207e2b5d60054964c30e0 Parents: 5222b9a Author: Shawn Feldman <[email protected]> Authored: Mon Nov 9 14:14:28 2015 -0700 Committer: Shawn Feldman <[email protected]> Committed: Mon Nov 9 14:14:28 2015 -0700 ---------------------------------------------------------------------- .../persistence/queue/DefaultQueueManager.java | 61 +++++++++++++------- 1 file changed, 40 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fd657b8/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index 5201279..c3b5917 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -1,3 +1,4 @@ + /* * * * Licensed to the Apache Software Foundation (ASF) under one or more @@ -18,33 +19,43 @@ * */ -package org.apache.usergrid.persistence.queue; + package org.apache.usergrid.persistence.queue; -import rx.Observable; + import rx.Observable; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; + import java.io.IOException; + import java.util.AbstractQueue; + import java.io.Serializable; + import java.util.ArrayList; + import java.util.List; + import java.util.UUID; + import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.PriorityBlockingQueue; + import java.util.concurrent.TimeUnit; /** * Default queue manager implementation, uses in memory linked queue */ public class DefaultQueueManager implements QueueManager { public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); + @Override - public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { + public Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); - for(int i=0;i<limit;i++){ - if(!queue.isEmpty()){ - returnQueue.add( queue.remove()); - }else{ - break; - } + try { + QueueMessage message=null; + int count = 5; + do { + message = queue.poll(100, TimeUnit.MILLISECONDS); + if (message != null) { + returnQueue.add(message); + } + }while(message!=null && count-->0); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); } - return Observable.from( returnQueue); + return rx.Observable.from(returnQueue); } @Override @@ -61,10 +72,14 @@ public class DefaultQueueManager implements QueueManager { } @Override - public synchronized void sendMessages(List bodies) throws IOException { + public void sendMessages(List bodies) throws IOException { for(Object body : bodies){ String uuid = UUID.randomUUID().toString(); - queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"putappriate type here")); + try { + queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here")); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); + } } } @@ -72,14 +87,18 @@ public class DefaultQueueManager implements QueueManager { @Override public <T extends Serializable> void sendMessage( final T body ) throws IOException { String uuid = UUID.randomUUID().toString(); - queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here")); - + try { + queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS); + }catch (InterruptedException ie){ + throw new RuntimeException(ie); + } } + @Override public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException { - sendMessage( body ); + sendMessage( body ); }
