Repository: incubator-usergrid
Updated Branches:
  refs/heads/sqs_queues 9ee998bc2 -> bd4d18b40

better logging

Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bd4d18b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bd4d18b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bd4d18b4

Branch: refs/heads/sqs_queues
Commit: bd4d18b4050e6c269cc65ce92d653be24c971dd7
Parents: 9ee998b
Author: Shawn Feldman <[email protected]>
Authored: Mon Oct 6 11:16:08 2014 -0600
Committer: Shawn Feldman <[email protected]>
Committed: Mon Oct 6 11:16:08 2014 -0600

----------------------------------------------------------------------
 .../queue/impl/SQSQueueManagerImpl.java | 23 +++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bd4d18b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 9d480ef..cf6ff45 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -60,10 +60,13 @@ public class SQSQueueManagerImpl implements QueueManager {
 
 
     public Queue createQueue(){
+        String name = getName();
         CreateQueueRequest createQueueRequest = new CreateQueueRequest()
-                .withQueueName(getName());
+                .withQueueName(name);
         CreateQueueResult result = sqs.createQueue(createQueueRequest);
-        return new Queue(result.getQueueUrl());
+        String url = result.getQueueUrl();
+        LOG.info("Created queue with url {}",url);
+        return new Queue(url);
     }
 
     private String getName() {
@@ -73,7 +76,8 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public Queue getQueue(){
         if(queue == null) {
-            for (String queueUrl : sqs.listQueues().getQueueUrls()) {
+            ListQueuesResult result = sqs.listQueues();
+            for (String queueUrl : result.getQueueUrls()) {
                 boolean found = queueUrl.contains(getName());
                 if (found) {
                     queue = new Queue(queueUrl);
@@ -137,18 +141,27 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     public void commitMessage( QueueMessage queueMessage){
+        String url = getQueue().getUrl();
+        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+
         sqs.deleteMessage(new DeleteMessageRequest()
-                .withQueueUrl(getQueue().getUrl())
+                .withQueueUrl(url)
                 .withReceiptHandle(queueMessage.getHandle()));
     }
 
     public void commitMessages( List<QueueMessage> queueMessages){
+        String url = getQueue().getUrl();
+        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
         for(QueueMessage message : queueMessages){
             entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
         }
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(getQueue().getUrl(),entries);
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
         DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+        boolean successful = result.getFailed().size() > 0;
+        if(!successful){
+            LOG.error("Commit failed {} messages",result.getFailed().size());
+        }
     }
 
     /** Read the object from Base64 string. */
