Repository: usergrid Updated Branches: refs/heads/master e504aae65 -> fdd4e0be3
Make it possible to configure either LOCAL, DISTRIBUTED or DISTRIBUTED_SNS queue; also: set Core tests to use DISTRIBUTED queue (i.e. Qakka). Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/278abd29 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/278abd29 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/278abd29 Branch: refs/heads/master Commit: 278abd29473938c0b8169e9ddf8d1e6f2ce0539b Parents: e504aae Author: Dave Johnson <[email protected]> Authored: Fri Nov 18 17:13:34 2016 -0500 Committer: Dave Johnson <[email protected]> Committed: Fri Nov 18 17:13:34 2016 -0500 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 9 +- .../corepersistence/CpEntityManager.java | 3 +- .../usergrid/corepersistence/GuiceFactory.java | 2 +- .../asyncevents/AsyncIndexProvider.java | 103 ++++--------------- .../corepersistence/TestCoreModule.java | 6 +- .../corepersistence/TestIndexModule.java | 10 +- .../resources/usergrid-custom-test.properties | 1 + .../persistence/index/guice/IndexModule.java | 11 +- .../index/guice/TestIndexModule.java | 8 +- .../persistence/queue/LegacyQueueManager.java | 9 ++ .../persistence/queue/guice/QueueModule.java | 48 +++++++-- .../usergrid/persistence/queue/TestModule.java | 2 +- 12 files changed, 114 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 781eede..ef4bb04 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -44,6 +44,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphN import org.apache.usergrid.persistence.index.guice.IndexModule; import org.safehaus.guicyfig.GuicyFigModule; +import java.util.Properties; import java.util.concurrent.ThreadPoolExecutor; @@ -52,6 +53,11 @@ import java.util.concurrent.ThreadPoolExecutor; */ public class CoreModule extends AbstractModule { + private final Properties properties; + + public CoreModule( Properties properties ) { + this.properties = properties; + } @Override protected void configure() { @@ -79,7 +85,8 @@ public class CoreModule extends AbstractModule { bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to( AllNodesInGraphImpl.class ); } } ); - install( new IndexModule() { + + install( new IndexModule( properties ) { @Override public void configureMigrationProvider() { bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ) http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 87a8649..c6a757e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -3106,6 +3106,7 @@ public class CpEntityManager implements EntityManager { Entity refreshEntity = create("refresh", map); EntityIndex.IndexRefreshCommandInfo indexRefreshCommandInfo = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); + try { for (int i = 0; i < 20; i++) { if (searchCollection( @@ -3118,7 +3119,7 @@ public class CpEntityManager implements EntityManager { hasFinished = true; break; } - Thread.sleep(100); + Thread.sleep(500); indexRefreshCommandInfo = managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java index f2be27a..1758801 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java @@ -148,7 +148,7 @@ public class GuiceFactory implements FactoryBean<Injector> { Module serviceModule =(Module)applicationContext.getBean("serviceModule"); moduleList.add( serviceModule); } - moduleList.add(new CoreModule()); + moduleList.add(new CoreModule( systemProperties )); moduleList.add(new PersistenceModule(applicationContext)); //we have to inject a couple of spring beans into our Guice. Wire it with PersistenceModule injector = Guice.createInjector( moduleList ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 561dfdc..81960f5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -27,7 +27,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.impl.IndexProducer; -import org.apache.usergrid.persistence.queue.LocalQueueManager; +import org.apache.usergrid.persistence.queue.LegacyQueueManager; import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.queue.LegacyQueueFig; import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; @@ -36,6 +36,8 @@ import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; +import static org.apache.usergrid.persistence.queue.LegacyQueueManager.Implementation.LOCAL; + /** * A provider to allow users to configure their queue impl via properties @@ -100,88 +102,25 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { private AsyncEventService getIndexService() { final String value = indexProcessorFig.getQueueImplementation(); - final Implementations impl = Implementations.valueOf(value); - - switch (impl) { - - case LOCAL: - AsyncEventServiceImpl eventService = - new AsyncEventServiceImpl(scope -> new LocalQueueManager(), - indexProcessorFig, - indexProducer, - metricsFactory, - entityCollectionManagerFactory, - indexLocationStrategyFactory, - entityIndexFactory, - eventBuilder, - mapManagerFactory, - queueFig,rxTaskScheduler); - eventService.MAX_TAKE = 1000; - return eventService; - - case SQS: - throw new IllegalArgumentException( - "Configuration value of SQS is no longer allowed. Use SNS instead with only a single region."); - - case SNS: - return new AsyncEventServiceImpl( - queueManagerFactory, - indexProcessorFig, - indexProducer, - metricsFactory, - entityCollectionManagerFactory, - indexLocationStrategyFactory, - entityIndexFactory, - eventBuilder, - mapManagerFactory, - queueFig, - rxTaskScheduler ); - - case MULTIREGION: - return new AsyncEventServiceImpl( - queueManagerFactory, - indexProcessorFig, - indexProducer, - metricsFactory, - entityCollectionManagerFactory, - indexLocationStrategyFactory, - entityIndexFactory, - eventBuilder, - mapManagerFactory, - queueFig, - rxTaskScheduler ); - - default: - throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); + final LegacyQueueManager.Implementation impl = LegacyQueueManager.Implementation.valueOf(value); + + final AsyncEventServiceImpl asyncEventService = new AsyncEventServiceImpl( + queueManagerFactory, + indexProcessorFig, + indexProducer, + metricsFactory, + entityCollectionManagerFactory, + indexLocationStrategyFactory, + entityIndexFactory, + eventBuilder, + mapManagerFactory, + queueFig, + rxTaskScheduler ); + + if ( impl.equals( LOCAL )) { + asyncEventService.MAX_TAKE = 1000; } - } - - - private String getErrorValues() { - String values = ""; - - for (final Implementations impl : Implementations.values()) { - values += impl + ", "; - } - - values = values.substring(0, values.length() - 2); - return values; - } - - - /** - * Different implementations - */ - public static enum Implementations { - TEST, - LOCAL, - SQS, // deprecated - SNS, // deprecated - MULTIREGION; // built-in Akka-based queue - - public String asString() { - return toString(); - } + return asyncEventService; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java index 32b9845..8cab054 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestCoreModule.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence; import org.apache.usergrid.persistence.core.guice.TestModule; +import java.util.Properties; + /** * Test guice module for our core guice configuration @@ -29,6 +31,8 @@ public class TestCoreModule extends TestModule { @Override protected void configure() { - install( new CoreModule() ); + Properties properties = new Properties(); + properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" ); + install( new CoreModule( properties ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java index 95000bf..ecd16ef 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/TestIndexModule.java @@ -25,6 +25,8 @@ import org.apache.usergrid.persistence.PersistenceModule; import org.apache.usergrid.persistence.core.guice.TestModule; import org.apache.usergrid.setup.ConcurrentProcessSingleton; +import java.util.Properties; + public class TestIndexModule extends TestModule { @@ -32,12 +34,14 @@ public class TestIndexModule extends TestModule { @Override protected void configure() { - //TODO, refactor to guice to get rid of this - final ApplicationContext singleton = ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext(); + final ApplicationContext singleton = + ConcurrentProcessSingleton.getInstance().getSpringResource().getAppContext(); //this will break, we need to untagle this and move to guice in core completely - install( new CoreModule() ); + Properties properties = new Properties(); + properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" ); + install( new CoreModule( properties ) ); install( new PersistenceModule( singleton ) ); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/core/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties index 55d881f..4070d4a 100644 --- a/stack/core/src/test/resources/usergrid-custom-test.properties +++ b/stack/core/src/test/resources/usergrid-custom-test.properties @@ -28,6 +28,7 @@ elasticsearch.management_number_replicas=0 elasticsearch.managment_index=usergrid_core_management #cassandra.keyspace.application=core_tests_schema elasticsearch.queue_impl.resolution=true +elasticsearch.queue_impl=LOCAL elasticsearch.buffer_timeout=1 http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java index 47399c7..86d5d36 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java @@ -35,9 +35,18 @@ import org.apache.usergrid.persistence.queue.guice.QueueModule; import org.safehaus.guicyfig.GuicyFigModule; +import java.util.Properties; + public abstract class IndexModule extends AbstractModule { + private final Properties properties; + + public IndexModule( Properties properties ) { + this.properties = properties; + } + + @Override protected void configure() { @@ -45,7 +54,7 @@ public abstract class IndexModule extends AbstractModule { install(new GuicyFigModule(IndexFig.class)); install(new MapModule()); - install(new QueueModule()); + install(new QueueModule( properties.getProperty( "elasticsearch.queue_impl" ) )); bind( EntityIndexFactory.class ).to( EsEntityIndexFactoryImpl.class ); bind(IndexCache.class).to(EsIndexCacheImpl.class); http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java index 3bc6193..62b2bd2 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java @@ -32,6 +32,8 @@ import com.google.inject.TypeLiteral; import rx.Observable; +import java.util.Properties; + public class TestIndexModule extends TestModule { @@ -42,7 +44,11 @@ public class TestIndexModule extends TestModule { install( new CommonModule()); // configure collections and our core astyanax framework - install( new IndexModule(){ + + Properties properties = new Properties(); + properties.setProperty( "elasticsearch.queue_impl", "DISTRIBUTED" ); + + install( new IndexModule( properties ) { @Override public void configureMigrationProvider(){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java index 6627148..afe229d 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java @@ -27,6 +27,15 @@ import java.util.List; public interface LegacyQueueManager { /** + * Different implementations + */ + enum Implementation { + LOCAL, // local in-memory queue + DISTRIBUTED, // built-in Akka-based queue + DISTRIBUTED_SNS; // SNS queue + } + + /** * Read messages from queue * @param limit * @param klass class to cast the return from http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java index f0e0900..e426247 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java @@ -21,20 +21,35 @@ package org.apache.usergrid.persistence.queue.guice; import com.google.inject.AbstractModule; import com.google.inject.assistedinject.FactoryModuleBuilder; import org.apache.usergrid.persistence.qakka.QakkaModule; -import org.apache.usergrid.persistence.queue.LegacyQueueFig; -import org.apache.usergrid.persistence.queue.LegacyQueueManager; -import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; -import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory; +import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.impl.QakkaQueueManager; import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl; +import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl; import org.safehaus.guicyfig.GuicyFigModule; +import java.util.Properties; + /** * Simple module for wiring our collection api */ public class QueueModule extends AbstractModule { + private LegacyQueueManager.Implementation implementation; + + public QueueModule( String queueManagerType ) { + + if ( "LOCAL".equals( queueManagerType ) ) { + this.implementation = LegacyQueueManager.Implementation.LOCAL; + } + else if ( "DISTRIBUTED_SNS".equals( queueManagerType ) ) { + this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED_SNS; + } + else if ( "DISTRIBUTED".equals( queueManagerType ) ) { + this.implementation = LegacyQueueManager.Implementation.DISTRIBUTED; + } + } + @Override protected void configure() { @@ -43,8 +58,29 @@ public class QueueModule extends AbstractModule { bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class); - install( new FactoryModuleBuilder().implement(LegacyQueueManager.class, QakkaQueueManager.class) - .build(LegacyQueueManagerInternalFactory.class)); + switch (implementation) { + + case LOCAL: + + install( new FactoryModuleBuilder().implement( LegacyQueueManager.class, LocalQueueManager.class ) + .build( LegacyQueueManagerInternalFactory.class ) ); + break; + + case DISTRIBUTED_SNS: + install( new FactoryModuleBuilder().implement( LegacyQueueManager.class, SNSQueueManagerImpl.class ) + .build( LegacyQueueManagerInternalFactory.class ) ); + break; + + case DISTRIBUTED: + install( new FactoryModuleBuilder().implement( LegacyQueueManager.class, QakkaQueueManager.class ) + .build( LegacyQueueManagerInternalFactory.class ) ); + break; + + default: + throw new IllegalArgumentException( + "Queue implemetation value of " + implementation + " not allowed"); + + } install( new QakkaModule() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/278abd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java index 9d2ed24..78be24f 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/TestModule.java @@ -35,7 +35,7 @@ public class TestModule extends AbstractModule { install( new CommonModule() ); install( new ActorSystemModule() ); - install( new QueueModule() ); + install( new QueueModule( "DISTRIBUTED" ) ); }
