Initial pass at moving queues to core
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/84d779fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/84d779fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/84d779fc Branch: refs/heads/USERGRID-578 Commit: 84d779fc323b0f6c130c03a8982a39f5d5025b17 Parents: 8bb8a4f Author: Todd Nine <tn...@apigee.com> Authored: Wed Apr 15 14:21:29 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Fri Apr 17 11:58:28 2015 -0600 ---------------------------------------------------------------------- stack/core/pom.xml | 8 + .../usergrid/corepersistence/CoreModule.java | 44 ++- .../corepersistence/CpEntityManager.java | 167 +++++----- .../corepersistence/CpEntityManagerFactory.java | 18 +- .../corepersistence/CpManagerCache.java | 3 + .../corepersistence/CpRelationManager.java | 102 +++---- .../CpRelationManagerFactory.java | 46 --- .../usergrid/corepersistence/GuiceFactory.java | 19 +- .../events/EntityDeletedHandler.java | 5 +- .../events/EntityVersionCreatedHandler.java | 8 +- .../events/EntityVersionDeletedHandler.java | 19 +- .../corepersistence/index/BufferQueue.java | 68 +++++ .../index/BufferQueueInMemoryImpl.java | 116 +++++++ .../index/BufferQueueSQSImpl.java | 306 +++++++++++++++++++ .../index/IndexQueueService.java | 45 +++ .../corepersistence/index/IndexService.java | 50 +++ .../corepersistence/index/IndexServiceImpl.java | 148 +++++++++ .../corepersistence/index/QueryFig.java | 98 ++++++ .../corepersistence/index/QueueProvider.java | 112 +++++++ .../index/SQSIndexQueueServiceImpl.java | 35 +++ .../corepersistence/util/CpNamingUtils.java | 4 +- .../usergrid/persistence/EntityManager.java | 2 - .../persistence/GuiceAdapterBeanFactory.java | 114 +++++++ .../usergrid/persistence/PersistenceModule.java | 162 ++++++++++ .../corepersistence/TestIndexModule.java | 35 +++ .../index/BufferQueueSQSImplTest.java | 179 +++++++++++ 
.../persistence/core/aws/NoAWSCredsRule.java | 98 ++++++ .../graph/serialization/EdgesObservable.java | 23 ++ .../serialization/impl/EdgesObservableImpl.java | 68 +++-- .../persistence/index/EntityIndexBatch.java | 3 +- .../usergrid/persistence/index/IndexFig.java | 104 ++----- .../persistence/index/guice/IndexModule.java | 5 +- .../persistence/index/guice/QueueProvider.java | 116 ------- .../persistence/index/impl/BufferQueue.java | 66 ---- .../index/impl/BufferQueueInMemoryImpl.java | 115 ------- .../index/impl/BufferQueueSQSImpl.java | 306 ------------------- .../impl/EsApplicationEntityIndexImpl.java | 12 +- .../index/impl/EsEntityIndexBatchImpl.java | 16 +- .../index/impl/EsEntityIndexFactoryImpl.java | 8 +- .../index/impl/EsEntityIndexImpl.java | 50 +-- .../index/impl/EsIndexBufferConsumerImpl.java | 237 ++++++-------- .../index/impl/EsIndexBufferProducerImpl.java | 59 ---- .../index/impl/FailureMonitorImpl.java | 26 -- .../index/impl/FlushBufferQueue.java | 23 ++ .../index/impl/IndexBufferConsumer.java | 15 +- .../index/impl/IndexBufferProducer.java | 32 -- .../persistence/index/impl/IndexIdentifier.java | 46 +++ .../index/impl/IndexIdentifierImpl.java | 118 +------ .../index/impl/IndexOperationMessage.java | 139 +++++++++ .../index/impl/IndexRefreshCommandImpl.java | 12 +- .../index/migration/LegacyIndexIdentifier.java | 4 +- .../index/guice/TestIndexModule.java | 22 +- .../index/impl/BufferQueueSQSImplTest.java | 175 ----------- .../persistence/queue/NoAWSCredsRule.java | 98 ------ .../persistence/queue/QueueManagerTest.java | 3 +- stack/services/pom.xml | 50 +-- .../notifications/NotifiersServiceIT.java | 19 +- 57 files changed, 2232 insertions(+), 1749 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/pom.xml ---------------------------------------------------------------------- diff --git a/stack/core/pom.xml b/stack/core/pom.xml index 
6f3c381..3504be4 100644 --- a/stack/core/pom.xml +++ b/stack/core/pom.xml @@ -163,6 +163,14 @@ <dependencies> + + <!-- dependent on wiring guice and spring --> + <dependency> + <groupId>com.google.inject.extensions</groupId> + <artifactId>guice-spring</artifactId> + <version>4.0-beta5</version> + </dependency> + <!-- Apache Dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 7859ffc..7e8af87 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -16,7 +16,13 @@ package org.apache.usergrid.corepersistence; +import org.apache.usergrid.corepersistence.index.BufferQueue; +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.IndexServiceImpl; +import org.apache.usergrid.corepersistence.index.QueryFig; +import org.apache.usergrid.corepersistence.index.QueueProvider; import org.apache.usergrid.corepersistence.migration.*; +import org.apache.usergrid.persistence.PersistenceModule; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.safehaus.guicyfig.GuicyFigModule; import org.springframework.context.ApplicationContext; @@ -56,25 +62,18 @@ import com.google.inject.multibindings.Multibinder; */ public class CoreModule extends AbstractModule { - /** - * TODO this is a circular dependency, and should be refactored - */ - private LazyEntityManagerFactoryProvider lazyEntityManagerFactoryProvider; - public static final String EVENTS_DISABLED = "corepersistence.events.disabled"; + public static final String 
EVENTS_DISABLED = "corepersistence.events.disabled"; - public CoreModule( final ApplicationContext context ) { - this.lazyEntityManagerFactoryProvider = new LazyEntityManagerFactoryProvider( context ); - } @Override protected void configure() { - //See TODO, this is fugly - bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider ); +// //See TODO, this is fugly +// bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider ); install( new CommonModule()); install( new CollectionModule() { @@ -141,31 +140,24 @@ public class CoreModule extends AbstractModule { plugins.addBinding().to( AppInfoMigrationPlugin.class ); plugins.addBinding().to( MigrationModuleVersionPlugin.class ); - bind(AllApplicationsObservable.class).to(AllApplicationsObservableImpl.class); + bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class ); - install(new GuicyFigModule(ApplicationIdCacheFig.class)); - - } + /***** + * Indexing service + *****/ - /** - * TODO, this is a hack workaround due to the guice/spring EMF circular dependency - * Once the entity managers have been refactored and moved into guice, remove this dependency. 
- * - */ - public static class LazyEntityManagerFactoryProvider implements Provider<EntityManagerFactory>{ - private final ApplicationContext context; + bind(IndexService.class).to(IndexServiceImpl.class); + //bind the queue provider + bind( BufferQueue.class).toProvider( QueueProvider.class ); - public LazyEntityManagerFactoryProvider( final ApplicationContext context ) {this.context = context;} + install( new GuicyFigModule( QueryFig.class ) ); + install( new GuicyFigModule( ApplicationIdCacheFig.class ) ); - @Override - public EntityManagerFactory get() { - return this.context.getBean( EntityManagerFactory.class ); - } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 2e7b7d8..a615a43 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -80,9 +80,6 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexBatch; -import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.map.MapManager; @@ -125,7 +122,6 @@ import static 
me.prettyprint.hector.api.factory.HFactory.createMutator; import static org.apache.commons.lang.StringUtils.capitalize; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS; import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS; @@ -172,7 +168,6 @@ public class CpEntityManager implements EntityManager { private UUID applicationId; private Application application; - private CpEntityManagerFactory emf; private ManagerCache managerCache; @@ -212,29 +207,34 @@ public class CpEntityManager implements EntityManager { // private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache; - public CpEntityManager() { - - } - - @Override - public void init( EntityManagerFactory emf, UUID applicationId ) { + /** + * Fugly, make this part of DI + * @param cass + * @param counterUtils + * @param managerCache + * @param metricsFactory + * @param applicationId + */ + public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { - Preconditions.checkNotNull( emf, "emf must not be null" ); + Preconditions.checkNotNull( cass, "cass must not be null" ); + Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); + Preconditions.checkNotNull( managerCache, "managerCache must not be null" ); Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); - this.emf = ( CpEntityManagerFactory ) emf; - this.managerCache = this.emf.getManagerCache(); + + this.managerCache = managerCache; this.applicationId = applicationId; applicationScope = 
CpNamingUtils.getApplicationScope( applicationId ); ecm = managerCache.getEntityCollectionManager( applicationScope ); - this.cass = this.emf.getCassandraService(); - this.counterUtils = this.emf.getCounterUtils(); + this.cass = cass; + this.counterUtils = counterUtils; //Timer Setup - this.metricsFactory = this.emf.getMetricsFactory(); + this.metricsFactory = metricsFactory; this.aggCounterTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.get.aggregate.counters.timer" ); this.entCreateTimer =this.metricsFactory.getTimer( CpEntityManager.class, "cp.entity.create.timer" ); @@ -724,18 +724,18 @@ public class CpEntityManager implements EntityManager { @Override public void updateApplication( Map<String, Object> properties ) throws Exception { - this.updateProperties(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), properties); + this.updateProperties( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), properties ); this.application = get( applicationId, Application.class ); } @Override public RelationManager getRelationManager( EntityRef entityRef ) { - Preconditions.checkNotNull(entityRef, "entityRef cannot be null"); - CpRelationManager rmi = CpRelationManagerFactory.get( - this, emf, applicationId, entityRef, null, metricsFactory - ); - return rmi; + Preconditions.checkNotNull( entityRef, "entityRef cannot be null" ); + + CpRelationManager relationManager = + new CpRelationManager( metricsFactory, managerCache, this, applicationId, entityRef ); + return relationManager; } @@ -819,7 +819,7 @@ public class CpEntityManager implements EntityManager { @Override public EntityRef getAlias( String aliasType, String alias ) throws Exception { - return getAlias(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId), aliasType, alias); + return getAlias( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), aliasType, alias ); } @@ -848,7 +848,7 @@ public class CpEntityManager implements EntityManager { new Object[] { 
ownerRef, collectionType, aliasValue } ); } - return results.get(aliasValue); + return results.get( aliasValue ); } @@ -1226,7 +1226,7 @@ public class CpEntityManager implements EntityManager { ApplicationCF dictionaryCf = null; - boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary(entity.getType(), dictionaryName); + boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName ); if ( entityHasDictionary ) { dictionaryCf = ENTITY_DICTIONARIES; @@ -1279,7 +1279,7 @@ public class CpEntityManager implements EntityManager { Class<?> dictionaryCoType = Schema.getDefaultSchema().getDictionaryValueType(entity.getType(), dictionaryName); - boolean coTypeIsBasic = ClassUtils.isBasicType(dictionaryCoType); + boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType ); ByteBuffer[] columnNames = new ByteBuffer[elementNames.length]; for ( int i = 0; i < elementNames.length; i++ ) { @@ -1349,14 +1349,14 @@ public class CpEntityManager implements EntityManager { @Override public boolean isCollectionMember( EntityRef owner, String collectionName, EntityRef entity ) throws Exception { - return getRelationManager( owner ).isCollectionMember(collectionName, entity); + return getRelationManager( owner ).isCollectionMember( collectionName, entity ); } @Override public boolean isConnectionMember( EntityRef owner, String connectionName, EntityRef entity ) throws Exception { - return getRelationManager( owner ).isConnectionMember(connectionName, entity); + return getRelationManager( owner ).isConnectionMember( connectionName, entity ); } @@ -1455,7 +1455,7 @@ public class CpEntityManager implements EntityManager { public ConnectionRef createConnection( EntityRef connectingEntity, String connectionType, EntityRef connectedEntityRef ) throws Exception { - return getRelationManager( connectingEntity ).createConnection(connectionType, connectedEntityRef); + return getRelationManager( connectingEntity ).createConnection( 
connectionType, connectedEntityRef ); } @@ -1465,7 +1465,7 @@ public class CpEntityManager implements EntityManager { throws Exception { return getRelationManager( connectingEntity ) - .createConnection(pairedConnectionType, pairedEntity, connectionType, connectedEntityRef); + .createConnection( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef ); } @@ -1523,7 +1523,7 @@ public class CpEntityManager implements EntityManager { String connectedEntityType, Level resultsLevel ) throws Exception { return getRelationManager( entityRef ) - .getConnectedEntities(connectionType, connectedEntityType, resultsLevel); + .getConnectedEntities( connectionType, connectedEntityType, resultsLevel ); } @@ -1532,7 +1532,7 @@ public class CpEntityManager implements EntityManager { String connectedEntityType, Level resultsLevel ) throws Exception { return getRelationManager( entityRef ) - .getConnectingEntities(connectionType, connectedEntityType, resultsLevel); + .getConnectingEntities( connectionType, connectedEntityType, resultsLevel ); } @@ -1540,21 +1540,21 @@ public class CpEntityManager implements EntityManager { public Results getConnectingEntities( EntityRef entityRef, String connectionType, String entityType, Level level, int count ) throws Exception { - return getRelationManager( entityRef ).getConnectingEntities(connectionType, entityType, level, count); + return getRelationManager( entityRef ).getConnectingEntities( connectionType, entityType, level, count ); } @Override public Results searchConnectedEntities( EntityRef connectingEntity, Query query ) throws Exception { - return getRelationManager( connectingEntity ).searchConnectedEntities(query); + return getRelationManager( connectingEntity ).searchConnectedEntities( query ); } @Override public Set<String> getConnectionIndexes( EntityRef entity, String connectionType ) throws Exception { - return getRelationManager( entity ).getConnectionIndexes(connectionType); + return getRelationManager( entity 
).getConnectionIndexes( connectionType ); } @@ -1799,8 +1799,8 @@ public class CpEntityManager implements EntityManager { permission = permission.toLowerCase(); long timestamp = cass.createTimestamp(); Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be ); - CassandraPersistenceUtils.addInsertToMutator(batch, ApplicationCF.ENTITY_DICTIONARIES, - getRolePermissionsKey(groupId, roleName), permission, ByteBuffer.allocate(0), timestamp); + CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, + getRolePermissionsKey( groupId, roleName ), permission, ByteBuffer.allocate( 0 ), timestamp ); //Adding graphite metrics Timer.Context timeGroupRolePermission = entGrantGroupPermissionTimer.time(); @@ -1816,7 +1816,7 @@ public class CpEntityManager implements EntityManager { long timestamp = cass.createTimestamp(); Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be ); CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, - getRolePermissionsKey( groupId, roleName ), permission, timestamp ); + getRolePermissionsKey( groupId, roleName ), permission, timestamp ); //Adding graphite metrics Timer.Context timeRevokeGroupRolePermission = entRevokeGroupPermissionTimer.time(); CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT ); @@ -2281,7 +2281,7 @@ public class CpEntityManager implements EntityManager { private MapManager getMapManagerForTypes() { Id mapOwner = new SimpleId( applicationId, TYPE_APPLICATION ); - final MapScope ms = CpNamingUtils.getEntityTypeMapScope(mapOwner); + final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner ); MapManager mm = managerCache.getMapManager(ms); @@ -2836,26 +2836,25 @@ public class CpEntityManager implements EntityManager { CpWalker walker = new CpWalker( ); - walker.walkCollections( - this, getApplication(), collectionName, reverse, new CpVisitor() { + 
walker.walkCollections( this, getApplication(), collectionName, reverse, new CpVisitor() { - @Override - public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) { + @Override + public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) { - try { - em.update( entity ); - po.onProgress( entity ); - } - catch ( WriteOptimisticVerifyException wo ) { - // swallow this, it just means this was already updated, which accomplishes our task - logger.warn( "Someone beat us to updating entity {} in collection {}. Ignoring.", - entity.getName(), collName ); - } - catch ( Exception ex ) { - logger.error( "Error repersisting entity", ex ); + try { + em.update( entity ); + po.onProgress( entity ); + } + catch ( WriteOptimisticVerifyException wo ) { + // swallow this, it just means this was already updated, which accomplishes our task + logger.warn( "Someone beat us to updating entity {} in collection {}. Ignoring.", + entity.getName(), collName ); + } + catch ( Exception ex ) { + logger.error( "Error repersisting entity", ex ); + } } - } - } ); + } ); } @@ -2890,32 +2889,34 @@ public class CpEntityManager implements EntityManager { void indexEntityIntoCollection( final Edge edge, final org.apache.usergrid.persistence.model.entity.Entity collectionMember) { - final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope() ); - final EntityIndexBatch batch = aie.createBatch(); - - // index member into entity collection | type scope - final IndexEdge collectionIndexScope = generateScopeFromSource( edge ); - batch.index( collectionIndexScope, collectionMember ); - - //TODO REMOVE INDEX CODE - // // index member into entity | all-types scope - // IndexScope entityAllTypesScope = new IndexScopeImpl( - // collectionEntity.getId(), - // CpNamingUtils.ALL_TYPES, entityType ); - // - // batch.index(entityAllTypesScope, memberEntity); - // - // // index member into application | all-types scope - // IndexScope 
appAllTypesScope = new IndexScopeImpl( - // getApplicationScope().getApplication(), - // CpNamingUtils.ALL_TYPES, entityType ); - // - // batch.index(appAllTypesScope, memberEntity); - - //Adding graphite metrics - Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time(); - batch.execute(); - timeIndexEntityCollection.stop(); + throw new UnsupportedOperationException( "Use the new interface!" ); +// +// final ApplicationEntityIndex aie = getManagerCache().getEntityIndex( getApplicationScope() ); +// final EntityIndexBatch batch = aie.createBatch(); +// +// // index member into entity collection | type scope +// final IndexEdge collectionIndexScope = generateScopeFromSource( edge ); +// batch.index( collectionIndexScope, collectionMember ); +// +// //TODO REMOVE INDEX CODE +// // // index member into entity | all-types scope +// // IndexScope entityAllTypesScope = new IndexScopeImpl( +// // collectionEntity.getId(), +// // CpNamingUtils.ALL_TYPES, entityType ); +// // +// // batch.index(entityAllTypesScope, memberEntity); +// // +// // // index member into application | all-types scope +// // IndexScope appAllTypesScope = new IndexScopeImpl( +// // getApplicationScope().getApplication(), +// // CpNamingUtils.ALL_TYPES, entityType ); +// // +// // batch.index(appAllTypesScope, memberEntity); +// +// //Adding graphite metrics +// Timer.Context timeIndexEntityCollection = esIndexEntityCollectionTimer.time(); +// batch.execute(); +// timeIndexEntityCollection.stop(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 9907f91..e03ed47 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -156,14 +156,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } - public ManagerCache getManagerCache() { - - if ( managerCache == null ) { - managerCache = injector.getInstance( ManagerCache.class ); - } - return managerCache; - } - private Observable<EntityIdScope> getAllEntitiesObservable(){ return injector.getInstance( Key.get(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){})).getData(); } @@ -184,16 +176,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(); - em.init( this ,applicationId ); - + EntityManager em = new CpEntityManager(cassandraService, counterUtils, managerCache, metricsFactory, applicationId ); return em; } - public MetricsFactory getMetricsFactory(){ - return metricsFactory; - } - @Override public Entity createApplicationV2(String organizationName, String name) throws Exception { return createApplicationV2( organizationName, name, null ); @@ -746,7 +732,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application public Health getEntityStoreHealth() { // could use any collection scope here, does not matter - EntityCollectionManager ecm = getManagerCache().getEntityCollectionManager( + EntityCollectionManager ecm = managerCache.getEntityCollectionManager( new ApplicationScopeImpl( new SimpleId( CpNamingUtils.MANAGEMENT_APPLICATION_ID, "application" ) ) ); return ecm.getHealth(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java index 4cae31e..f4fee0c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java @@ -30,6 +30,9 @@ import org.apache.usergrid.persistence.map.MapScope; import com.google.inject.Inject; +/** + * Cache for managing our other managers. Now just a delegate. Needs refactored away + */ public class CpManagerCache implements ManagerCache { private final EntityCollectionManagerFactory ecmf; http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 1ae1db1..3c72b60 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -107,9 +107,6 @@ public class CpRelationManager implements RelationManager { private static final Logger logger = LoggerFactory.getLogger( CpRelationManager.class ); - - private CpEntityManagerFactory emf; - private ManagerCache managerCache; private EntityManager em; @@ -126,25 +123,18 @@ public class CpRelationManager implements RelationManager { private Timer updateCollectionTimer; - public CpRelationManager() {} - - - public CpRelationManager init( EntityManager em, CpEntityManagerFactory emf, UUID applicationId, - EntityRef headEntity, IndexBucketLocator indexBucketLocator, - MetricsFactory metricsFactory ) { + public CpRelationManager(final MetricsFactory metricsFactory, final 
ManagerCache managerCache, final EntityManager em, final UUID applicationId, final EntityRef headEntity ) { Assert.notNull( em, "Entity manager cannot be null" ); - Assert.notNull( emf, "Entity manager factory cannot be null" ); Assert.notNull( applicationId, "Application Id cannot be null" ); Assert.notNull( headEntity, "Head entity cannot be null" ); Assert.notNull( headEntity.getUuid(), "Head entity uuid cannot be null" ); // TODO: this assert should not be failing //Assert.notNull( indexBucketLocator, "indexBucketLocator cannot be null" ); this.em = em; - this.emf = emf; this.applicationId = applicationId; this.headEntity = headEntity; - this.managerCache = emf.getManagerCache(); + this.managerCache = managerCache; this.applicationScope = CpNamingUtils.getApplicationScope( applicationId ); this.metricsFactory = metricsFactory; @@ -165,7 +155,7 @@ public class CpRelationManager implements RelationManager { Assert.notNull( cpHeadEntity, String .format( "cpHeadEntity cannot be null for entity id %s, app id %s", entityId.getUuid(), applicationId ) ); - return this; + } @@ -260,40 +250,42 @@ public class CpRelationManager implements RelationManager { final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) { - final GraphManager gm = managerCache.getGraphManager( applicationScope ); - - // loop through all types of edge to target - - - final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); - - final EntityIndexBatch entityIndexBatch = ei.createBatch(); - - final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ) - - // for each edge type, emit all the edges of that type - .flatMap( etype -> gm.loadEdgesToTarget( - new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, null ) ) ) - - //for each edge we receive index and add to the batch - .doOnNext( edge -> { - // reindex the entity in the source entity's collection or 
connection index - - IndexEdge indexScope = generateScopeFromSource( edge ); - - entityIndexBatch.index( indexScope, cpEntity ); - - } ).doOnCompleted( () -> { - Timer.Context timeElasticIndexBatch = updateCollectionTimer.time(); - entityIndexBatch.execute(); - timeElasticIndexBatch.stop(); - } ).count().toBlocking().lastOrDefault( 0 ); - - //Adding graphite metrics - - - logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count ); + throw new UnsupportedOperationException( "Use the new interface" ); + +// final GraphManager gm = managerCache.getGraphManager( applicationScope ); +// +// // loop through all types of edge to target +// +// +// final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); +// +// final EntityIndexBatch entityIndexBatch = ei.createBatch(); +// +// final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ) +// +// // for each edge type, emit all the edges of that type +// .flatMap( etype -> gm.loadEdgesToTarget( +// new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, +// SearchByEdgeType.Order.DESCENDING, null ) ) ) +// +// //for each edge we receive index and add to the batch +// .doOnNext( edge -> { +// // reindex the entity in the source entity's collection or connection index +// +// IndexEdge indexScope = generateScopeFromSource( edge ); +// +// entityIndexBatch.index( indexScope, cpEntity ); +// +// } ).doOnCompleted( () -> { +// Timer.Context timeElasticIndexBatch = updateCollectionTimer.time(); +// entityIndexBatch.execute(); +// timeElasticIndexBatch.stop(); +// } ).count().toBlocking().lastOrDefault( 0 ); +// +// //Adding graphite metrics +// +// +// logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count ); } @@ -494,9 +486,10 @@ public class CpRelationManager implements RelationManager { // headEntityScope.getName()}); if ( connectBack && collection != null && 
collection.getLinkedCollection() != null ) { - getRelationManager( itemEntity ) - .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false ); - getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false ); + throw new UnsupportedOperationException( "Implement me directly in graph " ); +// getRelationManager( itemEntity ) +// .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false ); +// getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false ); } return itemEntity; @@ -555,7 +548,8 @@ public class CpRelationManager implements RelationManager { addToCollection( collName, itemEntity ); if ( collection != null && collection.getLinkedCollection() != null ) { - getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity ); + throw new UnsupportedOperationException( "Implement me directly in graph " ); +// getRelationManager( getHeadEntity() ).addToCollection( collection.getLinkedCollection(), itemEntity ); } } @@ -1045,12 +1039,6 @@ public class CpRelationManager implements RelationManager { } - private CpRelationManager getRelationManager( EntityRef headEntity ) { - CpRelationManager rmi = new CpRelationManager(); - rmi.init( em, emf, applicationId, headEntity, null, metricsFactory ); - return rmi; - } - /** side effect: converts headEntity into an Entity if it is an EntityRef! 
*/ private Entity getHeadEntity() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java deleted file mode 100644 index 4223f37..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManagerFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. The ASF licenses this file to You - * * under the Apache License, Version 2.0 (the "License"); you may not - * * use this file except in compliance with the License. - * * You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. For additional information regarding - * * copyright in this work, please see the NOTICE file in the top level - * * directory of this distribution. 
- * - */ -package org.apache.usergrid.corepersistence; - -import com.google.inject.Inject; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.IndexBucketLocator; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; - -import java.util.UUID; - -/** - * Factory to return and init relation manager instances - */ -public class CpRelationManagerFactory { - - - public static CpRelationManager get( EntityManager em, - CpEntityManagerFactory emf, - UUID applicationId, - EntityRef headEntity, - IndexBucketLocator indexBucketLocator, - MetricsFactory metricsFactory){ - CpRelationManager relationManager = new CpRelationManager(); - relationManager.init(em,emf,applicationId,headEntity,indexBucketLocator,metricsFactory); - return relationManager; - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java index 566430f..3a08034 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceFactory.java @@ -31,8 +31,7 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; -import org.apache.usergrid.persistence.cassandra.CassandraService; -import org.apache.usergrid.persistence.core.guice.CommonModule; +import org.apache.usergrid.persistence.PersistenceModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -47,7 +46,7 @@ import me.prettyprint.cassandra.service.CassandraHostConfigurator; * Factory for configuring Guice then returning it */ @Singleton -public 
class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAware { +public class GuiceFactory implements FactoryBean<Injector> { private static final Logger logger = LoggerFactory.getLogger( GuiceFactory.class ); @@ -55,16 +54,16 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw private final Properties systemProperties; - private ApplicationContext applicationContext; private Injector injector; - public GuiceFactory( final CassandraHostConfigurator chc, final Properties systemProperties ) { + public GuiceFactory( final ApplicationContext applicationContext, final CassandraHostConfigurator chc, final Properties systemProperties ) { this.chc = chc; this.systemProperties = systemProperties; + this.applicationContext = applicationContext; } @@ -128,8 +127,10 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw throw new RuntimeException( "Fatal error loading configuration.", e ); } - //this is seriously fugly, and needs removed - injector = Guice.createInjector( new CoreModule( applicationContext ) ); + + + //this is seriously fugly, and needs removed we shouldn't be mixing spring and guice + injector = Guice.createInjector( new CoreModule( ), new PersistenceModule( applicationContext ) ); return injector; } @@ -147,8 +148,4 @@ public class GuiceFactory implements FactoryBean<Injector>, ApplicationContextAw } - @Override - public void setApplicationContext( final ApplicationContext applicationContext ) throws BeansException { - this.applicationContext = applicationContext; - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java index 6a37144..2de69ed 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java @@ -42,11 +42,8 @@ public class EntityDeletedHandler implements EntityDeleted { private static final Logger logger = LoggerFactory.getLogger( EntityDeletedHandler.class ); - private final EntityManagerFactory emf; - - @Inject - public EntityDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;} + public EntityDeletedHandler( ) {} @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java index 0163fc2..20bdd55 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java @@ -41,15 +41,11 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated { private static final Logger logger = LoggerFactory.getLogger(EntityVersionCreatedHandler.class ); - private final EntityManagerFactory emf; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; @Inject - public EntityVersionCreatedHandler( final EntityManagerFactory emf, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.emf = emf; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; + public EntityVersionCreatedHandler( ) { + } 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java index 700851a..22f599e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java @@ -30,9 +30,11 @@ import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.model.entity.Id; @@ -56,17 +58,20 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { private static final Logger logger = LoggerFactory.getLogger( EntityVersionDeletedHandler.class ); - private final EntityManagerFactory emf; private final EdgesObservable edgesObservable; private final SerializationFig serializationFig; + private final EntityIndexFactory entityIndexFactory; + private final GraphManagerFactory graphManagerFactory; @Inject - public EntityVersionDeletedHandler( final EntityManagerFactory emf, final 
EdgesObservable edgesObservable, - final SerializationFig serializationFig ) { - this.emf = emf; + public EntityVersionDeletedHandler( final EdgesObservable edgesObservable, final SerializationFig serializationFig, + final EntityIndexFactory entityIndexFactory, + final GraphManagerFactory graphManagerFactory ) { this.edgesObservable = edgesObservable; this.serializationFig = serializationFig; + this.entityIndexFactory = entityIndexFactory; + this.graphManagerFactory = graphManagerFactory; } @@ -87,10 +92,8 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { } ); } - CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; - - final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); - final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope ); + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( scope ); + final GraphManager gm = graphManagerFactory.createEdgeManager( scope ); //create an observable of all scopes to deIndex http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java new file mode 100644 index 0000000..fc1bdb7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueue.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; + + +/** + * A temporary interface of our buffer Q to decouple of producer and consumer; + */ +public interface BufferQueue { + + /** + * Offer the indexoperation message. Some queues may support not returning the future until ack or fail. + * Other queues may return the future after ack on the offer. See the implementation documentation for details. + * @param operation + */ + public void offer(final IndexOperationMessage operation); + + + /** + * Perform a take, potentially blocking until up to takesize is available, or timeout has elapsed. + * May return less than the take size, but will never return null + * + * @param takeSize + * @param timeout + * @param timeUnit + * @return A null safe lid + */ + public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit ); + + + /** + * Ack all messages so they do not appear again. Meant for transactional queues, and may or may not be implemented. + * This will set the future as done in in memory operations + * + * @param messages + */ + public void ack(final List<IndexOperationMessage> messages); + + /** + * Mark these message as failed. 
Set the exception in the future on local operation + * + * @param messages + */ + public void fail(final List<IndexOperationMessage> messages, final Throwable t); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java new file mode 100644 index 0000000..0e43da3 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueInMemoryImpl.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.usergrid.persistence.core.future.BetterFuture; +import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +@Singleton +public class BufferQueueInMemoryImpl implements BufferQueue { + + + private final QueryFig fig; + private final ArrayBlockingQueue<IndexOperationMessage> messages; + + + @Inject + public BufferQueueInMemoryImpl( final QueryFig fig ) { + this.fig = fig; + messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() ); + } + + + @Override + public void offer( final IndexOperationMessage operation ) { + try { + messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS ); + } + catch ( InterruptedException e ) { + throw new RuntimeException("Unable to offer message to queue", e); + } + } + + + @Override + public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { + + final List<IndexOperationMessage> response = new ArrayList<>( takeSize ); + try { + + + messages.drainTo( response, takeSize ); + + //we got something, go process it + if ( response.size() > 0 ) { + return response; + } + + + final IndexOperationMessage polled = messages.poll( timeout, timeUnit ); + + if ( polled != null ) { + response.add( polled ); + + //try to add more + messages.drainTo( response, takeSize - 1 ); + } + } + catch ( InterruptedException e ) { + //swallow + } + + + return response; + } + + + @Override + public void ack( final List<IndexOperationMessage> messages ) { + //if we have a future ack it + for ( final IndexOperationMessage op : messages ) { + op.done(); + } + } + + + @Override + public void fail( final List<IndexOperationMessage> 
messages, final Throwable t ) { + + + for ( final IndexOperationMessage op : messages ) { + final BetterFuture<IndexOperationMessage> future = op.getFuture(); + + if ( future != null ) { + future.setError( t ); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java new file mode 100644 index 0000000..d955014 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/BufferQueueSQSImpl.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.map.MapManager; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.map.MapScope; +import org.apache.usergrid.persistence.map.impl.MapScopeImpl; +import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +/** + * This is experimental at best. Our SQS size limit is a problem. We shouldn't use this for index operation. Only for + * performing + */ +@Singleton +public class BufferQueueSQSImpl implements BufferQueue { + + private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class ); + + /** Hacky, copied from CPEntityManager b/c we can't access it here */ + public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); + + + /** + * Set our TTL to 1 month. 
This is high, but in the event of a bug, we want these entries to get removed + */ + public static final int TTL = 60 * 60 * 24 * 30; + + /** + * The name to put in the map + */ + public static final String MAP_NAME = "esqueuedata"; + + + private static final String QUEUE_NAME = "es_queue"; + + private static SmileFactory SMILE_FACTORY = new SmileFactory(); + + + static { + SMILE_FACTORY.delegateToTextual( true ); + } + + + private final QueueManager queue; + private final MapManager mapManager; + private final QueryFig queryFig; + private final ObjectMapper mapper; + private final Meter readMeter; + private final Timer readTimer; + private final Meter writeMeter; + private final Timer writeTimer; + + + @Inject + public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, + final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { + final QueueScope queueScope = + new QueueScopeImpl( QUEUE_NAME ); + + this.queue = queueManagerFactory.getQueueManager( queueScope ); + this.queryFig = queryFig; + + final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME ); + + this.mapManager = mapManagerFactory.createMapManager( scope ); + + + this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" ); + this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" ); + + this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" ); + this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" ); + + this.mapper = new ObjectMapper( SMILE_FACTORY ); + //pretty print, disabling for speed + // mapper.enable(SerializationFeature.INDENT_OUTPUT); + + } + + + @Override + public void offer( final IndexOperationMessage operation ) { + + //no op + if(operation.isEmpty()){ + operation.getFuture().done(); + return; + } + + final Timer.Context timer = this.writeTimer.time(); + 
this.writeMeter.mark(); + + final UUID identifier = UUIDGenerator.newTimeUUID(); + + try { + + final String payLoad = toString( operation ); + + //write to cassandra + this.mapManager.putString( identifier.toString(), payLoad, TTL ); + + //signal to SQS + this.queue.sendMessage( identifier ); + operation.done(); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + + @Override + public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { + + //SQS doesn't support more than 10 + + final int actualTake = Math.min( 10, takeSize ); + + final Timer.Context timer = this.readTimer.time(); + + try { + + List<QueueMessage> messages = queue + .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), + String.class ); + + + + final List<IndexOperationMessage> response = new ArrayList<>( messages.size() ); + + final List<String> mapEntries = new ArrayList<>( messages.size() ); + + + if(messages.size() == 0){ + return response; + } + + //add all our keys for a single round trip + for ( final QueueMessage message : messages ) { + mapEntries.add( message.getBody().toString() ); + } + + //look up the values + final Map<String, String> storedCommands = mapManager.getStrings( mapEntries ); + + + //load them into our response + for ( final QueueMessage message : messages ) { + + final String key = getMessageKey( message ); + + //now see if the key was there + final String payload = storedCommands.get( key ); + + //the entry was not present in cassandra, ignore this message. Failure should eventually kick it to + // a DLQ + + if ( payload == null ) { + continue; + } + + final IndexOperationMessage messageBody; + + try { + messageBody = fromString( payload ); + } + catch ( IOException e ) { + logger.error( "Unable to deserialize message from string. 
This is a bug", e ); + throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); + } + + SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); + + response.add( operation ); + } + + readMeter.mark( response.size() ); + return response; + } + //stop our timer + finally { + timer.stop(); + } + } + + + @Override + public void ack( final List<IndexOperationMessage> messages ) { + + //nothing to do + if ( messages.size() == 0 ) { + return; + } + + List<QueueMessage> toAck = new ArrayList<>( messages.size() ); + + for ( IndexOperationMessage ioe : messages ) { + + + final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; + + final String key = getMessageKey( sqsIndexOperationMessage.getMessage() ); + + //remove it from the map + mapManager.delete( key ); + + toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); + } + + queue.commitMessages( toAck ); + } + + + @Override + public void fail( final List<IndexOperationMessage> messages, final Throwable t ) { + //no op, just let it retry after the queue timeout + } + + + /** Read the object from Base64 string. */ + private IndexOperationMessage fromString( String s ) throws IOException { + IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class ); + return o; + } + + + /** Write the object to a Base64 string. */ + private String toString( IndexOperationMessage o ) throws IOException { + return mapper.writeValueAsString( o ); + } + + private String getMessageKey(final QueueMessage message){ + return message.getBody().toString(); + } + + /** + * The message that subclasses our IndexOperationMessage. 
holds a pointer to the original message + */ + public class SqsIndexOperationMessage extends IndexOperationMessage { + + private final QueueMessage message; + + + public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) { + this.message = message; + this.addAllDeIndexRequest( source.getDeIndexRequests() ); + this.addAllIndexRequest( source.getIndexRequests() ); + } + + + /** + * Get the message from our queue + */ + public QueueMessage getMessage() { + return message; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java new file mode 100644 index 0000000..8d7f222 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexQueueService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Low level queue service for indexing entities + */ +public interface IndexQueueService { + + + /** + * Queue an entity to be index asynchronously + * @param applicationScope + * @param entityId + * @param version + */ + void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java new file mode 100644 index 0000000..2bf073c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import rx.Observable; + + +/** + * Our low level indexing service operations + */ +public interface IndexService { + + + /** + * Perform an index update of the entity's state from Cassandra + * + * @param applicationScope The scope of the entity + * @param entity The entity + * + * @return An observable with the count of every + */ + Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity ); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java new file mode 100644 index 0000000..2a7533a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.entities.Application; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; +import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.IndexEdge; +import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.persistence.schema.CollectionInfo; +import org.apache.usergrid.utils.InflectionUtils; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget; +import static org.apache.usergrid.persistence.Schema.getDefaultSchema; + + +/** + * Implementation of the indexing service + */ +@Singleton +public class IndexServiceImpl implements IndexService { + + private final GraphManagerFactory graphManagerFactory; + private final 
EntityIndexFactory entityIndexFactory; + private final EdgesObservable edgesObservable; + private final IndexFig indexFig; + + + @Inject + public IndexServiceImpl( final GraphManagerFactory graphManagerFactory, final EntityIndexFactory entityIndexFactory, + final EdgesObservable edgesObservable, IndexFig indexFig ) { + this.graphManagerFactory = graphManagerFactory; + this.entityIndexFactory = entityIndexFactory; + this.edgesObservable = edgesObservable; + this.indexFig = indexFig; + } + + + @Override + public Observable<Integer> indexEntity( final ApplicationScope applicationScope, final Entity entity ) { + + + //bootstrap the lower modules from their caches + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + + final Id entityId = entity.getId(); + + + //we always index in the target scope + final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId ); + + //we may have to index + final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeToTarget( edge ) ); + + + //we might or might not need to index from target-> source + + + final Observable<IndexEdge> targetSizes = getIndexEdgesToTarget( gm, entityId ); + + + final Observable<IndexOperationMessage> observable = + //try to send a whole batch if we can + Observable.merge( sourceEdgesToIndex, targetSizes ).buffer( indexFig.getIndexBatchSize() ) + + //map into batches based on our buffer size + .flatMap( buffer -> Observable.from( buffer ).collect( () -> ei.createBatch(), + ( batch, indexEdge ) -> batch.index( indexEdge, entity ) ) + //return the future from the batch execution + .flatMap( batch -> Observable.from( batch.execute() ) ) ); + + observable.toBlocking().last(); + + + return Observable.just( 0 ); + } + + + /** + * Get index edges to the target + * + * @param graphManager The graph manager + * @param entityId The
entity's id + */ + private Observable<IndexEdge> getIndexEdgesToTarget( final GraphManager graphManager, final Id entityId ) { + + final String collectionName = InflectionUtils.pluralize( entityId.getType() ); + + + final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName ); + + //nothing to do + if ( collection == null ) { + return Observable.empty(); + } + + + final String linkedCollection = collection.getLinkedCollection(); + + /** + * Nothing to link + */ + if ( linkedCollection == null ) { + return Observable.empty(); + } + + + /** + * An observable of index edges, one per edge from the source in the linked collection + */ + return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ) + .map( edge -> generateScopeFromSource( edge ) ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java new file mode 100644 index 0000000..a7d2450 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueryFig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.index; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * Configuration for the index queue: queue implementation, worker count, timeouts and buffer size + */ +public interface QueryFig extends GuicyFig { + + + /** + * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple + * backpressure + */ + public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait"; + + /** + * The number of worker threads to consume from the queue + */ + public static final String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count"; + + /** + * The queue implementation to use. Values come from <class>QueueProvider.Implementations</class> + */ + public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl"; + + + /** + * Timeout to apply when offering
a message to the queue + */ + public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout"; + + /** + * Amount of time to wait when reading from the queue + */ + public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout"; + + /** + * Amount of time to wait when reading from the queue in milliseconds + */ + public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout"; + + + String INDEX_QUEUE_SIZE = "elasticsearch.queue_size"; + + + @Default( "1000" ) + @Key( FAILURE_REJECTED_RETRY_WAIT_TIME ) + long getFailureRetryTime(); + + //give us 60 seconds to process the message + @Default( "60" ) + @Key( INDEX_QUEUE_READ_TIMEOUT ) + int getIndexQueueTimeout(); + + @Default( "2" ) + @Key( ELASTICSEARCH_WORKER_COUNT ) + int getWorkerCount(); + + @Default( "LOCAL" ) + @Key( ELASTICSEARCH_QUEUE_IMPL ) + String getQueueImplementation(); + + @Default( "1000" ) + @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT ) + long getQueueOfferTimeout(); + + /** + * size of the buffer to build up before you send results + */ + @Default( "1000" ) + @Key( INDEX_QUEUE_SIZE ) + int getIndexQueueSize(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java new file mode 100644 index 0000000..d3920db --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/QueueProvider.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; + +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; + + +/** + * A provider to allow users to configure their queue impl via properties + */ +@Singleton +public class QueueProvider implements Provider<BufferQueue> { + + private final QueryFig queryFig; + + private final QueueManagerFactory queueManagerFactory; + private final MapManagerFactory mapManagerFactory; + private final MetricsFactory metricsFactory; + + private BufferQueue bufferQueue; + + + @Inject + public QueueProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, + final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) { + this.queryFig = queryFig; + + + this.queueManagerFactory = queueManagerFactory; + this.mapManagerFactory = mapManagerFactory; + this.metricsFactory = metricsFactory; + } + + + @Override + @Singleton + public BufferQueue get() { + if ( bufferQueue == null ) { + bufferQueue = getQueue(); + } + + + return bufferQueue; + } + + + 
private BufferQueue getQueue() { + final String value = queryFig.getQueueImplementation(); + + final Implementations impl = Implementations.valueOf( value ); + + switch ( impl ) { + case LOCAL: + return new BufferQueueInMemoryImpl( queryFig ); + case SQS: + return new BufferQueueSQSImpl( queueManagerFactory, queryFig, mapManagerFactory, metricsFactory ); + default: + throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); + } + } + + + private String getErrorValues() { + String values = ""; + + for ( final Implementations impl : Implementations.values() ) { + values += impl + ", "; + } + + values = values.substring( 0, values.length() - 2 ); + + return values; + } + + + /** + * Different implementations + */ + public static enum Implementations { + LOCAL, + SQS; + + + public String asString() { + return toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java new file mode 100644 index 0000000..42f36b1 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSIndexQueueServiceImpl.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.util.UUID; + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Id; + + +public class SQSIndexQueueServiceImpl implements IndexQueueService { + + @Override + public void queueEntityIndex( final ApplicationScope applicationScope, final Id entityId, final UUID version ) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 0376780..c60b86e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -157,7 +157,7 @@ public class CpNamingUtils { */ public static SearchEdge createCollectionSearchEdge( final Id sourceId, final String connectionType ) { return new SearchEdgeImpl( sourceId, getEdgeTypeFromCollectionName( connectionType ), - SearchEdge.NodeType.SOURCE ); + SearchEdge.NodeType.TARGET ); } @@ -181,7 +181,7 @@ public class CpNamingUtils { */ public static SearchEdge createConnectionSearchEdge( final Id sourceId, final String connectionType ) { return new SearchEdgeImpl( sourceId, 
getEdgeTypeFromConnectionType( connectionType ), - SearchEdge.NodeType.SOURCE ); + SearchEdge.NodeType.TARGET ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/84d779fc/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index 7929b12..fc8b3d5 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -689,8 +689,6 @@ public interface EntityManager { /** @return the cass */ CassandraService getCass(); - public void init( EntityManagerFactory emf, UUID applicationId); - /** For testing purposes */ public void flushManagerCaches();