Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-543 [created] ee6e087f8
add queue depth Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bb6ca8ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bb6ca8ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bb6ca8ed Branch: refs/heads/USERGRID-543 Commit: bb6ca8edfed851cd03f23b96bb91bec0ee3b990a Parents: 35430a5 Author: Shawn Feldman <sfeld...@apache.org> Authored: Fri Aug 14 14:45:58 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Fri Aug 14 14:45:58 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 2 +- .../asyncevents/AsyncEventService.java | 2 ++ .../persistence/queue/DefaultQueueManager.java | 5 ++++ .../persistence/queue/QueueManager.java | 6 ++++ .../queue/impl/SNSQueueManagerImpl.java | 14 +++++++++ .../queue/impl/SQSQueueManagerImpl.java | 31 +++++++++----------- .../persistence/queue/QueueManagerTest.java | 18 ++++++++++++ .../org/apache/usergrid/rest/RootResource.java | 17 ++++++++++- .../services/queues/ImportQueueManager.java | 5 ++++ 9 files changed, 81 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index b71a549..ed106e2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -80,7 +80,7 @@ public class AmazonAsyncEventService implements AsyncEventService { // SQS maximum receive messages is 10 private static final int MAX_TAKE = 10; - private static final String QUEUE_NAME = "es_queue"; + public static final String QUEUE_NAME = "es_queue"; private final QueueManager queue; private final QueueScope queueScope; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 1a5e865..7cce8b3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -76,4 +76,6 @@ public interface AsyncEventService extends ReIndexAction { + + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java index dc5878c..c72e109 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java @@ -47,6 +47,11 @@ public class DefaultQueueManager implements QueueManager { } @Override + public long getQueueDepth() { + return queue.size(); + } + + @Override public void commitMessage(QueueMessage queueMessage) { } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java index 09ae95a..0ec2337 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java @@ -39,6 +39,12 @@ public interface QueueManager { Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass); /** + * get the queue depth + * @return + */ + long getQueueDepth(); + + /** * Commit the transaction * @param queueMessage */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index f41d238..6d0e18b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -381,6 +381,20 @@ public class SNSQueueManagerImpl implements QueueManager { } @Override + public long getQueueDepth() { + String key = "ApproximateNumberOfMessages"; + try { + GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key)); + String depthString = result.getAttributes().get(key); + return depthString != null ? Long.parseLong(depthString) : 0; + }catch (Exception e){ + logger.error("Exception getting queue depth",e); + return -1; + + } + } + + @Override public void sendMessages(final List bodies) throws IOException { if (snsAsync == null) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index effa373..075e90c 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; +import com.amazonaws.services.sqs.model.*; import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; import org.slf4j.Logger; @@ -36,22 +37,6 @@ import org.apache.usergrid.persistence.queue.QueueScope; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.BatchResultErrorEntry; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import com.amazonaws.services.sqs.model.CreateQueueResult; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.DeleteMessageBatchResult; -import com.amazonaws.services.sqs.model.DeleteMessageRequest; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.QueueDoesNotExistException; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.SendMessageRequest; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Preconditions; @@ -147,7 +132,6 @@ public class SQSQueueManagerImpl implements QueueManager { //pretty print, disabling for speed // mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); - sqs = createClient(); } catch (Exception e) { @@ -219,6 +203,19 @@ public class SQSQueueManagerImpl implements QueueManager { } @Override + public long getQueueDepth() { + String key = "ApproximateNumberOfMessages"; + try { + GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key)); + String depthString = result.getAttributes().get(key); + return depthString != null ? Long.parseLong(depthString) : 0; + }catch (Exception e){ + logger.error("Exception getting queue depth",e); + return -1; + + } + } + @Override public void sendMessages(final List bodies) throws IOException { if (sqs == null) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java index 3be02e1..e948015 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java @@ -105,5 +105,23 @@ public class QueueManagerTest { } + @Test + public void queueSize() throws IOException,ClassNotFoundException{ + HashMap<String,String> values = new HashMap<>(); + values.put("test", "Test"); + + List<Map<String,String>> bodies = new ArrayList<>(); + bodies.add(values); + qm.sendMessages(bodies); + long depth = qm.getQueueDepth(); + assertTrue(depth>0); + List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last(); + assertTrue(messageList.size() >= 1); + for(QueueMessage message : messageList){ + assertTrue(message.getBody().equals(values)); + } + qm.commitMessages(messageList); + } + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java index 5b5e711..989df26 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java @@ -37,8 +37,15 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.UriInfo; +import com.google.inject.Injector; +import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexRefreshCommand; import org.apache.usergrid.persistence.index.query.Identifier; +import org.apache.usergrid.persistence.queue.Queue; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -102,6 +109,9 @@ public class RootResource extends AbstractContextResource implements MetricProce @Autowired private UsergridSystemMonitor usergridSystemMonitor; + @Autowired + private Injector injector; + public RootResource() { } @@ -181,6 +191,10 @@ public class RootResource extends AbstractContextResource implements MetricProce ApiResponse response = createApiResponse(); + QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class); + QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS); + QueueManager queue = queueManagerFactory.getQueueManager(queueScope); + if ( !ignoreError ) { if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) { @@ -205,8 +219,9 @@ public class RootResource extends AbstractContextResource implements MetricProce node.put( "cassandraStatus", emf.getEntityStoreHealth().toString() ); // Core Persistence Query Index module status for Management App Index - EntityManager em = emf.getEntityManager(emf.getManagementAppId()); node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() ); + node.put( "queueDepth", queue.getQueueDepth() ); + dumpMetrics(node); response.setProperty( "status", node ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index 5f42484..d74f688 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -40,6 +40,11 @@ public class ImportQueueManager implements QueueManager { return Observable.empty(); } + @Override + public long getQueueDepth() { + return 0; + } + @Override public void commitMessage( final QueueMessage queueMessage ) {