Repository: usergrid Updated Branches: refs/heads/master e7c805f1e -> a679aea4a
Update the visibility timeout used when taking messages from SQS as it overrides the queue's configured/default timeout. Also change the queue's visibility timeout property to be exposed as milliseconds for consistency. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2b22c610 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2b22c610 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2b22c610 Branch: refs/heads/master Commit: 2b22c610ddc3b9f9f08777fe5467713efe2ba553 Parents: 0f589f6 Author: Michael Russo <[email protected]> Authored: Thu Oct 15 10:23:29 2015 -0700 Committer: Michael Russo <[email protected]> Committed: Thu Oct 15 10:23:29 2015 -0700 ---------------------------------------------------------------------- .../usergrid/corepersistence/index/IndexProcessorFig.java | 7 +++---- .../java/org/apache/usergrid/persistence/queue/QueueFig.java | 6 +++--- .../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java | 2 +- .../persistence/queue/util/AmazonNotificationUtils.java | 4 +++- 4 files changed, 10 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b22c610/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java index 410f162..7d022e5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java @@ -62,11 +62,10 @@ public interface IndexProcessorFig extends GuicyFig { /** * Set the visibility timeout for messages received from the queue. (in milliseconds). - * AWS default is also currently 30 seconds. Received messages will remain 'in flight' until - * they are ack'd(deleted) or this timeout occurs. If the timeout occurs, the messages will become - * visible again for re-processing. + * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs. + * If the timeout occurs, the messages will become visible again for re-processing. */ - @Default( "30000" ) + @Default( "5000" ) // 5 seconds @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT ) int getIndexQueueVisibilityTimeout(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b22c610/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 7f26bcf..0453a9b 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -76,10 +76,10 @@ public interface QueueFig extends GuicyFig { int getAsyncQueueSize(); /** - * Set the visibility timeout for faster retries + * Set the visibility timeout (in milliseconds) for faster retries * @return */ @Key( "usergrid.queue.visibilityTimeout" ) - @Default("10") - String getVisibilityTimeout(); + @Default("5000") // 5 seconds + int getVisibilityTimeout(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b22c610/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index a2b5d72..d476f76 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -351,7 +351,7 @@ public class SNSQueueManagerImpl implements QueueManager { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url); receiveMessageRequest.setMaxNumberOfMessages(limit); - receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000); + receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000)); receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000); try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/2b22c610/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java index 6105592..d57870d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java @@ -61,10 +61,12 @@ public class AmazonNotificationUtils { .format( "{\"maxReceiveCount\":\"%s\"," + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn ); + final String visibilityTimeoutInSeconds = String.valueOf(Math.max(1, fig.getVisibilityTimeout() / 1000)); + final Map<String, String> queueAttributes = new HashMap<>( 2 ); queueAttributes.put( "MessageRetentionPeriod", fig.getRetentionPeriod() ); queueAttributes.put( "RedrivePolicy", redrivePolicy ); - queueAttributes.put( "VisibilityTimeout", fig.getVisibilityTimeout() ); + queueAttributes.put( "VisibilityTimeout", visibilityTimeoutInSeconds ); CreateQueueRequest createQueueRequest = new CreateQueueRequest(). withQueueName( queueName )
