Move under org.apache.usergrid.persistence package in preparation for integration into Usergrid.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9016fd29 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9016fd29 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9016fd29 Branch: refs/heads/usergrid-1318-queue Commit: 9016fd290e80b88ba3199c0020fca7722b82d780 Parents: 1574b3a Author: Dave Johnson <snoopd...@apache.org> Authored: Fri Sep 9 17:31:44 2016 -0400 Committer: Dave Johnson <snoopd...@apache.org> Committed: Fri Sep 9 17:31:44 2016 -0400 ---------------------------------------------------------------------- .../asyncevents/AsyncEventServiceImpl.java | 58 +++---- .../asyncevents/AsyncIndexProvider.java | 12 +- .../index/AsyncEventServiceImplTest.java | 8 +- .../usergrid/mq/LegacyQueuePathsTest.java | 48 ++++++ .../org/apache/usergrid/mq/QueuePathsTest.java | 48 ------ .../usergrid/persistence/queue/LegacyQueue.java | 35 +++++ .../persistence/queue/LegacyQueueFig.java | 106 +++++++++++++ .../persistence/queue/LegacyQueueManager.java | 79 ++++++++++ .../queue/LegacyQueueManagerFactory.java | 23 +++ .../LegacyQueueManagerInternalFactory.java | 28 ++++ .../persistence/queue/LegacyQueueMessage.java | 70 +++++++++ .../persistence/queue/LegacyQueueScope.java | 45 ++++++ .../persistence/queue/LocalQueueManager.java | 18 +-- .../usergrid/persistence/queue/Queue.java | 35 ----- .../usergrid/persistence/queue/QueueFig.java | 106 ------------- .../persistence/queue/QueueManager.java | 79 ---------- .../persistence/queue/QueueManagerFactory.java | 23 --- .../queue/QueueManagerInternalFactory.java | 28 ---- .../persistence/queue/QueueMessage.java | 70 --------- .../usergrid/persistence/queue/QueueScope.java | 45 ------ .../persistence/queue/guice/QueueModule.java | 16 +- .../queue/impl/LegacyQueueScopeImpl.java | 67 ++++++++ .../queue/impl/QueueManagerFactoryImpl.java | 20 +-- .../persistence/queue/impl/QueueScopeImpl.java | 68 -------- .../queue/impl/SNSQueueManagerImpl.java | 48 +++--- .../queue/util/AmazonNotificationUtils.java | 4 +- .../queue/LegacyQueueManagerTest.java | 156 +++++++++++++++++++ .../persistence/queue/QueueManagerTest.java | 156 ------------------- .../notifications/ApplicationQueueManager.java | 4 +- .../ApplicationQueueManagerCache.java | 6 +- .../notifications/NotificationsService.java | 19 ++- .../services/notifications/QueueJob.java | 1 - .../services/notifications/QueueListener.java | 29 ++-- .../impl/ApplicationQueueManagerImpl.java | 18 +-- .../services/queues/ImportQueueListener.java | 8 +- .../services/queues/ImportQueueManager.java | 12 +- .../usergrid/services/queues/QueueListener.java | 18 +-- 37 files changed, 803 insertions(+), 811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 0bff887..6add88c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -57,12 +57,12 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueScope; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; @@ -104,9 +104,9 @@ public class AsyncEventServiceImpl implements AsyncEventService { public int MAX_TAKE = 10; public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars - private final QueueManager queue; + private final LegacyQueueManager queue; private final IndexProcessorFig indexProcessorFig; - private final QueueFig queueFig; + private final LegacyQueueFig queueFig; private final IndexProducer indexProducer; private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory; @@ -134,7 +134,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Inject - public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory, + public AsyncEventServiceImpl(final LegacyQueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig, final IndexProducer indexProducer, final MetricsFactory metricsFactory, @@ -143,7 +143,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { final EntityIndexFactory entityIndexFactory, final EventBuilder eventBuilder, final MapManagerFactory mapManagerFactory, - final QueueFig queueFig, + final LegacyQueueFig queueFig, @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler ) { this.indexProducer = indexProducer; @@ -159,7 +159,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { this.rxTaskScheduler = rxTaskScheduler; - QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); + LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL); this.queue = queueManagerFactory.getQueueManager(queueScope); this.indexProcessorFig = indexProcessorFig; @@ -233,7 +233,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** * Take message from SQS */ - private List<QueueMessage> take() { + private List<LegacyQueueMessage> take() { final Timer.Context timer = this.readTimer.time(); @@ -251,7 +251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** * Ack message in SQS */ - public void ack(final List<QueueMessage> messages) { + public void ack(final List<LegacyQueueMessage> messages) { final Timer.Context timer = this.ackTimer.time(); @@ -275,7 +275,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { * @param messages * @return */ - private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) { + private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) { if (logger.isDebugEnabled()) { logger.debug("callEventHandlers with {} message(s)", messages.size()); @@ -422,7 +422,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) { + private IndexOperationMessage handleEntityIndexUpdate(final LegacyQueueMessage message) { Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); @@ -457,7 +457,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private IndexOperationMessage handleEdgeIndex(final QueueMessage message) { + private IndexOperationMessage handleEdgeIndex(final LegacyQueueMessage message) { Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" ); @@ -486,7 +486,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } - private IndexOperationMessage handleEdgeDelete(final QueueMessage message) { + private IndexOperationMessage handleEdgeDelete(final LegacyQueueMessage message) { Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); @@ -668,7 +668,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } - private IndexOperationMessage handleEntityDelete(final QueueMessage message) { + private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); @@ -700,7 +700,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } - private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { + private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); @@ -741,15 +741,15 @@ public class AsyncEventServiceImpl implements AsyncEventService { private void startWorker() { synchronized (mutex) { - Observable<List<QueueMessage>> consumer = - Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() { + Observable<List<LegacyQueueMessage>> consumer = + Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() { @Override - public void call( final Subscriber<? super List<QueueMessage>> subscriber ) { + public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) { //name our thread so it's easy to see Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() ); - List<QueueMessage> drainList = null; + List<LegacyQueueMessage> drainList = null; do { try { @@ -799,7 +799,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { List<IndexEventResult> indexEventResults = callEventHandlers( messages ); // submit the processed messages to index producer - List<QueueMessage> messagesToAck = submitToIndex( indexEventResults ); + List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults ); if ( messagesToAck.size() < messages.size() ) { logger.warn( "Missing {} message(s) from index processing", @@ -834,7 +834,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { * Submit results to index and return the queue messages to be ack'd * */ - private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { + private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) { // if nothing came back then return empty list if(indexEventResults==null){ @@ -842,7 +842,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { } IndexOperationMessage combined = new IndexOperationMessage(); - List<QueueMessage> queueMessages = indexEventResults.stream() + List<LegacyQueueMessage> queueMessages = indexEventResults.stream() // filter out messages that are not present, they were not processed and put into the results .filter( result -> result.getQueueMessage().isPresent() ) @@ -898,10 +898,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { public class IndexEventResult{ private final Optional<IndexOperationMessage> indexOperationMessage; - private final Optional<QueueMessage> queueMessage; + private final Optional<LegacyQueueMessage> queueMessage; private final long creationTime; - public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){ + public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<LegacyQueueMessage> queueMessage, long creationTime){ this.queueMessage = queueMessage; this.creationTime = creationTime; @@ -912,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { return indexOperationMessage; } - public Optional<QueueMessage> getQueueMessage() { + public Optional<LegacyQueueMessage> getQueueMessage() { return queueMessage; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index abd4ce1..aac0e66 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -29,8 +29,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.queue.LocalQueueManager; import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; import com.google.inject.Inject; import com.google.inject.Provider; @@ -45,7 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final IndexProcessorFig indexProcessorFig; - private final QueueManagerFactory queueManagerFactory; + private final LegacyQueueManagerFactory queueManagerFactory; private final MetricsFactory metricsFactory; private final RxTaskScheduler rxTaskScheduler; private final EntityCollectionManagerFactory entityCollectionManagerFactory; @@ -54,14 +54,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private final EntityIndexFactory entityIndexFactory; private final IndexProducer indexProducer; private final MapManagerFactory mapManagerFactory; - private final QueueFig queueFig; + private final LegacyQueueFig queueFig; private AsyncEventService asyncEventService; @Inject public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig, - final QueueManagerFactory queueManagerFactory, + final LegacyQueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler, final EntityCollectionManagerFactory entityCollectionManagerFactory, @@ -70,7 +70,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer, final MapManagerFactory mapManagerFactory, - final QueueFig queueFig) { + final LegacyQueueFig queueFig) { this.indexProcessorFig = indexProcessorFig; this.queueManagerFactory = queueManagerFactory; http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java index 4c0058b..92b5983 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java @@ -34,8 +34,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.EsRunner; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.map.MapManagerFactory; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; import org.junit.Rule; import org.junit.runner.RunWith; @@ -53,13 +53,13 @@ public class AsyncEventServiceImplTest extends AsyncIndexServiceTest { @Inject - public QueueManagerFactory queueManagerFactory; + public LegacyQueueManagerFactory queueManagerFactory; @Inject public IndexProcessorFig indexProcessorFig; @Inject - public QueueFig queueFig; + public LegacyQueueFig queueFig; @Inject http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java new file mode 100644 index 0000000..5ffa553 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.usergrid.mq; + + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.usergrid.mq.Queue.getQueueParentPaths; +import static org.apache.usergrid.mq.Queue.normalizeQueuePath; +import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString; + + + +public class LegacyQueuePathsTest { + private static final Logger logger = LoggerFactory.getLogger( LegacyQueuePathsTest.class ); + + + @Test + // TODO - why does this test case not have assertions to test results? + // tests should not be written like this: what's the point? If it's + // code coverage this is still bad. + public void testPaths() throws Exception { + logger.info( normalizeQueuePath( "a/b/c" ) ); + logger.info( normalizeQueuePath( "a/b/c/" ) ); + logger.info( normalizeQueuePath( "/a/b/c" ) ); + logger.info( normalizeQueuePath( "/////a/b/c" ) ); + logger.info( normalizeQueuePath( "/" ) ); + + logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) ); + logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) ); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java deleted file mode 100644 index 86dc8bc..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.mq; - - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.usergrid.mq.Queue.getQueueParentPaths; -import static org.apache.usergrid.mq.Queue.normalizeQueuePath; -import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString; - - - -public class QueuePathsTest { - private static final Logger logger = LoggerFactory.getLogger( QueuePathsTest.class ); - - - @Test - // TODO - why does this test case not have assertions to test results? - // tests should not be written like this: what's the point? If it's - // code coverage this is still bad. - public void testPaths() throws Exception { - logger.info( normalizeQueuePath( "a/b/c" ) ); - logger.info( normalizeQueuePath( "a/b/c/" ) ); - logger.info( normalizeQueuePath( "/a/b/c" ) ); - logger.info( normalizeQueuePath( "/////a/b/c" ) ); - logger.info( normalizeQueuePath( "/" ) ); - - logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) ); - logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) ); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java new file mode 100644 index 0000000..c12dc41 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.queue; + + +public class LegacyQueue { + private final String url; + + public LegacyQueue(String url) { + this.url = url; + } + + public String getUrl(){ + return url; + } + + public boolean isEmpty(){ + return url == null; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java new file mode 100644 index 0000000..907eec2 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java @@ -0,0 +1,106 @@ +package org.apache.usergrid.persistence.queue; + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.FigSingleton; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + +@FigSingleton +public interface LegacyQueueFig extends GuicyFig { + + /** + * Any region value string must exactly match the region names specified on this page: + * + * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html* + */ + + + /** + * Primary region to use for Amazon queues. + */ + @Key( "usergrid.queue.region" ) + @Default("us-east-1") + String getPrimaryRegion(); + + /** + * Flag to determine if Usergrid should use a multi-region Amazon queue + * implementation. + */ + @Key( "usergrid.queue.multiregion" ) + @Default("false") + boolean isMultiRegion(); + + /** + * Comma-separated list of one or more Amazon regions to use if multiregion + * is set to true. + */ + @Key( "usergrid.queue.regionList" ) + @Default("us-east-1") + String getRegionList(); + + + /** + * Set the amount of time (in minutes) to retain messages in a queue. + * 1209600 = 14 days (maximum retention period) + */ + @Key( "usergrid.queue.retention" ) + @Default("1209600") + String getRetentionPeriod(); + + /** + * Set the amount of time (in minutes) to retain messages in a dead letter queue. + * 1209600 = 14 days (maximum retention period) + */ + @Key( "usergrid.queue.deadletter.retention" ) + @Default("1209600") + String getDeadletterRetentionPeriod(); + + /** + * The maximum number of attempts to attempt to deliver before failing into the DLQ + */ + @Key( "usergrid.queue.deliveryLimit" ) + @Default("10") + String getQueueDeliveryLimit(); + + @Key("usergrid.use.default.queue") + @Default("false") + boolean overrideQueueForDefault(); + + @Key("usergrid.queue.publish.threads") + @Default("100") + int getAsyncMaxThreads(); + + // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap) + @Key("usergrid.queue.publish.queuesize") + @Default("250000") + int getAsyncQueueSize(); + + /** + * Set the visibility timeout (in milliseconds) for faster retries + * @return + */ + @Key( "usergrid.queue.visibilityTimeout" ) + @Default("5000") // 5 seconds + int getVisibilityTimeout(); + + @Key( "usergrid.queue.localquorum.timeout") + @Default("30000") // 30 seconds + int getLocalQuorumTimeout(); + + @Key( "usergrid.queue.client.connection.timeout") + @Default( "5000" ) // 5 seconds + int getQueueClientConnectionTimeout(); + + @Key( "usergrid.queue.client.socket.timeout") + @Default( "50000" ) // 50 seconds + int getQueueClientSocketTimeout(); + + @Key( "usergrid.queue.poll.timeout") + @Default( "10000" ) // 10 seconds + int getQueuePollTimeout(); + + @Key( "usergrid.queue.quorum.fallback") + @Default("false") // 30 seconds + boolean getQuorumFallback(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java new file mode 100644 index 0000000..053dd36 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.queue; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/**ctor + * Manages queues for usergrid. Current implementation is sqs based. + */ +public interface LegacyQueueManager { + + /** + * Read messages from queue + * @param limit + * @param klass class to cast the return from + * @return List of Queue Messages + */ + List<LegacyQueueMessage> getMessages(int limit, Class klass); + + /** + * get the queue depth + * @return + */ + long getQueueDepth(); + + /** + * Commit the transaction + * @param queueMessage + */ + void commitMessage( LegacyQueueMessage queueMessage); + + /** + * commit multiple messages + * @param queueMessages + */ + void commitMessages( List<LegacyQueueMessage> queueMessages); + + /** + * send messages to queue + * @param bodies body objects must be serializable + * @throws IOException + */ + void sendMessages(List bodies) throws IOException; + + /** + * send a message to queue + * @param body + * @throws IOException + */ + <T extends Serializable> void sendMessage(T body)throws IOException; + + /** + * Send a messae to the topic to be sent to other queues + * @param body + */ + <T extends Serializable> void sendMessageToTopic(T body) throws IOException; + + /** + * purge messages + */ + void deleteQueue(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java new file mode 100644 index 0000000..53afc22 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.queue; + +public interface LegacyQueueManagerFactory { + public LegacyQueueManager getQueueManager(final LegacyQueueScope scope ); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java new file mode 100644 index 0000000..986f211 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java @@ -0,0 +1,28 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. The ASF licenses this file to You + * * under the Apache License, Version 2.0 (the "License"); you may not + * * use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. For additional information regarding + * * copyright in this work, please see the NOTICE file in the top level + * * directory of this distribution. + * + */ +package org.apache.usergrid.persistence.queue; + +/** + * QueueManagerInternal guice factory + */ +public interface LegacyQueueManagerInternalFactory { + LegacyQueueManager getQueueManager(final LegacyQueueScope scope ); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java new file mode 100644 index 0000000..939443d --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.queue; + +public class LegacyQueueMessage { + private final Object body; + private final String messageId; + private final String handle; + private final String type; + private String stringBody; + private int receiveCount; + + + public LegacyQueueMessage(String messageId, String handle, Object body, String type) { + this.body = body; + this.messageId = messageId; + this.handle = handle; + this.type = type; + this.stringBody = ""; + this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue + } + + public String getHandle() { + return handle; + } + + public Object getBody(){ + return body; + } + + public String getMessageId() { + return messageId; + } + + + public String getType() { + return type; + } + + public void setStringBody(String stringBody) { + this.stringBody = stringBody; + } + + public String getStringBody() { + return stringBody; + } + + public void setReceiveCount(int receiveCount){ + this.receiveCount = receiveCount; + } + + public int getReceiveCount(){ + return receiveCount; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java new file mode 100644 index 0000000..1f932b2 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.persistence.queue; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; + + +public interface LegacyQueueScope { + + /** + * LOCAL will create a SNS topic with a queue subscription in a single AWS region. + * ALL will create SNS topics and queue subscriptions in ALL AWS regions. + */ + enum RegionImplementation { + LOCAL, + ALL + } + + /** + * Get the name of the the map + * @return + */ + public String getName(); + + /** + * Get the Usergrid region enum + */ + public RegionImplementation getRegionImplementation(); +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java index 1f4261a..630c2e7 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java @@ -34,17 +34,17 @@ import java.util.concurrent.TimeUnit; /** * Default queue manager implementation, uses in memory linked queue */ -public class LocalQueueManager implements QueueManager { +public class LocalQueueManager implements LegacyQueueManager { private static final Logger logger = LoggerFactory.getLogger(LocalQueueManager.class); - public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000); + public ArrayBlockingQueue<LegacyQueueMessage> queue = new ArrayBlockingQueue<>(10000); @Override - public List<QueueMessage> getMessages(int limit, Class klass) { - List<QueueMessage> returnQueue = new ArrayList<>(); + public List<LegacyQueueMessage> getMessages(int limit, Class klass) { + List<LegacyQueueMessage> returnQueue = new ArrayList<>(); try { - QueueMessage message=null; + LegacyQueueMessage message=null; int count = 5; do { message = queue.poll(100, TimeUnit.MILLISECONDS); @@ -64,11 +64,11 @@ public class LocalQueueManager implements QueueManager { } @Override - public void commitMessage(QueueMessage queueMessage) { + public void commitMessage(LegacyQueueMessage queueMessage) { } @Override - public void commitMessages(List<QueueMessage> queueMessages) { + public void commitMessages(List<LegacyQueueMessage> queueMessages) { } @Override @@ -76,7 +76,7 @@ public class LocalQueueManager implements QueueManager { for(Object body : bodies){ String uuid = UUID.randomUUID().toString(); try { - queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here")); + queue.put(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here")); }catch (InterruptedException ie){ throw new RuntimeException(ie); } @@ -88,7 +88,7 @@ public class LocalQueueManager implements QueueManager { public <T extends Serializable> void sendMessage( final T body ) throws IOException { String uuid = UUID.randomUUID().toString(); try { - queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS); + queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS); }catch (InterruptedException ie){ throw new RuntimeException(ie); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java deleted file mode 100644 index 24070d0..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue; - - -public class Queue { - private final String url; - - public Queue(String url) { - this.url = url; - } - - public String getUrl(){ - return url; - } - - public boolean isEmpty(){ - return url == null; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java deleted file mode 100644 index 74912ae..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ /dev/null @@ -1,106 +0,0 @@ -package org.apache.usergrid.persistence.queue; - -import org.safehaus.guicyfig.Default; -import org.safehaus.guicyfig.FigSingleton; -import org.safehaus.guicyfig.GuicyFig; -import org.safehaus.guicyfig.Key; - -@FigSingleton -public interface QueueFig extends GuicyFig { - - /** - * Any region value string must exactly match the region names specified on this page: - * - * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html* - */ - - - /** - * Primary region to use for Amazon queues. - */ - @Key( "usergrid.queue.region" ) - @Default("us-east-1") - String getPrimaryRegion(); - - /** - * Flag to determine if Usergrid should use a multi-region Amazon queue - * implementation. - */ - @Key( "usergrid.queue.multiregion" ) - @Default("false") - boolean isMultiRegion(); - - /** - * Comma-separated list of one or more Amazon regions to use if multiregion - * is set to true. - */ - @Key( "usergrid.queue.regionList" ) - @Default("us-east-1") - String getRegionList(); - - - /** - * Set the amount of time (in minutes) to retain messages in a queue. - * 1209600 = 14 days (maximum retention period) - */ - @Key( "usergrid.queue.retention" ) - @Default("1209600") - String getRetentionPeriod(); - - /** - * Set the amount of time (in minutes) to retain messages in a dead letter queue. - * 1209600 = 14 days (maximum retention period) - */ - @Key( "usergrid.queue.deadletter.retention" ) - @Default("1209600") - String getDeadletterRetentionPeriod(); - - /** - * The maximum number of attempts to attempt to deliver before failing into the DLQ - */ - @Key( "usergrid.queue.deliveryLimit" ) - @Default("10") - String getQueueDeliveryLimit(); - - @Key("usergrid.use.default.queue") - @Default("false") - boolean overrideQueueForDefault(); - - @Key("usergrid.queue.publish.threads") - @Default("100") - int getAsyncMaxThreads(); - - // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap) - @Key("usergrid.queue.publish.queuesize") - @Default("250000") - int getAsyncQueueSize(); - - /** - * Set the visibility timeout (in milliseconds) for faster retries - * @return - */ - @Key( "usergrid.queue.visibilityTimeout" ) - @Default("5000") // 5 seconds - int getVisibilityTimeout(); - - @Key( "usergrid.queue.localquorum.timeout") - @Default("30000") // 30 seconds - int getLocalQuorumTimeout(); - - @Key( "usergrid.queue.client.connection.timeout") - @Default( "5000" ) // 5 seconds - int getQueueClientConnectionTimeout(); - - @Key( "usergrid.queue.client.socket.timeout") - @Default( "50000" ) // 50 seconds - int getQueueClientSocketTimeout(); - - @Key( "usergrid.queue.poll.timeout") - @Default( "10000" ) // 10 seconds - int getQueuePollTimeout(); - - @Key( "usergrid.queue.quorum.fallback") - @Default("false") // 30 seconds - boolean getQuorumFallback(); - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java deleted file mode 100644 index d2e29cb..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue; - -import java.io.IOException; -import java.io.Serializable; -import java.util.List; - -/**ctor - * Manages queues for usergrid. Current implementation is sqs based. - */ -public interface QueueManager { - - /** - * Read messages from queue - * @param limit - * @param klass class to cast the return from - * @return List of Queue Messages - */ - List<QueueMessage> getMessages(int limit, Class klass); - - /** - * get the queue depth - * @return - */ - long getQueueDepth(); - - /** - * Commit the transaction - * @param queueMessage - */ - void commitMessage( QueueMessage queueMessage); - - /** - * commit multiple messages - * @param queueMessages - */ - void commitMessages( List<QueueMessage> queueMessages); - - /** - * send messages to queue - * @param bodies body objects must be serializable - * @throws IOException - */ - void sendMessages(List bodies) throws IOException; - - /** - * send a message to queue - * @param body - * @throws IOException - */ - <T extends Serializable> void sendMessage(T body)throws IOException; - - /** - * Send a messae to the topic to be sent to other queues - * @param body - */ - <T extends Serializable> void sendMessageToTopic(T body) throws IOException; - - /** - * purge messages - */ - void deleteQueue(); -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java deleted file mode 100644 index 4cdb5e2..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue; - -public interface QueueManagerFactory { - public QueueManager getQueueManager( final QueueScope scope ); - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java deleted file mode 100644 index 119c064..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. - * - */ -package org.apache.usergrid.persistence.queue; - -/** - * QueueManagerInternal guice factory - */ -public interface QueueManagerInternalFactory { - QueueManager getQueueManager( final QueueScope scope ); - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java deleted file mode 100644 index f8ce6ef..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue; - -public class QueueMessage { - private final Object body; - private final String messageId; - private final String handle; - private final String type; - private String stringBody; - private int receiveCount; - - - public QueueMessage(String messageId, String handle, Object body,String type) { - this.body = body; - this.messageId = messageId; - this.handle = handle; - this.type = type; - this.stringBody = ""; - this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue - } - - public String getHandle() { - return handle; - } - - public Object getBody(){ - return body; - } - - public String getMessageId() { - return messageId; - } - - - public String getType() { - return type; - } - - public void setStringBody(String stringBody) { - this.stringBody = stringBody; - } - - public String getStringBody() { - return stringBody; - } - - public void setReceiveCount(int receiveCount){ - this.receiveCount = receiveCount; - } - - public int getReceiveCount(){ - return receiveCount; - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java deleted file mode 100644 index f58584f..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.usergrid.persistence.queue; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; - - -public interface QueueScope { - - /** - * LOCAL will create a SNS topic with a queue subscription in a single AWS region. - * ALL will create SNS topics and queue subscriptions in ALL AWS regions. - */ - enum RegionImplementation { - LOCAL, - ALL - } - - /** - * Get the name of the the map - * @return - */ - public String getName(); - - /** - * Get the Usergrid region enum - */ - public RegionImplementation getRegionImplementation(); -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java index caf61bf..6d62da0 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java @@ -18,14 +18,14 @@ package org.apache.usergrid.persistence.queue.guice; -import org.apache.usergrid.persistence.queue.QueueManagerInternalFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory; import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl; import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; import org.safehaus.guicyfig.GuicyFigModule; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; @@ -42,11 +42,11 @@ public class QueueModule extends AbstractModule { @Override protected void configure() { - install(new GuicyFigModule(QueueFig.class)); + install(new GuicyFigModule(LegacyQueueFig.class)); - bind(QueueManagerFactory.class).to(QueueManagerFactoryImpl.class); - install(new FactoryModuleBuilder().implement(QueueManager.class, SNSQueueManagerImpl.class) - .build(QueueManagerInternalFactory.class)); + bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class); + install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class) + .build(LegacyQueueManagerInternalFactory.class)); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java new file mode 100644 index 0000000..51d6c03 --- /dev/null +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. The ASF licenses this file to You + * under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. For additional information regarding + * copyright in this work, please see the NOTICE file in the top level + * directory of this distribution. + */ +package org.apache.usergrid.persistence.queue.impl; + +import org.apache.usergrid.persistence.queue.LegacyQueueScope; + +public class LegacyQueueScopeImpl implements LegacyQueueScope { + + private final String name; + private final RegionImplementation regionImpl; + + public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) { + this.name = name; + this.regionImpl = regionImpl; + } + + + + + @Override + public String getName() { + return name; + } + + @Override + public RegionImplementation getRegionImplementation() {return regionImpl;} + + @Override + public boolean equals( final Object o ) { + if ( this == o ) { + return true; + } + if ( !( o instanceof LegacyQueueScopeImpl) ) { + return false; + } + + final LegacyQueueScopeImpl queueScope = (LegacyQueueScopeImpl) o; + + if ( !name.equals( queueScope.name ) ) { + return false; + } + + + return true; + } + + + @Override + public int hashCode() { + return name.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java index 93b2fb2..2d51903 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java @@ -33,26 +33,26 @@ import java.util.concurrent.ExecutionException; * manages whether we take in an external in memory override for queues. */ @Singleton -public class QueueManagerFactoryImpl implements QueueManagerFactory { +public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory { private static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class ); - private final QueueFig queueFig; - private final QueueManagerInternalFactory queuemanagerInternalFactory; - private final Map<String,QueueManager> defaultManager; - private final LoadingCache<QueueScope, QueueManager> queueManager = + private final LegacyQueueFig queueFig; + private final LegacyQueueManagerInternalFactory queuemanagerInternalFactory; + private final Map<String,LegacyQueueManager> defaultManager; + private final LoadingCache<LegacyQueueScope, LegacyQueueManager> queueManager = CacheBuilder .newBuilder() .initialCapacity(5) .maximumSize(100) - .build(new CacheLoader<QueueScope, QueueManager>() { + .build(new CacheLoader<LegacyQueueScope, LegacyQueueManager>() { @Override - public QueueManager load( QueueScope scope ) throws Exception { + public LegacyQueueManager load(LegacyQueueScope scope ) throws Exception { if ( queueFig.overrideQueueForDefault() ){ - QueueManager manager = defaultManager.get( scope.getName() ); + LegacyQueueManager manager = defaultManager.get( scope.getName() ); if ( manager == null ) { manager = new LocalQueueManager(); defaultManager.put( scope.getName(), manager ); @@ -67,14 +67,14 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory { }); @Inject - public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerInternalFactory queuemanagerInternalFactory){ + public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){ this.queueFig = queueFig; this.queuemanagerInternalFactory = queuemanagerInternalFactory; this.defaultManager = new HashMap<>(10); } @Override - public QueueManager getQueueManager(QueueScope scope) { + public LegacyQueueManager getQueueManager(LegacyQueueScope scope) { try { return queueManager.get(scope); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java deleted file mode 100644 index fa50b49..0000000 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. The ASF licenses this file to You - * under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. For additional information regarding - * copyright in this work, please see the NOTICE file in the top level - * directory of this distribution. - */ -package org.apache.usergrid.persistence.queue.impl; - -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.queue.QueueScope; - -public class QueueScopeImpl implements QueueScope { - - private final String name; - private final RegionImplementation regionImpl; - - public QueueScopeImpl( final String name, final RegionImplementation regionImpl) { - this.name = name; - this.regionImpl = regionImpl; - } - - - - - @Override - public String getName() { - return name; - } - - @Override - public RegionImplementation getRegionImplementation() {return regionImpl;} - - @Override - public boolean equals( final Object o ) { - if ( this == o ) { - return true; - } - if ( !( o instanceof QueueScopeImpl ) ) { - return false; - } - - final QueueScopeImpl queueScope = ( QueueScopeImpl ) o; - - if ( !name.equals( queueScope.name ) ) { - return false; - } - - - return true; - } - - - @Override - public int hashCode() { - return name.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index ae11517..6d4e0c4 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -35,11 +35,11 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.CassandraFig; import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory; import org.apache.usergrid.persistence.core.guicyfig.ClusterFig; -import org.apache.usergrid.persistence.queue.Queue; -import org.apache.usergrid.persistence.queue.QueueFig; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.LegacyQueue; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueScope; import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; import com.amazonaws.AmazonServiceException; @@ -80,12 +80,12 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; -public class SNSQueueManagerImpl implements QueueManager { +public class SNSQueueManagerImpl implements LegacyQueueManager { private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class ); - private final QueueScope scope; - private final QueueFig fig; + private final LegacyQueueScope scope; + private final LegacyQueueFig fig; private final ClusterFig clusterFig; private final CassandraFig cassandraFig; private final ClientConfiguration clientConfiguration; @@ -121,16 +121,16 @@ public class SNSQueueManagerImpl implements QueueManager { } } ); - private final LoadingCache<String, Queue> readQueueUrlMap = - CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() { + private final LoadingCache<String, LegacyQueue> readQueueUrlMap = + CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, LegacyQueue>() { @Override - public Queue load( String queueName ) throws Exception { + public LegacyQueue load(String queueName ) throws Exception { - Queue queue = null; + LegacyQueue queue = null; try { GetQueueUrlResult result = sqs.getQueueUrl( queueName ); - queue = new Queue( result.getQueueUrl() ); + queue = new LegacyQueue( result.getQueueUrl() ); } catch ( QueueDoesNotExistException queueDoesNotExistException ) { logger.error( "Queue {} does not exist, will create", queueName ); @@ -142,7 +142,7 @@ public class SNSQueueManagerImpl implements QueueManager { if ( queue == null ) { String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig ); - queue = new Queue( url ); + queue = new LegacyQueue( url ); } setupTopics( queueName ); @@ -153,8 +153,8 @@ public class SNSQueueManagerImpl implements QueueManager { @Inject - public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, - CassandraFig cassandraFig, QueueFig queueFig ) { + public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig, + CassandraFig cassandraFig, LegacyQueueFig queueFig ) { this.scope = scope; this.fig = fig; this.clusterFig = clusterFig; @@ -232,7 +232,7 @@ public class SNSQueueManagerImpl implements QueueManager { "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e ); } - if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) { + if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) { String multiRegion = fig.getRegionList(); @@ -391,7 +391,7 @@ public class SNSQueueManagerImpl implements QueueManager { } - public Queue getReadQueue() { + public LegacyQueue getReadQueue() { String queueName = getName(); try { @@ -414,7 +414,7 @@ public class SNSQueueManagerImpl implements QueueManager { @Override - public List<QueueMessage> getMessages(final int limit, final Class klass) { + public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) { if ( sqs == null ) { logger.error( "SQS is null - was not initialized properly" ); @@ -457,7 +457,7 @@ public class SNSQueueManagerImpl implements QueueManager { logger.trace( "Received {} messages from {}", messages.size(), url ); } - List<QueueMessage> queueMessages = new ArrayList<>( messages.size() ); + List<LegacyQueueMessage> queueMessages = new ArrayList<>( messages.size() ); for ( Message message : messages ) { @@ -487,7 +487,7 @@ public class SNSQueueManagerImpl implements QueueManager { throw new RuntimeException( e ); } - QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload, + LegacyQueueMessage queueMessage = new LegacyQueueMessage( message.getMessageId(), message.getReceiptHandle(), payload, message.getAttributes().get( "type" ) ); queueMessage.setStringBody( originalBody ); int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount")); @@ -634,7 +634,7 @@ public class SNSQueueManagerImpl implements QueueManager { @Override - public void commitMessage( final QueueMessage queueMessage ) { + public void commitMessage( final LegacyQueueMessage queueMessage ) { String url = getReadQueue().getUrl(); if ( logger.isTraceEnabled() ) { logger.trace( "Commit message {} to queue {}", queueMessage.getMessageId(), url ); @@ -646,7 +646,7 @@ public class SNSQueueManagerImpl implements QueueManager { @Override - public void commitMessages( final List<QueueMessage> queueMessages ) { + public void commitMessages( final List<LegacyQueueMessage> queueMessages ) { String url = getReadQueue().getUrl(); if ( logger.isTraceEnabled() ) { @@ -655,7 +655,7 @@ public class SNSQueueManagerImpl implements QueueManager { List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(); - for ( QueueMessage message : queueMessages ) { + for ( LegacyQueueMessage message : queueMessages ) { entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java index b6b1aaa..56bef91 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java @@ -9,7 +9,7 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.queue.QueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; import com.amazonaws.auth.policy.Condition; import com.amazonaws.auth.policy.Policy; @@ -40,7 +40,7 @@ public class AmazonNotificationUtils { private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class ); - public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig ) + public static String createQueue( final AmazonSQSClient sqs, final String queueName, final LegacyQueueFig fig ) throws Exception { final String deadletterQueueName = String.format( "%s_dead", queueName ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java new file mode 100644 index 0000000..69655e5 --- /dev/null +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.queue; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; +import org.apache.usergrid.persistence.core.test.ITRunner; +import org.apache.usergrid.persistence.core.test.UseModules; +import org.apache.usergrid.persistence.queue.guice.TestQueueModule; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; + +import com.google.inject.Inject; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + + +@RunWith( ITRunner.class ) +@UseModules( { TestQueueModule.class } ) +public class LegacyQueueManagerTest { + + @Inject + protected LegacyQueueFig queueFig; + @Inject + protected LegacyQueueManagerFactory qmf; + + /** + * Mark tests as ignored if no AWS creds are present + */ + @Rule + public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule(); + + + protected LegacyQueueScope scope; + private LegacyQueueManager qm; + + public static long queueSeed = System.currentTimeMillis(); + + + @Before + public void mockApp() { + + this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL); + qm = qmf.getQueueManager(scope); + } + + @org.junit.After + public void cleanup(){ + qm.deleteQueue(); + } + + + @Test + public void send() throws Exception{ + String value = "bodytest"; + qm.sendMessage(value); + List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class); + assertTrue(messageList.size() >= 1); + for(LegacyQueueMessage message : messageList){ + assertTrue(message.getBody().equals(value)); + qm.commitMessage(message); + } + + messageList = qm.getMessages(1, String.class); + assertTrue(messageList.size() <= 0); + + } + + @Test + public void sendMore() throws Exception{ + HashMap<String,String> values = new HashMap<>(); + values.put("test","Test"); + + List<Map<String,String>> bodies = new ArrayList<>(); + bodies.add(values); + qm.sendMessages(bodies); + List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass()); + assertTrue(messageList.size() >= 1); + for(LegacyQueueMessage message : messageList){ + assertTrue(message.getBody().equals(values)); + } + qm.commitMessages(messageList); + + messageList = qm.getMessages(1, values.getClass()); + assertTrue(messageList.size() <= 0); + + } + + @Test + public void queueSize() throws Exception{ + HashMap<String,String> values = new HashMap<>(); + values.put("test", "Test"); + + List<Map<String,String>> bodies = new ArrayList<>(); + bodies.add(values); + long initialDepth = qm.getQueueDepth(); + qm.sendMessages(bodies); + long depth = 0; + for(int i=0; i<10;i++){ + depth = qm.getQueueDepth(); + if(depth>0){ + break; + } + Thread.sleep(1000); + } + assertTrue(depth>0); + + List<LegacyQueueMessage> messageList = qm.getMessages(10, values.getClass()); + assertTrue(messageList.size() <= 500); + for(LegacyQueueMessage message : messageList){ + assertTrue(message.getBody().equals(values)); + } + if(messageList.size()>0) { + qm.commitMessages(messageList); + } + for(int i=0; i<10;i++){ + depth = qm.getQueueDepth(); + if(depth==initialDepth){ + break; + } + Thread.sleep(1000); + } + assertEquals(initialDepth, depth); + } + + + +}