Add getQueueDepth() to AsyncEventService and report it from RootResource
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ee6e087f Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ee6e087f Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ee6e087f Branch: refs/heads/USERGRID-543 Commit: ee6e087f8b8acde9a27952a2d4a46ae80773559c Parents: bb6ca8e Author: Shawn Feldman <sfeld...@apache.org> Authored: Fri Aug 14 14:48:32 2015 -0600 Committer: Shawn Feldman <sfeld...@apache.org> Committed: Fri Aug 14 14:48:32 2015 -0600 ---------------------------------------------------------------------- .../asyncevents/AmazonAsyncEventService.java | 5 +++++ .../corepersistence/asyncevents/AsyncEventService.java | 5 +++++ .../asyncevents/InMemoryAsyncEventService.java | 12 +++++++++--- .../java/org/apache/usergrid/rest/RootResource.java | 10 +++++----- 4 files changed, 24 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index ed106e2..46c7076 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -373,6 +373,11 @@ public class AmazonAsyncEventService implements AsyncEventService { offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); } + @Override + public long getQueueDepth() { + return queue.getQueueDepth(); + } + public 
void handleEntityDelete(final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 7cce8b3..6d51679 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -75,6 +75,11 @@ public interface AsyncEventService extends ReIndexAction { void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId); + /** + * current queue depth + * @return + */ + long getQueueDepth(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java index 67078dc..6a71b3e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java @@ -79,13 +79,13 @@ public class InMemoryAsyncEventService implements AsyncEventService { @Override public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) { - run( 
eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) ); + run( eventBuilder.buildNewEdge(applicationScope, entity, newEdge) ); } @Override public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) { - run( eventBuilder.buildDeleteEdge( applicationScope, edge ) ); + run( eventBuilder.buildDeleteEdge(applicationScope, edge) ); } @@ -103,7 +103,7 @@ public class InMemoryAsyncEventService implements AsyncEventService { public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) { final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince ); - run(eventBuilder.buildEntityIndex( entityIndexOperation )); + run(eventBuilder.buildEntityIndex(entityIndexOperation)); } public void indexBatch(final List<EdgeScope> edges, final long updatedSince) { @@ -125,4 +125,10 @@ public class InMemoryAsyncEventService implements AsyncEventService { observable.toBlocking().lastOrDefault(null); } } + + @Override + public long getQueueDepth() { + return 0; + } + } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java index 989df26..7c17b7c 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java @@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.UriInfo; import com.google.inject.Injector; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.persistence.index.IndexFig; import org.apache.usergrid.persistence.index.IndexRefreshCommand; import 
org.apache.usergrid.persistence.index.query.Identifier; @@ -191,9 +192,8 @@ public class RootResource extends AbstractContextResource implements MetricProce ApiResponse response = createApiResponse(); - QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class); - QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS); - QueueManager queue = queueManagerFactory.getQueueManager(queueScope); + AsyncEventService eventService = injector.getInstance(AsyncEventService.class); + if ( !ignoreError ) { @@ -210,7 +210,7 @@ public class RootResource extends AbstractContextResource implements MetricProce ObjectNode node = JsonNodeFactory.instance.objectNode(); node.put( "started", started ); node.put( "uptime", System.currentTimeMillis() - started ); - node.put( "version", usergridSystemMonitor.getBuildNumber() ); + node.put( "version", usergridSystemMonitor.getBuildNumber()); // Hector status, for backwards compatibility node.put("cassandraAvailable", usergridSystemMonitor.getIsCassandraAlive()); @@ -220,7 +220,7 @@ public class RootResource extends AbstractContextResource implements MetricProce // Core Persistence Query Index module status for Management App Index node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() ); - node.put( "queueDepth", queue.getQueueDepth() ); + node.put( "queueDepth", eventService.getQueueDepth() ); dumpMetrics(node);