Reformatted for readability; small changes to support the full implementation of async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cb287ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cb287ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cb287ac Branch: refs/heads/USERGRID-669 Commit: 8cb287ac6d43acab4d68506b35d4a7725ff344d0 Parents: 7298853 Author: Jeff West <jw...@apigee.com> Authored: Tue May 26 09:08:10 2015 -0700 Committer: Jeff West <jw...@apigee.com> Committed: Tue May 26 09:08:10 2015 -0700 ---------------------------------------------------------------------- .../queue/impl/SQSQueueManagerImpl.java | 239 ++++++++++++------- 1 file changed, 151 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cb287ac/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java index 088359a..e28e805 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java @@ -19,11 +19,10 @@ package org.apache.usergrid.persistence.queue.impl; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ExecutionException; +import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,69 +61,100 @@ import 
com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; public class SQSQueueManagerImpl implements QueueManager { - private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class); + private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class); - private final QueueScope scope; - private ObjectMapper mapper; - private final QueueFig fig; - private final AmazonSQSClient sqs; + private final QueueScope scope; + private ObjectMapper mapper; + protected final QueueFig fig; + protected final AmazonSQSClient sqs; private static SmileFactory smileFactory = new SmileFactory(); private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder() - .maximumSize( 1000 ) - .build( new CacheLoader<String, Queue>() { - @Override - public Queue load( String queueName ) throws Exception { - - //the amazon client is not thread safe, we need to create one per queue - Queue queue = null; - try { - GetQueueUrlResult result = sqs.getQueueUrl( queueName ); - queue = new Queue( result.getQueueUrl() ); - }catch ( QueueDoesNotExistException queueDoesNotExistException ) { - //no op, swallow - LOG.error( "Queue {} does not exist, creating", queueName ); - - } - catch ( Exception e ) { - LOG.error( "failed to get queue from service", e ); - throw e; - } - if ( queue == null ) { - CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueName ); - CreateQueueResult result = sqs.createQueue( createQueueRequest ); - String url = result.getQueueUrl(); - queue = new Queue( url ); - LOG.info( "Created queue with url {}", url ); - } - return queue; + .maximumSize(1000) + .build(new CacheLoader<String, Queue>() { + @Override + public Queue load(String queueName) throws Exception { + + //the amazon client is not thread safe, we need to create one per queue + Queue queue = null; + + try { + + GetQueueUrlResult result = sqs.getQueueUrl(queueName); + queue = new Queue(result.getQueueUrl()); + + } catch 
(QueueDoesNotExistException queueDoesNotExistException) { + //no op, swallow + logger.error("Queue {} does not exist, creating", queueName); + + } catch (Exception e) { + logger.error("failed to get queue from service", e); + throw e; + } + + if (queue == null) { + + final String deadletterQueueName = String.format("%s_dead", queueName); + final Map<String, String> deadLetterAttributes = new HashMap<>(2); + + deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod()); + CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest() + .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes); + + final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest); + logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl()); + + final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(deadletterQueueName, sqs); + + String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," + + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn); + + final Map<String, String> queueAttributes = new HashMap<>(2); + deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod()); + deadLetterAttributes.put("RedrivePolicy", redrivePolicy); + + CreateQueueRequest createQueueRequest = new CreateQueueRequest(). 
+ withQueueName(queueName) + .withAttributes(queueAttributes); + + CreateQueueResult result = sqs.createQueue(createQueueRequest); + + String url = result.getQueueUrl(); + queue = new Queue(url); + + logger.info("Created queue with url {}", url); } - } ); + + return queue; + } + }); @Inject - public SQSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig ){ + public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig) { + this.scope = scope; this.fig = fig; try { smileFactory.delegateToTextual(true); - mapper = new ObjectMapper( smileFactory ); + mapper = new ObjectMapper(smileFactory); //pretty print, disabling for speed // mapper.enable(SerializationFeature.INDENT_OUTPUT); mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class"); sqs = createClient(); - } catch ( Exception e ) { + } catch (Exception e) { throw new RuntimeException("Error setting up mapper", e); } } - private String getName() { + protected String getName() { + String name = fig.getPrefix() + "_" + scope.getName(); Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters"); @@ -133,6 +163,7 @@ public class SQSQueueManagerImpl implements QueueManager { } public Queue getQueue() { + try { Queue queue = urlMap.get(getName()); return queue; @@ -142,67 +173,86 @@ public class SQSQueueManagerImpl implements QueueManager { } @Override - public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { - if(sqs == null){ - LOG.error("Sqs is null"); + public List<QueueMessage> getMessages(final int limit, + final int transactionTimeout, + final int waitTime, + final Class klass) { + + if (sqs == null) { + logger.error("Sqs is null"); return new ArrayList<>(); } - waitTime = waitTime/1000; + String url = getQueue().getUrl(); - LOG.debug( "Getting {} messages from {}", limit, url); + + if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url); + 
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url); receiveMessageRequest.setMaxNumberOfMessages(limit); - receiveMessageRequest.setVisibilityTimeout(transactionTimeout); - receiveMessageRequest.setWaitTimeSeconds(waitTime); + receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000); + receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000); ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest); List<Message> messages = result.getMessages(); - LOG.debug( "Received {} messages from {}", messages.size(), url); + + if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url); + List<QueueMessage> queueMessages = new ArrayList<>(messages.size()); + for (Message message : messages) { - Object body ; - try{ - body = fromString(message.getBody(),klass); - }catch (Exception e){ - LOG.error("failed to deserialize message", e); + Object body; + + try { + body = fromString(message.getBody(), klass); + } catch (Exception e) { + logger.error("failed to deserialize message", e); throw new RuntimeException(e); } - QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body,message.getAttributes().get( "type" )); + + QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type")); queueMessages.add(queueMessage); } + return queueMessages; } @Override - public void sendMessages(List bodies) throws IOException { - if(sqs == null){ - LOG.error("Sqs is null"); + public void sendMessages(final List bodies) throws IOException { + + if (sqs == null) { + logger.error("Sqs is null"); return; } String url = getQueue().getUrl(); - LOG.debug( "Sending Messages...{} to {}", bodies.size(), url); + + if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url); SendMessageBatchRequest request = new SendMessageBatchRequest(url); 
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size()); - for(Object body : bodies){ + + for (Object body : bodies) { SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry(); entry.setId(UUID.randomUUID().toString()); - entry.setMessageBody( toString( body ) ); - entry.addMessageAttributesEntry( "type",new MessageAttributeValue().withStringValue( "mytype" ) ); + entry.setMessageBody(toString(body)); + entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype")); entries.add(entry); } + request.setEntries(entries); sqs.sendMessageBatch(request); } @Override - public void sendMessage(Object body) throws IOException { - if(sqs == null){ - LOG.error("Sqs is null"); + public void sendMessage(final Object body) throws IOException { + + if (sqs == null) { + logger.error("Sqs is null"); return; } + String url = getQueue().getUrl(); - LOG.debug( "Sending Message...{} to {}", body.toString(), url); + + if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url); final String stringBody = toString(body); @@ -212,55 +262,68 @@ public class SQSQueueManagerImpl implements QueueManager { @Override - public void commitMessage(QueueMessage queueMessage) { + public void commitMessage(final QueueMessage queueMessage) { + String url = getQueue().getUrl(); - LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url); + if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url); sqs.deleteMessage(new DeleteMessageRequest() - .withQueueUrl(url) - .withReceiptHandle(queueMessage.getHandle())); + .withQueueUrl(url) + .withReceiptHandle(queueMessage.getHandle())); } @Override - public void commitMessages(List<QueueMessage> queueMessages) { + public void commitMessages(final List<QueueMessage> queueMessages) { + String url = getQueue().getUrl(); - LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url); + 
if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url); + List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); - for(QueueMessage message : queueMessages){ - entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle())); + + for (QueueMessage message : queueMessages) { + entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle())); } - DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries); + + DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries); DeleteMessageBatchResult result = sqs.deleteMessageBatch(request); + boolean successful = result.getFailed().size() <= 0; - if(!successful){ - for( BatchResultErrorEntry failed : result.getFailed()) { - LOG.error("Commit failed reason: {} messages id: {}", failed.getMessage(),failed.getId()); + + if (!successful) { + + for (BatchResultErrorEntry failed : result.getFailed()) { + logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId()); } } } - - /** Read the object from Base64 string. */ - private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException { - Object o = mapper.readValue(s,klass); + /** + * Read the object from Base64 string. + */ + private Object fromString(final String s, + final Class klass) throws IOException, ClassNotFoundException { + Object o = mapper.readValue(s, klass); return o; } - /** Write the object to a Base64 string. */ - private String toString( Object o ) throws IOException { + /** + * Write the object to a Base64 string. 
+ */ + protected String toString(final Object o) throws IOException { return mapper.writeValueAsString(o); } /** * Get the region + * * @return */ - private Region getRegion() { - Regions regions = Regions.fromName( fig.getRegion() ); - Region region = Region.getRegion( regions ); + protected Region getRegion() { + Regions regions = Regions.fromName(fig.getRegion()); + Region region = Region.getRegion(regions); return region; } @@ -270,9 +333,9 @@ public class SQSQueueManagerImpl implements QueueManager { */ private AmazonSQSClient createClient() { final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() ); + final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials()); final Region region = getRegion(); - sqs.setRegion( region ); + sqs.setRegion(region); return sqs; }