Added factory for RX I/O Scheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9b9a1b02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9b9a1b02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9b9a1b02 Branch: refs/heads/two-dot-o-dev Commit: 9b9a1b02f877ee6f35d7c3a7ba74001a17c237f1 Parents: 48d8060 Author: Todd Nine <[email protected]> Authored: Mon Apr 20 08:01:12 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Apr 20 10:36:06 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 5 +- .../corepersistence/CpEntityManager.java | 9 +- .../corepersistence/CpRelationManager.java | 57 +------- .../index/AsyncIndexProvider.java | 15 +-- .../index/AsyncIndexService.java | 5 +- .../index/InMemoryAsyncIndexService.java | 34 ++--- .../corepersistence/index/IndexServiceImpl.java | 9 +- .../index/SQSAsyncIndexService.java | 4 +- .../EntityCollectionManagerFactoryImpl.java | 15 ++- .../impl/EntityCollectionManagerImpl.java | 38 +++--- .../mvcc/stage/write/RollbackAction.java | 2 - .../persistence/core/guice/CommonModule.java | 12 ++ .../persistence/core/rx/RxSchedulerFig.java | 60 +++++++++ .../persistence/core/rx/RxTaskScheduler.java | 40 ++++++ .../core/rx/RxTaskSchedulerImpl.java | 129 +++++++++++++++++++ .../index/impl/IndexRefreshCommandImpl.java | 14 +- .../persistence/index/impl/IndexingUtils.java | 6 +- .../persistence/index/usergrid-mappings.json | 4 +- 18 files changed, 314 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 4758456..6ebff53 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -34,6 +34,9 @@ import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugi import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl; import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl; +import org.apache.usergrid.persistence.core.rx.RxSchedulerFig; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl; import org.apache.usergrid.persistence.collection.event.EntityDeleted; import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; @@ -144,7 +147,7 @@ public class CoreModule extends AbstractModule { *****/ - bind(IndexService.class).to(IndexServiceImpl.class); + bind(IndexService.class).to( IndexServiceImpl.class ); //bind the queue provider bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 844892a..6c3989d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -620,9 +620,8 @@ public class CpEntityManager implements EntityManager { } // update in all containing collections and connection indexes - CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity ); - rm.updateContainingCollectionAndCollectionIndexes( cpEntity ); - timer.stop(); + + indexService.queueEntityIndexUpdate( applicationScope, cpEntity ); } @@ -1067,9 +1066,7 @@ public class CpEntityManager implements EntityManager { //Adding graphite metrics - // update in all containing collections and connection indexes - CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef ); - rm.updateContainingCollectionAndCollectionIndexes( cpEntity ); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index db9816e..be8605e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -255,47 +255,6 @@ public class CpRelationManager implements RelationManager { } - public void updateContainingCollectionAndCollectionIndexes( - final org.apache.usergrid.persistence.model.entity.Entity cpEntity ) { - - - throw new UnsupportedOperationException( "Use the new interface" ); - -// final GraphManager gm = managerCache.getGraphManager( applicationScope ); -// -// // loop through all types of edge to target -// -// -// final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); -// -// final 
EntityIndexBatch entityIndexBatch = ei.createBatch(); -// -// final int count = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), null, null ) ) -// -// // for each edge type, emit all the edges of that type -// .flatMap( etype -> gm.loadEdgesToTarget( -// new SimpleSearchByEdgeType( cpHeadEntity.getId(), etype, Long.MAX_VALUE, -// SearchByEdgeType.Order.DESCENDING, null ) ) ) -// -// //for each edge we receive index and add to the batch -// .doOnNext( edge -> { -// // reindex the entity in the source entity's collection or connection index -// -// IndexEdge indexScope = generateScopeFromSource( edge ); -// -// entityIndexBatch.index( indexScope, cpEntity ); -// -// } ).doOnCompleted( () -> { -// Timer.Context timeElasticIndexBatch = updateCollectionTimer.time(); -// entityIndexBatch.execute(); -// timeElasticIndexBatch.stop(); -// } ).count().toBlocking().lastOrDefault( 0 ); -// -// //Adding graphite metrics -// -// -// logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count ); - } @Override @@ -473,8 +432,6 @@ public class CpRelationManager implements RelationManager { gm.writeEdge( edge ).toBlocking().last(); - //This is broken and needs fixed updateContainingCollectionAndCollectionIndexes See USERGRID-541 - //perform indexing @@ -482,7 +439,7 @@ public class CpRelationManager implements RelationManager { logger.debug( "Wrote edge {}", edge ); } - indexService.queueEntityIndexUpdate( applicationScope, memberEntity.getId(), memberEntity.getVersion() ); + indexService.queueEntityIndexUpdate( applicationScope, memberEntity); if ( logger.isDebugEnabled() ) { @@ -490,17 +447,7 @@ public class CpRelationManager implements RelationManager { itemRef.getUuid().toString(), itemRef.getType(), collName } ); } - // logger.debug("With head entity scope is {}:{}:{}", new Object[] { - // headEntityScope.getApplication().toString(), - // headEntityScope.getOwner().toString(), - // headEntityScope.getName()}); - - if ( 
connectBack && collection != null && collection.getLinkedCollection() != null ) { - throw new UnsupportedOperationException( "Implement me directly in graph " ); -// getRelationManager( itemEntity ) -// .addToCollection( collection.getLinkedCollection(), headEntity, cpHeadEntity, false ); -// getRelationManager( itemEntity ).addToCollection( collection.getLinkedCollection(), headEntity, false ); - } + return itemEntity; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index 8257c94..77b4990 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -20,7 +20,7 @@ package org.apache.usergrid.corepersistence.index; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory; @@ -37,24 +37,23 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { private final QueryFig queryFig; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final QueueManagerFactory queueManagerFactory; private final MetricsFactory metricsFactory; private final IndexService indexService; + private final RxTaskScheduler rxTaskScheduler; private AsyncIndexService asyncIndexService; @Inject - public AsyncIndexProvider( final QueryFig queryFig, - final EntityCollectionManagerFactory 
entityCollectionManagerFactory, - final QueueManagerFactory queueManagerFactory, final MetricsFactory metricsFactory, - final IndexService indexService ) { + public AsyncIndexProvider( final QueryFig queryFig, final QueueManagerFactory queueManagerFactory, final + MetricsFactory metricsFactory, + final IndexService indexService, final RxTaskScheduler rxTaskScheduler ) { this.queryFig = queryFig; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.queueManagerFactory = queueManagerFactory; this.metricsFactory = metricsFactory; this.indexService = indexService; + this.rxTaskScheduler = rxTaskScheduler; } @@ -77,7 +76,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { switch ( impl ) { case LOCAL: - return new InMemoryAsyncIndexService( indexService, entityCollectionManagerFactory ); + return new InMemoryAsyncIndexService( indexService, rxTaskScheduler ); case SQS: return new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); default: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java index d1f2fb6..06310ae 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java @@ -40,8 +40,7 @@ public interface AsyncIndexService { * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. 
* @param applicationScope - * @param entityId - * @param version + * @param entity The entity to index */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, final UUID version ); + void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java index fdc9c65..3e2a271 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java @@ -20,56 +20,40 @@ package org.apache.usergrid.corepersistence.index; -import java.util.UUID; - -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; -import rx.schedulers.Schedulers; @Singleton public class InMemoryAsyncIndexService implements AsyncIndexService { private final IndexService indexService; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; + private final RxTaskScheduler rxTaskScheduler; @Inject - public InMemoryAsyncIndexService( final IndexService indexService, - final 
EntityCollectionManagerFactory entityCollectionManagerFactory ) {this.indexService = indexService; + public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler ) { + this.indexService = indexService; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; + this.rxTaskScheduler = rxTaskScheduler; } @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, - final UUID version ) { - - final IndexEntityEvent event = new IndexEntityEvent( applicationScope, entityId, version ); + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity toIndex ) { //process the entity immediately //only process the same version, otherwise ignore - getEntity( applicationScope, entityId).filter( entity-> version.equals(entity.hasVersion() )).doOnNext( entity -> { - indexService.indexEntity( applicationScope, entity ); - } ).subscribeOn( Schedulers.io() ).subscribe(); - - - } - - private Observable<Entity> getEntity( final ApplicationScope applicationScope, final Id entityId){ - - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - return ecm.load( entityId ); + Observable.just( toIndex ).doOnNext( entity -> { + indexService.indexEntity( applicationScope, entity ); + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 2a7533a..873e2b6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -83,8 +83,8 @@ public class IndexServiceImpl implements IndexService { //we always index in the target scope final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId ); - //we may have to index - final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeToTarget( edge ) ); + //we may have to index we're indexing from source->target here + final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) ); //we might or might not need to index from target-> source @@ -141,8 +141,9 @@ public class IndexServiceImpl implements IndexService { /** * An observable of sizes as we execute batches + * + * we're indexing from target->source here */ - return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ) - .map( edge -> generateScopeFromSource( edge ) ); + return edgesObservable.getEdgesFromSource( graphManager, entityId, linkedCollection ).map( edge -> generateScopeToTarget( edge ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java index cad20bd..dfcb97a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import 
org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.persistence.queue.QueueManager; @@ -256,8 +257,7 @@ public class SQSAsyncIndexService implements AsyncIndexService { @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Id entityId, - final UUID version ) { + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java index 9191c06..761c4b5 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java @@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerial import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy; import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.TaskExecutor; @@ 
-74,6 +75,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag private final TaskExecutor taskExecutor; private final EntityCacheFig entityCacheFig; private final MetricsFactory metricsFactory; + private final RxTaskScheduler rxTaskScheduler; private LoadingCache<ApplicationScope, EntityCollectionManager> ecmCache = CacheBuilder.newBuilder().maximumSize( 1000 ) @@ -84,7 +86,8 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag writeStart, writeVerifyUnique, writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, entitySerializationStrategy, uniqueValueSerializationStrategy, - mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory ); + mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory, + rxTaskScheduler ); final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target ); @@ -95,8 +98,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag @Inject - public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, - final WriteUniqueVerify writeVerifyUnique, + public EntityCollectionManagerFactoryImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit writeCommit, final RollbackAction rollback, final MarkStart markStart, final MarkCommit markCommit, @@ -105,9 +107,9 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, - final EntityCacheFig entityCacheFig, - MetricsFactory metricsFactory) { + @CollectionTaskExecutor final TaskExecutor taskExecutor, final + EntityCacheFig entityCacheFig, 
+ MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) { this.writeStart = writeStart; this.writeVerifyUnique = writeVerifyUnique; @@ -124,6 +126,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag this.taskExecutor = taskExecutor; this.entityCacheFig = entityCacheFig; this.metricsFactory = metricsFactory; + this.rxTaskScheduler = rxTaskScheduler; } @Override public EntityCollectionManager createCollectionManager(ApplicationScope applicationScope) { http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java index f0b070c..6f10e86 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java @@ -49,6 +49,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet; import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.task.Task; import org.apache.usergrid.persistence.core.task.TaskExecutor; @@ -109,6 +110,8 @@ public class EntityCollectionManagerImpl 
implements EntityCollectionManager { private final EntityVersionTaskFactory entityVersionTaskFactory; private final TaskExecutor taskExecutor; + private final RxTaskScheduler rxTaskScheduler; + private final Keyspace keyspace; private final Timer writeTimer; private final Meter writeMeter; @@ -125,26 +128,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { @Inject - public EntityCollectionManagerImpl( - final WriteStart writeStart, - final WriteUniqueVerify writeVerifyUnique, - final WriteOptimisticVerify writeOptimisticVerify, - final WriteCommit writeCommit, - final RollbackAction rollback, - final MarkStart markStart, - final MarkCommit markCommit, - final MvccEntitySerializationStrategy entitySerializationStrategy, - final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, - final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, - final Keyspace keyspace, - final EntityVersionTaskFactory entityVersionTaskFactory, - @CollectionTaskExecutor final TaskExecutor taskExecutor, - @Assisted final ApplicationScope applicationScope, - final MetricsFactory metricsFactory - - ) { + public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique, + final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit + writeCommit, + final RollbackAction rollback, final MarkStart markStart, + final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy, + final UniqueValueSerializationStrategy uniqueValueSerializationStrategy, + final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy, + final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory, + @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope, + final MetricsFactory metricsFactory, + + final RxTaskScheduler rxTaskScheduler ) { this.uniqueValueSerializationStrategy = 
uniqueValueSerializationStrategy; this.entitySerializationStrategy = entitySerializationStrategy; + this.rxTaskScheduler = rxTaskScheduler; ValidationUtils.validateApplicationScope( applicationScope ); @@ -453,13 +451,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager { public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) { Observable<CollectionIoEvent<MvccEntity>> unique = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) .doOnNext( writeVerifyUnique ); // optimistic verification Observable<CollectionIoEvent<MvccEntity>> optimistic = - Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() ) + Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ) .doOnNext( writeOptimisticVerify ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java index cd15c26..8342e55 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/RollbackAction.java @@ -49,7 +49,6 @@ public class RollbackAction implements Action1<Throwable> { private static final Logger log = LoggerFactory.getLogger( RollbackAction.class ); - private final Scheduler scheduler; private final 
UniqueValueSerializationStrategy uniqueValueStrat; private final MvccLogEntrySerializationStrategy logEntryStrat; @@ -58,7 +57,6 @@ public class RollbackAction implements Action1<Throwable> { public RollbackAction(MvccLogEntrySerializationStrategy logEntryStrat, UniqueValueSerializationStrategy uniqueValueStrat ) { - scheduler = Schedulers.io(); this.uniqueValueStrat = uniqueValueStrat; this.logEntryStrat = logEntryStrat; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java index bc84b6b..f266643 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java @@ -36,6 +36,9 @@ import org.apache.usergrid.persistence.core.migration.schema.Migration; import org.apache.usergrid.persistence.core.migration.schema.MigrationManager; import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig; import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl; +import org.apache.usergrid.persistence.core.rx.RxSchedulerFig; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl; import com.google.inject.AbstractModule; import com.google.inject.Key; @@ -86,6 +89,15 @@ public class CommonModule extends AbstractModule { //do multibindings for migrations //create the empty multibinder so other plugins can use it Multibinder.newSetBinder( binder(), MigrationPlugin.class); + + + /** + * RX java 
scheduler configuration + */ + + install ( new GuicyFigModule( RxSchedulerFig.class )); + + bind( RxTaskScheduler.class).to( RxTaskSchedulerImpl.class ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java new file mode 100644 index 0000000..7986132 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.persistence.core.rx; + + +import org.safehaus.guicyfig.Default; +import org.safehaus.guicyfig.GuicyFig; +import org.safehaus.guicyfig.Key; + + +/** + * + */ +public interface RxSchedulerFig extends GuicyFig { + + + /** + * Property key for the maximum number of threads in the RX I/O scheduler's thread pool.
The same value also + * sizes the pool's bounded work queue + */ + public static final String IO_SCHEDULER_THREADS = "scheduler.io.threads"; + + + /** + * Property key for the name given to the RX I/O scheduler's thread pool, as returned by + * getIoSchedulerName() + */ + public static final String IO_SCHEDULER_NAME = "scheduler.io.poolName"; + + + + + @Default( "100" ) + @Key( IO_SCHEDULER_THREADS ) + int getMaxIoThreads(); + + @Default( "Usergrid-RxIOPool" ) + @Key(IO_SCHEDULER_NAME) + String getIoSchedulerName(); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java new file mode 100644 index 0000000..d6cc5e8 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.usergrid.persistence.core.rx; + + +import org.apache.usergrid.persistence.core.task.Task; + +import rx.Scheduler; + + +/** + * An interface for returning task schedulers + */ +public interface RxTaskScheduler { + + /** + * Get the scheduler for tasks that perform blocking I/O + * @return + */ + Scheduler getAsyncIOScheduler(); + + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java new file mode 100644 index 0000000..219cde6 --- /dev/null +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.usergrid.persistence.core.rx; + + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Scheduler; +import rx.schedulers.Schedulers; + + +/** + * An implementation of the task scheduler that allows us to control the number of I/O threads + */ +@Singleton +public class RxTaskSchedulerImpl implements RxTaskScheduler { + + private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class ); + + private final Scheduler scheduler; + private final String poolName; + + @Inject + public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){ + + this.poolName = schedulerFig.getIoSchedulerName(); + + final int poolSize = schedulerFig.getMaxIoThreads(); + + + final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize); + + + final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize ); + + + this.scheduler = Schedulers.from(threadPool); + + + } + + + @Override + public Scheduler getAsyncIOScheduler() { + return scheduler; + } + + + /** + * Create a thread pool that will reject work if our I/O tasks become overwhelmed + */ + private final class MaxSizeThreadPool extends ThreadPoolExecutor { + + public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) { + + super( 1, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), + new RejectedHandler() ); + } + } + + + /** + * Thread factory that will name and count threads for easier debugging + */ + private final class 
CountingThreadFactory implements ThreadFactory { + + private final AtomicLong threadCounter = new AtomicLong(); + + + @Override + public Thread newThread( final Runnable r ) { + final long newValue = threadCounter.incrementAndGet(); + + Thread t = new Thread( r, poolName + "-" + newValue ); + + t.setDaemon( true ); + + return t; + } + } + + + /** + * The handler for rejected executions: logs a warning and throws a RejectedExecutionException to the submitter + */ + private final class RejectedHandler implements RejectedExecutionHandler { + + + @Override + public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) { + log.warn( "{} task queue full, rejecting task {}", poolName, r ); + + //TODO T.N. do we want to run this on the caller thread? + + throw new RejectedExecutionException( "Unable to run task, queue full" ); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java index 07150bb..c997e5a 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import 
org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import 
org.apache.usergrid.persistence.index.AliasedEntityIndex; @@ -48,7 +49,6 @@ import com.codahale.metrics.Timer; import com.google.inject.Inject; import rx.Observable; -import rx.schedulers.Schedulers; import rx.util.async.Async; @@ -64,12 +64,13 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { private final IndexBufferConsumer producer; private final IndexFig indexFig; private final Timer timer; - + private final RxTaskScheduler rxTaskScheduler; @Inject public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider, IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory, - final IndexCache indexCache ) { + final IndexCache indexCache, final RxTaskScheduler rxTaskScheduler ) { + this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh.timer" ); @@ -78,6 +79,7 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { this.producer = producer; this.indexFig = indexFig; this.indexCache = indexCache; + this.rxTaskScheduler = rxTaskScheduler; } @@ -141,14 +143,14 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand { logger.error( "Failed during refresh search for " + uuid, ee ); throw new RuntimeException( "Failed during refresh search for " + uuid, ee ); } - }, Schedulers.io() ).call(); + }, rxTaskScheduler.getAsyncIOScheduler() ).call(); return future.doOnNext( found -> { if ( !found.hasFinished() ) { - logger.error(String.format("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime())); + logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); }else{ - logger.info(String.format("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime())); + logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime()); } } ).doOnCompleted(() -> { //clean up our data 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java index 7c33084..de5e29d 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java @@ -54,11 +54,11 @@ public class IndexingUtils { public static final String ENTITY_TYPE_FIELDNAME = "entityType"; - public static final String EDGE_NODE_ID_FIELDNAME = "edgeNodeId"; + public static final String EDGE_NODE_ID_FIELDNAME = "nodeId"; public static final String EDGE_NAME_FIELDNAME = "edgeName"; - public static final String EDGE_NODE_TYPE_FIELDNAME = "edgeType"; + public static final String EDGE_NODE_TYPE_FIELDNAME = "entityNodeType"; public static final String EDGE_TIMESTAMP_FIELDNAME = "edgeTimestamp"; @@ -107,8 +107,6 @@ public class IndexingUtils { idString( sb, scope.getNodeId() ); sb.append( FIELD_SEPERATOR ); sb.append( scope.getEdgeName() ); - sb.append( FIELD_SEPERATOR ); - sb.append( scope.getNodeType() ); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9b9a1b02/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json 
b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json index 4da1902..d4c34cd 100644 --- a/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json +++ b/stack/corepersistence/queryindex/src/main/resources/org/apache/usergrid/persistence/index/usergrid-mappings.json @@ -21,7 +21,7 @@ "index": "not_analyzed", "doc_values": true }, - "edgeNodeId": { + "nodeId": { "type": "string", "index": "not_analyzed", "doc_values": true @@ -31,7 +31,7 @@ "index": "not_analyzed", "doc_values": true }, - "edgeType": { + "entityNodeType": { "type": "string", "index": "not_analyzed", "doc_values": true
