Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-900 d67c220dd -> 491008e96
Fixes runtime bug with joda time conflict with Astayanx Fixes bug in AmazonUtils incorrectly re-throwing missing exception. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/29d115fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/29d115fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/29d115fc Branch: refs/heads/USERGRID-900 Commit: 29d115fc88cc9a058735f3582bb73aee39ec16e7 Parents: d67c220 Author: Todd Nine <tn...@apigee.com> Authored: Tue Aug 4 15:13:04 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Tue Aug 4 15:13:04 2015 -0600 ---------------------------------------------------------------------- stack/corepersistence/queue/pom.xml | 20 +- .../queue/util/AmazonNotificationUtils.java | 230 ++++++++++--------- 2 files changed, 135 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/pom.xml ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml index 7780997..2d46dc8 100644 --- a/stack/corepersistence/queue/pom.xml +++ b/stack/corepersistence/queue/pom.xml @@ -54,13 +54,19 @@ <!-- tests --> - <dependency> - <groupId>org.apache.usergrid</groupId> - <artifactId>common</artifactId> - <version>${project.version}</version> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> + <dependency> + <groupId>org.apache.usergrid</groupId> + <artifactId>common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.8.1</version> + </dependency> <dependency> <groupId>org.apache.usergrid</groupId> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java index 1d86823..9561a58 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java @@ -1,141 +1,150 @@ package org.apache.usergrid.persistence.queue.util; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.auth.policy.*; -import com.amazonaws.auth.policy.actions.SQSActions; -import com.amazonaws.auth.policy.conditions.ConditionFactory; -import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.*; -import com.amazonaws.services.sns.util.Topics; -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.*; -import org.apache.usergrid.persistence.queue.Queue; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.queue.QueueFig; + +import com.amazonaws.auth.policy.Condition; +import com.amazonaws.auth.policy.Policy; +import com.amazonaws.auth.policy.Principal; +import com.amazonaws.auth.policy.Resource; +import com.amazonaws.auth.policy.Statement; +import com.amazonaws.auth.policy.actions.SQSActions; +import com.amazonaws.auth.policy.conditions.ConditionFactory; +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.CreateTopicResult; +import com.amazonaws.services.sns.model.ListTopicsResult; +import com.amazonaws.services.sns.model.Topic; +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.GetQueueAttributesRequest; +import com.amazonaws.services.sqs.model.GetQueueAttributesResult; +import com.amazonaws.services.sqs.model.GetQueueUrlResult; +import com.amazonaws.services.sqs.model.QueueDoesNotExistException; +import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; + + /** * Created by Jeff West on 5/25/15. */ public class AmazonNotificationUtils { - private static final Logger logger = LoggerFactory.getLogger(AmazonNotificationUtils.class); + private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class ); + - public static String createQueue(final AmazonSQSClient sqs, - final String queueName, - final QueueFig fig) - throws Exception { + public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig ) + throws Exception { - final String deadletterQueueName = String.format("%s_dead", queueName); - final Map<String, String> deadLetterAttributes = new HashMap<>(2); + final String deadletterQueueName = String.format( "%s_dead", queueName ); + final Map<String, String> deadLetterAttributes = new HashMap<>( 2 ); - deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod()); + deadLetterAttributes.put( "MessageRetentionPeriod", fig.getDeadletterRetentionPeriod() ); - CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest() - .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes); + CreateQueueRequest createDeadLetterQueueRequest = + new CreateQueueRequest().withQueueName( deadletterQueueName ).withAttributes( deadLetterAttributes ); - final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest); + final CreateQueueResult deadletterResult = sqs.createQueue( createDeadLetterQueueRequest ); - logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl()); + logger.info( "Created deadletter queue with url {}", deadletterResult.getQueueUrl() ); - final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName); + final String deadletterArn = AmazonNotificationUtils.getQueueArnByName( sqs, deadletterQueueName ); - String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," + - " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn); + String redrivePolicy = String + .format( "{\"maxReceiveCount\":\"%s\"," + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), + deadletterArn ); - final Map<String, String> queueAttributes = new HashMap<>(2); - deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod()); - deadLetterAttributes.put("RedrivePolicy", redrivePolicy); + final Map<String, String> queueAttributes = new HashMap<>( 2 ); + deadLetterAttributes.put( "MessageRetentionPeriod", fig.getRetentionPeriod() ); + deadLetterAttributes.put( "RedrivePolicy", redrivePolicy ); CreateQueueRequest createQueueRequest = new CreateQueueRequest(). - withQueueName(queueName) - .withAttributes(queueAttributes); + withQueueName( queueName ) + .withAttributes( queueAttributes ); - CreateQueueResult result = sqs.createQueue(createQueueRequest); + CreateQueueResult result = sqs.createQueue( createQueueRequest ); String url = result.getQueueUrl(); - logger.info("Created SQS queue with url {}", url); + logger.info( "Created SQS queue with url {}", url ); return url; } - public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs, - final String queueUrl, - final List<String> topicARNs) throws Exception{ - String queueARN = getQueueArnByUrl(sqs, queueUrl); + public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl, + final List<String> topicARNs ) throws Exception { - Statement statement = new Statement(Statement.Effect.Allow) - .withActions(SQSActions.SendMessage) - .withPrincipals(new Principal("*")) - .withResources(new Resource(queueARN)); + String queueARN = getQueueArnByUrl( sqs, queueUrl ); - List<Condition> conditions = new ArrayList<>(); + Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage ) + .withPrincipals( new Principal( "*" ) ) + .withResources( new Resource( queueARN ) ); - for(String topicARN : topicARNs){ + List<Condition> conditions = new ArrayList<>(); - conditions.add(ConditionFactory.newSourceArnCondition(topicARN)); + for ( String topicARN : topicARNs ) { + conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) ); } - statement.setConditions(conditions); + statement.setConditions( conditions ); - Policy policy = new Policy("SubscriptionPermission").withStatements(statement); + Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement ); final Map<String, String> queueAttributes = new HashMap<>(); - queueAttributes.put("Policy", policy.toJson()); + queueAttributes.put( "Policy", policy.toJson() ); - SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes); + SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes ); try { - sqs.setQueueAttributes(queueAttributesRequest); - }catch (Exception e){ - logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e); + sqs.setQueueAttributes( queueAttributesRequest ); + } + catch ( Exception e ) { + logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, + topicARNs.toString(), e ); } - - } - public static String getQueueArnByName(final AmazonSQSClient sqs, - final String queueName) - throws Exception { + public static String getQueueArnByName( final AmazonSQSClient sqs, final String queueName ) throws Exception { String queueUrl = null; try { - GetQueueUrlResult result = sqs.getQueueUrl(queueName); + GetQueueUrlResult result = sqs.getQueueUrl( queueName ); queueUrl = result.getQueueUrl(); - - } catch (QueueDoesNotExistException queueDoesNotExistException) { + } + catch ( QueueDoesNotExistException queueDoesNotExistException ) { //no op, swallow - logger.warn("Queue {} does not exist", queueName); + logger.warn( "Queue {} does not exist", queueName ); return null; - - } catch (Exception e) { - logger.error(String.format("Failed to get URL for Queue [%s] from SQS", queueName), e); + } + catch ( Exception e ) { + logger.error( String.format( "Failed to get URL for Queue [%s] from SQS", queueName ), e ); throw e; } - if (queueUrl != null) { + if ( queueUrl != null ) { try { - GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl) - .withAttributeNames("All"); + GetQueueAttributesRequest queueAttributesRequest = + new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" ); - GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest); + GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest ); Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes(); - return sqsAttributeMap.get("QueueArn"); - - } catch (Exception e) { - logger.error("Failed to get queue URL from service", e); + return sqsAttributeMap.get( "QueueArn" ); + } + catch ( Exception e ) { + logger.error( "Failed to get queue URL from service", e ); throw e; } } @@ -143,75 +152,80 @@ public class AmazonNotificationUtils { return null; } - public static String getQueueArnByUrl(final AmazonSQSClient sqs, - final String queueUrl) - throws Exception { + + public static String getQueueArnByUrl( final AmazonSQSClient sqs, final String queueUrl ) throws Exception { try { - GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl) - .withAttributeNames("All"); + GetQueueAttributesRequest queueAttributesRequest = + new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" ); - GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest); + GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest ); Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes(); - return sqsAttributeMap.get("QueueArn"); - - } catch (Exception e) { - logger.error("Failed to get queue URL from service", e); + return sqsAttributeMap.get( "QueueArn" ); + } + catch ( Exception e ) { + logger.error( "Failed to get queue URL from service", e ); throw e; } } - public static String getTopicArn(final AmazonSNSClient sns, - final String queueName, - final boolean createOnMissing) - throws Exception { - if (logger.isDebugEnabled()) - logger.debug("Looking up Topic ARN: {}", queueName); + public static String getTopicArn( final AmazonSNSClient sns, final String queueName, final boolean createOnMissing ) + throws Exception { + + if ( logger.isDebugEnabled() ) { + logger.debug( "Looking up Topic ARN: {}", queueName ); + } ListTopicsResult listTopicsResult = sns.listTopics(); String topicArn = null; - for (Topic topic : listTopicsResult.getTopics()) { + for ( Topic topic : listTopicsResult.getTopics() ) { String arn = topic.getTopicArn(); - if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) { + if ( queueName.equals( arn.substring( arn.lastIndexOf( ':' ) ) ) ) { topicArn = arn; - logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName); + logger.info( "Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName ); } } - if (topicArn == null && createOnMissing) { - logger.info("Creating topic for queue=[{}]...", queueName); + if ( topicArn == null && createOnMissing ) { + logger.info( "Creating topic for queue=[{}]...", queueName ); - CreateTopicResult createTopicResult = sns.createTopic(queueName); + CreateTopicResult createTopicResult = sns.createTopic( queueName ); topicArn = createTopicResult.getTopicArn(); - logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn); - } else { - logger.error("Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName, createOnMissing); + logger.info( "Successfully created topic with name {} and arn {}", queueName, topicArn ); + } + else { + logger.error( "Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName, + createOnMissing ); } - if (logger.isDebugEnabled()) - logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName); + if ( logger.isDebugEnabled() ) { + logger.debug( "Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName ); + } return topicArn; } - public static String getQueueUrlByName(final AmazonSQSClient sqs, - final String queueName) { + + public static String getQueueUrlByName( final AmazonSQSClient sqs, final String queueName ) { try { - GetQueueUrlResult result = sqs.getQueueUrl(queueName); + GetQueueUrlResult result = sqs.getQueueUrl( queueName ); return result.getQueueUrl(); - } catch (QueueDoesNotExistException e) { - logger.error("Queue {} does not exist", queueName); - throw e; - } catch (Exception e) { - logger.error("failed to get queue from service", e); + } + catch ( QueueDoesNotExistException e ) { + //no op, return null + logger.error( "Queue {} does not exist", queueName ); + return null; + } + catch ( Exception e ) { + logger.error( "failed to get queue from service", e ); throw e; } }