http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java deleted file mode 100644 index d57beab..0000000 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.persistence.queue; - - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; -import org.apache.usergrid.persistence.core.test.ITRunner; -import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.queue.guice.TestQueueModule; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.google.inject.Inject; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; - - -@RunWith( ITRunner.class ) -@UseModules( { TestQueueModule.class } ) -public class QueueManagerTest { - - @Inject - protected QueueFig queueFig; - @Inject - protected QueueManagerFactory qmf; - - /** - * Mark tests as ignored if no AWS creds are present - */ - @Rule - public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule(); - - - protected QueueScope scope; - private QueueManager qm; - - public static long queueSeed = System.currentTimeMillis(); - - - @Before - public void mockApp() { - - this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL); - qm = qmf.getQueueManager(scope); - } - - @org.junit.After - public void cleanup(){ - qm.deleteQueue(); - } - - - @Test - public void send() throws Exception{ - String value = "bodytest"; - qm.sendMessage(value); - List<QueueMessage> messageList = qm.getMessages(1, String.class); - assertTrue(messageList.size() >= 1); - for(QueueMessage message : messageList){ - assertTrue(message.getBody().equals(value)); - qm.commitMessage(message); - } - - messageList = qm.getMessages(1, String.class); - assertTrue(messageList.size() <= 0); - - } - - @Test - public void sendMore() throws Exception{ - HashMap<String,String> values = new HashMap<>(); - values.put("test","Test"); - - List<Map<String,String>> bodies = new ArrayList<>(); - bodies.add(values); - qm.sendMessages(bodies); - List<QueueMessage> messageList = qm.getMessages(1, values.getClass()); - assertTrue(messageList.size() >= 1); - for(QueueMessage message : messageList){ - assertTrue(message.getBody().equals(values)); - } - qm.commitMessages(messageList); - - messageList = qm.getMessages(1, values.getClass()); - assertTrue(messageList.size() <= 0); - - } - - @Test - public void queueSize() throws Exception{ - HashMap<String,String> values = new HashMap<>(); - values.put("test", "Test"); - - List<Map<String,String>> bodies = new ArrayList<>(); - bodies.add(values); - long initialDepth = qm.getQueueDepth(); - qm.sendMessages(bodies); - long depth = 0; - for(int i=0; i<10;i++){ - depth = qm.getQueueDepth(); - if(depth>0){ - break; - } - Thread.sleep(1000); - } - assertTrue(depth>0); - - List<QueueMessage> messageList = qm.getMessages(10, values.getClass()); - assertTrue(messageList.size() <= 500); - for(QueueMessage message : messageList){ - assertTrue(message.getBody().equals(values)); - } - if(messageList.size()>0) { - qm.commitMessages(messageList); - } - for(int i=0; i<10;i++){ - depth = qm.getQueueDepth(); - if(depth==initialDepth){ - break; - } - Thread.sleep(1000); - } - assertEquals(initialDepth, depth); - } - - - -}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java index 3f0ca69..adad92a 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java @@ -22,7 +22,7 @@ package org.apache.usergrid.services.notifications; import org.apache.usergrid.batch.JobExecution; import org.apache.usergrid.persistence.entities.Notification; -import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; import rx.Observable; import java.util.List; @@ -54,7 +54,7 @@ public interface ApplicationQueueManager { * @param queuePath * @return */ - Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath); + Observable sendBatchToProviders(List<LegacyQueueMessage> messages, String queuePath); /** * stop processing and send message to providers to stop http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java index 2ef567d..3b2eff3 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java @@ -20,7 +20,7 @@ import com.google.common.cache.*; import com.google.inject.Singleton; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +103,7 @@ public class ApplicationQueueManagerCache{ public ApplicationQueueManager getApplicationQueueManager( final EntityManager entityManager, - final QueueManager queueManager, + final LegacyQueueManager legacyQueueManager, final JobScheduler jobScheduler, final MetricsFactory metricsService, final Properties properties ) { @@ -124,7 +124,7 @@ public class ApplicationQueueManagerCache{ manager = new ApplicationQueueManagerImpl( jobScheduler, entityManager, - queueManager, + legacyQueueManager, metricsService, properties ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java index 907638e..b43594a 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java @@ -19,7 +19,6 @@ package org.apache.usergrid.services.notifications; import java.util.*; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.services.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +35,10 @@ import org.apache.usergrid.persistence.entities.Receipt; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueScope; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException; import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl; @@ -76,7 +75,7 @@ public class NotificationsService extends AbstractCollectionService { private long gracePeriod; private ServiceManagerFactory smf; private EntityManagerFactory emf; - private QueueManagerFactory queueManagerFactory; + private LegacyQueueManagerFactory queueManagerFactory; private ApplicationQueueManagerCache applicationQueueManagerCache; public NotificationsService() { @@ -97,12 +96,12 @@ public class NotificationsService extends AbstractCollectionService { postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests"); JobScheduler jobScheduler = new JobScheduler(sm,em); String name = ApplicationQueueManagerImpl.getQueueNames( props ); - QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL); - queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class); - QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); + LegacyQueueScope queueScope = new LegacyQueueScopeImpl( name, LegacyQueueScope.RegionImplementation.LOCAL); + queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class); + LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope); applicationQueueManagerCache = getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class); notificationQueueManager = applicationQueueManagerCache - .getApplicationQueueManager(em,queueManager, jobScheduler, metricsService ,props); + .getApplicationQueueManager(em, legacyQueueManager, jobScheduler, metricsService ,props); gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java index 245a36f..0ba5c1e 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java @@ -24,7 +24,6 @@ import javax.annotation.PostConstruct; import com.google.inject.Injector; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.entities.Notification; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java index 478d5ed..20fbd84 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java @@ -18,17 +18,14 @@ package org.apache.usergrid.services.notifications; import com.codahale.metrics.*; import com.codahale.metrics.Timer; -import com.google.common.cache.*; import com.google.inject.Injector; -import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.queue.*; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; -import org.apache.usergrid.services.ServiceManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.apache.usergrid.services.ServiceManagerFactory; import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl; import org.slf4j.Logger; @@ -46,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong; */ public class QueueListener { - private final QueueManagerFactory queueManagerFactory; + private final LegacyQueueManagerFactory queueManagerFactory; public static long DEFAULT_SLEEP = 100; @@ -75,7 +72,7 @@ public class QueueListener { private int consecutiveCallsToRemoveDevices; public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){ - this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class); + this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class); this.smf = smf; this.emf = emf; this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class); @@ -161,8 +158,8 @@ public class QueueListener { logger.trace("getting from queue {} ", queueName); } - QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL); - QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); + LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL); + LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope); // run until there are no more active jobs final AtomicLong runCount = new AtomicLong(0); @@ -170,7 +167,7 @@ public class QueueListener { while ( true ) { Timer.Context timerContext = timer.time(); - rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class)) + rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class)) .buffer(MAX_TAKE) .doOnNext(messages -> { @@ -180,10 +177,10 @@ public class QueueListener { } if (messages.size() > 0) { - HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size()); + HashMap<UUID, List<LegacyQueueMessage>> messageMap = new HashMap<>(messages.size()); //group messages into hash map by app id - for (QueueMessage message : messages) { + for (LegacyQueueMessage message : messages) { //TODO: stop copying around this area as it gets notification specific. ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody(); UUID applicationId = queueMessage.getApplicationId(); @@ -191,7 +188,7 @@ public class QueueListener { //Groups queue messages by application Id, ( they are all probably going to the same place ) if (!messageMap.containsKey(applicationId)) { //For each app id it sends the set. - List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>(); + List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>(); applicationQueueMessages.add(message); messageMap.put(applicationId, applicationQueueMessages); } else { @@ -203,13 +200,13 @@ public class QueueListener { Observable merge = null; //send each set of app ids together - for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) { + for (Map.Entry<UUID, List<LegacyQueueMessage>> entry : messageMap.entrySet()) { UUID applicationId = entry.getKey(); ApplicationQueueManager manager = applicationQueueManagerCache .getApplicationQueueManager( emf.getEntityManager(applicationId), - queueManager, + legacyQueueManager, new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)), metricsService, properties @@ -230,7 +227,7 @@ public class QueueListener { if(merge!=null) { merge.toBlocking().lastOrDefault(null); } - queueManager.commitMessages(messages); + legacyQueueManager.commitMessages(messages); meter.mark(messages.size()); if (logger.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java index 23b21f2..96e2dbd 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java @@ -47,8 +47,8 @@ import org.apache.usergrid.persistence.entities.Notifier; import org.apache.usergrid.persistence.entities.Receipt; import org.apache.usergrid.persistence.entities.User; import org.apache.usergrid.persistence.index.utils.UUIDUtils; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; import org.apache.usergrid.services.notifications.ApplicationQueueManager; import org.apache.usergrid.services.notifications.ApplicationQueueMessage; import org.apache.usergrid.services.notifications.JobScheduler; @@ -72,7 +72,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { private static final Logger logger = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class); private final EntityManager em; - private final QueueManager qm; + private final LegacyQueueManager qm; private final JobScheduler jobScheduler; private final MetricsFactory metricsFactory; private final String queueName; @@ -93,11 +93,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { - public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager, - QueueManager queueManager, MetricsFactory metricsFactory, - Properties properties) { + public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, + LegacyQueueManager legacyQueueManager, MetricsFactory metricsFactory, + Properties properties) { this.em = entityManager; - this.qm = queueManager; + this.qm = legacyQueueManager; this.jobScheduler = jobScheduler; this.metricsFactory = metricsFactory; this.queueName = getQueueNames(properties); @@ -473,7 +473,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { * @throws Exception */ @Override - public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) { + public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) { if (logger.isTraceEnabled()) { logger.trace("sending batch of {} notifications.", messages.size()); } @@ -483,7 +483,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager { final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size()); final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size()); - final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage -> { + final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> { boolean messageCommitted = false; ApplicationQueueMessage message = null; try { http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java index a95475d..e2a2808 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java @@ -20,16 +20,14 @@ package org.apache.usergrid.services.queues; import java.util.List; import java.util.Properties; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.apache.usergrid.management.importer.ImportService; -import org.apache.usergrid.management.importer.ImportServiceImpl; import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; import org.apache.usergrid.services.ServiceManagerFactory; import com.google.inject.Inject; @@ -65,7 +63,7 @@ public class ImportQueueListener extends QueueListener { * @param messages */ @Override - public void onMessage( final List<QueueMessage> messages ) throws Exception { + public void onMessage( final List<LegacyQueueMessage> messages ) throws Exception { /** * Much like in the original queueListener , we need to translate the Messages that we get * back from the QueueMessage into something like an Import message. The way that a @@ -76,7 +74,7 @@ public class ImportQueueListener extends QueueListener { if (logger.isTraceEnabled()) { logger.trace("Doing work in onMessage in ImportQueueListener"); } - for (QueueMessage message : messages) { + for (LegacyQueueMessage message : messages) { ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody(); // TODO We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java index f3c65c7..d9db84a 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java @@ -26,17 +26,17 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; /** * Manages the queueManager implementation for Import */ -public class ImportQueueManager implements QueueManager { +public class ImportQueueManager implements LegacyQueueManager { @Override - public List<QueueMessage> getMessages(final int limit, final Class klass) { + public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) { return new ArrayList<>(); } @@ -47,13 +47,13 @@ public class ImportQueueManager implements QueueManager { @Override - public void commitMessage( final QueueMessage queueMessage ) { + public void commitMessage( final LegacyQueueMessage queueMessage ) { } @Override - public void commitMessages( final List<QueueMessage> queueMessages ) { + public void commitMessages( final List<LegacyQueueMessage> queueMessages ) { } http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java index 9d95d87..965e95e 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java @@ -24,8 +24,8 @@ import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.queue.*; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; +import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl; import org.apache.usergrid.services.ServiceManager; import org.apache.usergrid.services.ServiceManagerFactory; import org.slf4j.Logger; @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ public abstract class QueueListener { public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000; - private final QueueManagerFactory queueManagerFactory; + private final LegacyQueueManagerFactory queueManagerFactory; public long DEFAULT_SLEEP = 5000; @@ -83,7 +83,7 @@ public abstract class QueueListener { */ public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){ //TODO: change current injectors to use service module instead of CpSetup - this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class ); + this.queueManagerFactory = injector.getInstance( LegacyQueueManagerFactory.class ); this.smf = smf; this.emf = injector.getInstance( EntityManagerFactory.class ); //emf; this.metricsService = injector.getInstance(MetricsFactory.class); @@ -169,8 +169,8 @@ public abstract class QueueListener { if (logger.isTraceEnabled()) { logger.trace("getting from queue {} ", queueName); } - QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL); - QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope); + LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL); + LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope); // run until there are no more active jobs long runCount = 0; @@ -181,7 +181,7 @@ public abstract class QueueListener { Timer.Context timerContext = timer.time(); //Get the messages out of the queue. //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here. - rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class)) + rx.Observable.from( legacyQueueManager.getMessages(getBatchSize(), ImportQueueMessage.class)) .buffer(getBatchSize()) .doOnNext(messages -> { try { @@ -197,7 +197,7 @@ public abstract class QueueListener { // asking for a onMessage call. onMessage(messages); - queueManager.commitMessages(messages); + legacyQueueManager.commitMessages(messages); meter.mark(messages.size()); if (logger.isTraceEnabled()) { @@ -267,7 +267,7 @@ public abstract class QueueListener { * This will be the method that does the job dependant execution. * @param messages */ - public abstract void onMessage(List<QueueMessage> messages) throws Exception; + public abstract void onMessage(List<LegacyQueueMessage> messages) throws Exception; public abstract String getQueueName();
