Repository: usergrid Updated Branches: refs/heads/master 853625399 -> 159c8c325
Add connection and socket timeouts for the AWS SNS and SQS clients. Separate queue status into its own endpoint. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/91739c60 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/91739c60 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/91739c60 Branch: refs/heads/master Commit: 91739c60ffa8d7b2db25fa9e7eb731cbfb81863f Parents: 97fae94 Author: Michael Russo <[email protected]> Authored: Sat Feb 27 21:41:50 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Sat Feb 27 21:41:50 2016 -0800 ---------------------------------------------------------------------- .../asyncevents/AsyncEventService.java | 7 +++++ .../asyncevents/AsyncEventServiceImpl.java | 6 ++++ .../usergrid/persistence/queue/QueueFig.java | 10 +++++++ .../queue/impl/SNSQueueManagerImpl.java | 27 ++++++++++++----- .../org/apache/usergrid/rest/RootResource.java | 31 +++++++++++++++++--- 5 files changed, 69 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 288fb12..1abf83f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -88,6 +88,13 @@ public interface AsyncEventService extends ReIndexAction { */ long getQueueDepth(); + /** + * name of current queue manager implemented + * @return + */ + String 
getQueueManagerClass(); + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 7a71410..b1d0805 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -825,5 +825,11 @@ public class AsyncEventServiceImpl implements AsyncEventService { } } + public String getQueueManagerClass() { + + return queue.getClass().getSimpleName(); + + } + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 88ad3ff..7757d58 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -86,4 +86,14 @@ public interface QueueFig extends GuicyFig { @Key( "usergrid.queue.localquorum.timeout") @Default("3000") // 3 seconds int getLocalQuorumTimeout(); + + @Key( "usergrid.queue.client.connection.timeout") + @Default( "1000" ) // 1 second + int getQueueClientConnectionTimeout(); + + @Key( "usergrid.queue.client.socket.timeout") + @Default( "3000" ) // 3 seconds + int getQueueClientSocketTimeout(); + + } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 4028d46..f1d8c5a 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import com.amazonaws.ClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,7 @@ public class SNSQueueManagerImpl implements QueueManager { private final QueueFig fig; private final ClusterFig clusterFig; private final CassandraFig cassandraFig; + private final ClientConfiguration clientConfiguration; private final AmazonSQSClient sqs; private final AmazonSNSClient sns; private final AmazonSNSAsyncClient snsAsync; @@ -165,6 +167,11 @@ public class SNSQueueManagerImpl implements QueueManager { final Region region = getRegion(); + this.clientConfiguration = new ClientConfiguration() + .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout()) + .withSocketTimeout(queueFig.getQueueClientSocketTimeout()) + .withGzip(true); + try { sqs = createSQSClient( region ); sns = createSNSClient( region ); @@ -329,10 +336,10 @@ public class SNSQueueManagerImpl implements QueueManager { */ private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) { - final UsergridAwsCredentialsProvider ugProvider = new 
UsergridAwsCredentialsProvider(); - - final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor ); + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); + final AmazonSNSAsyncClient sns = + new AmazonSNSAsyncClient( ugProvider.getCredentials(), clientConfiguration, executor ); sns.setRegion( region ); @@ -344,9 +351,10 @@ public class SNSQueueManagerImpl implements QueueManager { * Create the async sqs client */ private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) { - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor ); + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); + final AmazonSQSAsyncClient sqs = + new AmazonSQSAsyncClient( ugProvider.getCredentials(),clientConfiguration, executor ); sqs.setRegion( region ); @@ -358,9 +366,10 @@ public class SNSQueueManagerImpl implements QueueManager { * The Synchronous SNS client is used for creating topics and subscribing queues. 
*/ private AmazonSNSClient createSNSClient( final Region region ) { - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() ); + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); + final AmazonSNSClient sns = + new AmazonSNSClient( ugProvider.getCredentials(), clientConfiguration ); sns.setRegion( region ); @@ -663,8 +672,10 @@ public class SNSQueueManagerImpl implements QueueManager { * Create the SQS client for the specified settings */ private AmazonSQSClient createSQSClient( final Region region ) { + final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() ); + final AmazonSQSClient sqs = + new AmazonSQSClient( ugProvider.getCredentials(), clientConfiguration ); sqs.setRegion( region ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/91739c60/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java index b7118a3..75ed567 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java @@ -174,9 +174,6 @@ public class RootResource extends AbstractContextResource implements MetricProce ApiResponse response = createApiResponse(); - AsyncEventService eventService = injector.getInstance(AsyncEventService.class); - - if ( !ignoreError ) { if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) { @@ -202,7 +199,6 @@ public class RootResource extends AbstractContextResource implements MetricProce // Core Persistence Query Index module status for Management App Index node.put( 
"managementAppIndexStatus", emf.getIndexHealth().toString() ); - node.put( "queueDepth", eventService.getQueueDepth() ); dumpMetrics(node); @@ -226,6 +222,33 @@ public class RootResource extends AbstractContextResource implements MetricProce return response.build(); } + @GET + @Path("/status/queue") + @JSONP + @Produces({MediaType.APPLICATION_JSON, "application/javascript"}) + public ApiResponse getQueueDepth(){ + + ApiResponse response = createApiResponse(); + AsyncEventService eventService = injector.getInstance(AsyncEventService.class); + + ObjectNode node = JsonNodeFactory.instance.objectNode(); + + String provider = "LOCAL"; + String queueManagerClass = eventService.getQueueManagerClass(); + + if(queueManagerClass.contains("SNS") || queueManagerClass.contains("SQS")){ + provider = "AWS"; + } + + node.put( "provider", provider ); + node.put( "depth", eventService.getQueueDepth() ); + + response.setProperty( "status", node ); + return response; + + } + + private void dumpMetrics( ObjectNode node ) { MetricsRegistry registry = Metrics.defaultRegistry();
