Repository: usergrid Updated Branches: refs/heads/master 949c2bbe0 -> 16c270774
Fix serialization issues with access tokens and simplify the Queue Manager getMessages interface. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bcc97808 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bcc97808 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bcc97808 Branch: refs/heads/master Commit: bcc97808864c2bb9de9d39b5243592b818e90d5b Parents: bfd2bb3 Author: Michael Russo <[email protected]> Authored: Mon Feb 29 18:14:27 2016 -0800 Committer: Michael Russo <[email protected]> Committed: Mon Feb 29 18:14:27 2016 -0800 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 7 ++----- .../usergrid/persistence/queue/LocalQueueManager.java | 6 +----- .../org/apache/usergrid/persistence/queue/QueueFig.java | 4 ++-- .../apache/usergrid/persistence/queue/QueueManager.java | 6 +----- .../persistence/queue/impl/SNSQueueManagerImpl.java | 12 +++++++----- .../usergrid/persistence/queue/QueueManagerTest.java | 10 +++++----- .../shiro/credentials/ApplicationAccessToken.java | 3 +++ .../shiro/credentials/OrganizationAccessToken.java | 3 +++ .../usergrid/services/notifications/QueueListener.java | 2 +- .../usergrid/services/queues/ImportQueueManager.java | 4 +--- .../apache/usergrid/services/queues/QueueListener.java | 2 +- 11 files changed, 27 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 782bed7..3f623d8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -242,13 +242,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { final Timer.Context timer = this.readTimer.time(); try { - return queue.getMessages(MAX_TAKE, - indexProcessorFig.getIndexQueueVisibilityTimeout(), - Math.max(1000, queueFig.getQueueClientSocketTimeout() - 1000), // 1 sec less than socket timeout - AsyncEvent.class); + return queue.getMessages(MAX_TAKE, AsyncEvent.class); } - //stop our timer finally { + //stop our timer timer.stop(); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java index 4d26100..1f4261a 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java @@ -22,17 +22,13 @@ package org.apache.usergrid.persistence.queue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; import java.io.IOException; -import java.util.AbstractQueue; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; /** @@ -45,7 +41,7 @@ public class LocalQueueManager implements QueueManager { public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); @Override - public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) { + public List<QueueMessage> getMessages(int limit, Class klass) { List<QueueMessage> returnQueue = new ArrayList<>(); try { QueueMessage message=null; http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index 7757d58..63d2d80 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -88,11 +88,11 @@ public interface QueueFig extends GuicyFig { int getLocalQuorumTimeout(); @Key( "usergrid.queue.client.connection.timeout") - @Default( "1000" ) // 3 seconds + @Default( "1000" ) // 1 second int getQueueClientConnectionTimeout(); @Key( "usergrid.queue.client.socket.timeout") - @Default( "3000" ) // 3 seconds + @Default( "10000" ) // 10 seconds int getQueueClientSocketTimeout(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java index 4c948e3..18909e4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java @@ -17,8 +17,6 @@ */ package org.apache.usergrid.persistence.queue; -import rx.Observable; - import java.io.IOException; import java.io.Serializable; import java.util.List; @@ -31,12 +29,10 @@ public interface QueueManager { /** * Read messages from queue * @param limit - * @param transactionTimeout timeout in seconds - * @param waitTime wait time for next message in milliseconds * @param klass class to cast the return from * @return List of Queue Messages */ - List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass); + List<QueueMessage> getMessages(int limit, Class klass); /** * get the queue depth http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index f1d8c5a..4dd9bda 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -169,7 +169,8 @@ public class SNSQueueManagerImpl implements QueueManager { this.clientConfiguration = new ClientConfiguration() .withConnectionTimeout(queueFig.getQueueClientConnectionTimeout()) - .withSocketTimeout(queueFig.getQueueClientSocketTimeout()) + // don't let the socket timeout be configured less than 5 sec (network delays do happen) + .withSocketTimeout(Math.max(5000, queueFig.getQueueClientSocketTimeout())) .withGzip(true); try { @@ -411,8 +412,7 @@ public class SNSQueueManagerImpl implements QueueManager { @Override - public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime, - final Class klass ) { + public List<QueueMessage> getMessages(final int limit, final Class klass) { if ( sqs == null ) { logger.error( "SQS is null - was not initialized properly" ); @@ -427,8 +427,10 @@ public class SNSQueueManagerImpl implements QueueManager { ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url ); receiveMessageRequest.setMaxNumberOfMessages( limit ); - receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) ); - receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 ); + receiveMessageRequest.setVisibilityTimeout( Math.max( 1, fig.getVisibilityTimeout() / 1000 ) ); + + // set SQS long polling to 3 secs < the client socket timeout (network delays) with min of 0 (no long poll) + receiveMessageRequest.setWaitTimeSeconds( Math.max(0, ( fig.getQueueClientSocketTimeout() - 3000) / 1000 ) ); try { ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java index c8661c0..d57beab 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java @@ -82,14 +82,14 @@ public class QueueManagerTest { public void send() throws Exception{ String value = "bodytest"; qm.sendMessage(value); - List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class); + List<QueueMessage> messageList = qm.getMessages(1, String.class); assertTrue(messageList.size() >= 1); for(QueueMessage message : messageList){ assertTrue(message.getBody().equals(value)); qm.commitMessage(message); } - messageList = qm.getMessages(1,5000,5000,String.class); + messageList = qm.getMessages(1, String.class); assertTrue(messageList.size() <= 0); } @@ -102,14 +102,14 @@ public class QueueManagerTest { List<Map<String,String>> bodies = new ArrayList<>(); bodies.add(values); qm.sendMessages(bodies); - List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()); + List<QueueMessage> messageList = qm.getMessages(1, values.getClass()); assertTrue(messageList.size() >= 1); for(QueueMessage message : messageList){ assertTrue(message.getBody().equals(values)); } qm.commitMessages(messageList); - messageList = qm.getMessages(1,5000,5000,values.getClass()); + messageList = qm.getMessages(1, values.getClass()); assertTrue(messageList.size() <= 0); } @@ -133,7 +133,7 @@ public class QueueManagerTest { } assertTrue(depth>0); - List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()); + List<QueueMessage> messageList = qm.getMessages(10, values.getClass()); assertTrue(messageList.size() <= 500); for(QueueMessage message : messageList){ assertTrue(message.getBody().equals(values)); http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java index cbfe97d..91d95d2 100644 --- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java +++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/ApplicationAccessToken.java @@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials; public class ApplicationAccessToken extends AbstractAccessTokenCredentials implements ApplicationCredentials { + // Do not remove, needed for Jackson to handle deserialization + public ApplicationAccessToken(){} + public ApplicationAccessToken( String token ) { super( token ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java index 3ae6f2c..36e78c6 100644 --- a/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java +++ b/stack/services/src/main/java/org/apache/usergrid/security/shiro/credentials/OrganizationAccessToken.java @@ -19,6 +19,9 @@ package org.apache.usergrid.security.shiro.credentials; public class OrganizationAccessToken extends AbstractAccessTokenCredentials implements OrganizationCredentials { + // Do not remove, needed for Jackson to handle deserialization + public OrganizationAccessToken(){} + public OrganizationAccessToken( String token ) { super( token ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index 0a0e982..de9cf06 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -169,7 +169,7 @@ public class QueueListener { while ( true ) { Timer.Context timerContext = timer.time(); - rx.Observable.from(queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class)) + rx.Observable.from(queueManager.getMessages(getBatchSize(), ApplicationQueueMessage.class)) .buffer(getBatchSize()) .doOnNext(messages -> { http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index 272bb65..f3c65c7 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueMessage; -import rx.Observable; /** @@ -37,8 +36,7 @@ import rx.Observable; public class ImportQueueManager implements QueueManager { @Override - public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime, - final Class klass ) { + public List<QueueMessage> getMessages(final int limit, final Class klass) { return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/bcc97808/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java index 7ceb2ae..5895d38 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java @@ -181,7 +181,7 @@ public abstract class QueueListener { Timer.Context timerContext = timer.time(); //Get the messages out of the queue. //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here. - rx.Observable.from( queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class)) + rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class)) .buffer(getBatchSize()) .doOnNext(messages -> { try {
