Repository: incubator-usergrid Updated Branches: refs/heads/two-dot-o-dev e45728e40 -> e70f0472e
observable conversion Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8aa793b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8aa793b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8aa793b4 Branch: refs/heads/two-dot-o-dev Commit: 8aa793b4e29666f6474b0198c804337f2a57bd00 Parents: ceadc6c Author: Shawn Feldman <sfeld...@apache.org> Authored: Thu May 28 13:35:35 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Thu May 28 13:35:35 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 4 +- .../persistence/queue/DefaultQueueManager.java | 6 +- .../persistence/queue/QueueManager.java | 4 +- .../queue/impl/SNSQueueManagerImpl.java | 6 +- .../queue/impl/SQSQueueManagerImpl.java | 6 +- .../persistence/queue/QueueManagerTest.java | 8 +-- .../services/notifications/QueueListener.java | 2 +- .../services/queues/ImportQueueManager.java | 5 +- .../usergrid/services/queues/QueueListener.java | 75 ++++++++++---------- 9 files changed, 62 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index f3602f3..fc13d85 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -144,7 +144,7 @@ public class AmazonAsyncEventService implements AsyncEventService { /** * Take message from SQS */ - public List<QueueMessage> take() { + private Observable<QueueMessage> take() { //SQS doesn't support more than 10 final Timer.Context timer = this.readTimer.time(); @@ -376,7 +376,7 @@ public class AmazonAsyncEventService implements AsyncEventService { Timer.Context timer = readTimer.time(); try { - drainList = take(); + drainList = take().toList().toBlocking().last(); //emit our list in it's entity to hand off to a worker pool subscriber.onNext(drainList); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index 6917803..dc5878c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.queue; +import rx.Observable; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -32,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue; public class DefaultQueueManager implements QueueManager { public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); @Override - public synchronized List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { + public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); for(int i=0;i<limit;i++){ if(!queue.isEmpty()){ @@ -41,7 +43,7 @@ public class DefaultQueueManager implements QueueManager { break; } } - return returnQueue; + return Observable.from( returnQueue); } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java index dd044d2..09ae95a 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java @@ -17,6 +17,8 @@ */ package org.apache.usergrid.persistence.queue; +import rx.Observable; + import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -34,7 +36,7 @@ public interface QueueManager { * @param klass class to cast the return from * @return List of Queue Messages */ - List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass); + Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass); /** * Commit the transaction http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 0f1661d..802c2ce 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -212,14 +212,14 @@ public class SNSQueueManagerImpl implements QueueManager { } @Override - public List<QueueMessage> getMessages(final int limit, + public rx.Observable<QueueMessage> getMessages(final int limit, final int transactionTimeout, final int waitTime, final Class klass) { if (sqs == null) { logger.error("SQS is null - was not initialized properly"); - return new ArrayList<>(); + return rx.Observable.empty(); } @@ -251,7 +251,7 @@ public class SNSQueueManagerImpl implements QueueManager { QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type")); queueMessages.add(queueMessage); } - return queueMessages; + return rx.Observable.from( queueMessages); } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index e28e805..e6f16c8 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -173,14 +173,14 @@ public class SQSQueueManagerImpl implements QueueManager { } @Override - public List<QueueMessage> getMessages(final int limit, + public rx.Observable<QueueMessage> getMessages(final int limit, final int transactionTimeout, final int waitTime, final Class klass) { if (sqs == null) { logger.error("Sqs is null"); - return new ArrayList<>(); + return rx.Observable.empty(); } String url = getQueue().getUrl(); @@ -212,7 +212,7 @@ public class SQSQueueManagerImpl implements QueueManager { queueMessages.add(queueMessage); } - return queueMessages; + return rx.Observable.from(queueMessages); } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java index 452d328..33fa1f5 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java @@ -74,13 +74,13 @@ public class QueueManagerTest { public void send() throws IOException,ClassNotFoundException{ String value = "bodytest"; qm.sendMessage(value); - List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class); + List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last(); assertTrue(messageList.size() >= 1); for(QueueMessage message : messageList){ assertTrue(message.getBody().equals(value)); qm.commitMessage(message); } - messageList = qm.getMessages(1,5000,5000,String.class); + messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last(); assertTrue(messageList.size() <= 0); } @@ -93,14 +93,14 @@ public class QueueManagerTest { List<Map<String,String>> bodies = new ArrayList<>(); bodies.add(values); qm.sendMessages(bodies); - List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()); + List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last(); assertTrue(messageList.size() >= 1); for(QueueMessage message : messageList){ assertTrue(message.getBody().equals(values)); } qm.commitMessages(messageList); - messageList = qm.getMessages(1,5000,5000,values.getClass()); + messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last(); assertTrue(messageList.size() <= 0); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index e9f64b4..91b3e00 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -156,7 +156,7 @@ public class QueueListener { try { Timer.Context timerContext = timer.time(); - List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class); + List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class).toList().toBlocking().last(); LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName); if (messages.size() > 0) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index f23262e..5f42484 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueMessage; +import rx.Observable; /** @@ -34,9 +35,9 @@ import org.apache.usergrid.persistence.queue.QueueMessage; public class ImportQueueManager implements QueueManager { @Override - public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime, + public Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime, final Class klass ) { - return null; + return Observable.empty(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java index b87904e..2d8dd7a 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java @@ -171,52 +171,55 @@ public abstract class QueueListener { while ( true ) { - try { + Timer.Context timerContext = timer.time(); //Get the messages out of the queue. //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here. - List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class); - LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName); + queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class) + .buffer(getBatchSize()) + .doOnNext(messages -> { + try { + LOG.info("retrieved batch of {} messages from queue {} ", messages.size(), queueName); - if (messages.size() > 0) { + if (messages.size() > 0) { - long now = System.currentTimeMillis(); - //TODO: make sure this has a way to determine which QueueListener needs to be used - // ideally this is done by checking which type the messages have then - // asking for a onMessage call. - onMessage( messages ); + long now = System.currentTimeMillis(); + //TODO: make sure this has a way to determine which QueueListener needs to be used + // ideally this is done by checking which type the messages have then + // asking for a onMessage call. + onMessage(messages); - queueManager.commitMessages(messages); + queueManager.commitMessages(messages); - meter.mark(messages.size()); - LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now); + meter.mark(messages.size()); + LOG.info("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now); - if(sleepBetweenRuns > 0) { - LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns); - Thread.sleep(sleepBetweenRuns); - } + if (sleepBetweenRuns > 0) { + LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns); + Thread.sleep(sleepBetweenRuns); + } - } - else{ - LOG.info("no messages...sleep...{}", sleepWhenNoneFound); - Thread.sleep(sleepWhenNoneFound); - } - timerContext.stop(); - //send to the providers - consecutiveExceptions.set(0); - }catch (Exception ex){ - LOG.error("failed to dequeue",ex); - try { - long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet(); - long maxSleep = 15000; - sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime ; - LOG.info("sleeping due to failures {} ms", sleeptime); - Thread.sleep(sleeptime); - }catch (InterruptedException ie){ - LOG.info("sleep interrupted"); - } - } + } else { + LOG.info("no messages...sleep...{}", sleepWhenNoneFound); + Thread.sleep(sleepWhenNoneFound); + } + timerContext.stop(); + //send to the providers + consecutiveExceptions.set(0); + } catch (Exception ex) { + LOG.error("failed to dequeue", ex); + try { + long sleeptime = sleepWhenNoneFound * consecutiveExceptions.incrementAndGet(); + long maxSleep = 15000; + sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime; + LOG.info("sleeping due to failures {} ms", sleeptime); + Thread.sleep(sleeptime); + } catch (InterruptedException ie) { + LOG.info("sleep interrupted"); + } + } + }).toBlocking().last(); } }