Start of rewire of the rebuild process
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3480a363 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3480a363 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3480a363 Branch: refs/heads/USERGRID-593 Commit: 3480a3637cc3cc6c79504642c3c44fbcf5878efd Parents: ee1a66f Author: Todd Nine <tn...@apigee.com> Authored: Mon Apr 20 19:52:51 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Mon Apr 20 19:52:51 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 7 +- .../corepersistence/CpEntityManager.java | 85 +----- .../corepersistence/CpEntityManagerFactory.java | 119 +++----- .../corepersistence/CpRelationManager.java | 9 +- .../index/AsyncIndexProvider.java | 12 +- .../index/AsyncIndexService.java | 49 ---- .../index/AsyncReIndexService.java | 42 +++ .../index/InMemoryAsyncIndexService.java | 92 ------- .../index/InMemoryAsyncReIndexService.java | 89 ++++++ .../corepersistence/index/ReIndexAction.java | 33 +++ .../corepersistence/index/ReIndexService.java | 39 ++- .../index/ReIndexServiceImpl.java | 43 ++- .../index/SQSAsyncIndexService.java | 270 ------------------- .../index/SQSAsyncReIndexService.java | 269 ++++++++++++++++++ .../rx/impl/AllEntityIdsObservable.java | 5 +- .../rx/impl/AllEntityIdsObservableImpl.java | 4 +- .../corepersistence/util/CpNamingUtils.java | 6 +- .../usergrid/persistence/EntityManager.java | 5 - .../persistence/EntityManagerFactory.java | 11 +- .../index/SQSAsyncIndexServiceTest.java | 18 +- .../PerformanceEntityRebuildIndexTest.java | 18 +- .../cassandra/EntityManagerFactoryImplIT.java | 15 +- .../graph/serialization/EdgesObservable.java | 5 +- .../serialization/impl/EdgesObservableImpl.java | 6 +- 24 files changed, 584 insertions(+), 667 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index bb936b5..51972d8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -22,7 +22,7 @@ import org.apache.usergrid.corepersistence.events.EntityDeletedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler; import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler; import org.apache.usergrid.corepersistence.index.AsyncIndexProvider; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; +import org.apache.usergrid.corepersistence.index.AsyncReIndexService; import org.apache.usergrid.corepersistence.index.IndexService; import org.apache.usergrid.corepersistence.index.IndexServiceImpl; import org.apache.usergrid.corepersistence.index.QueryFig; @@ -37,9 +37,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl; import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl; -import org.apache.usergrid.persistence.core.rx.RxSchedulerFig; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl; import org.apache.usergrid.persistence.collection.event.EntityDeleted; import org.apache.usergrid.persistence.collection.event.EntityVersionCreated; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; @@ -154,7 +151,7 @@ public class CoreModule extends AbstractModule { bind( IndexService.class ).to( IndexServiceImpl.class ); //bind the queue provider - bind( AsyncIndexService.class).toProvider( AsyncIndexProvider.class ); + bind( AsyncReIndexService.class).toProvider( AsyncIndexProvider.class ); install( new GuicyFigModule( QueryFig.class ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 6c3989d..72ca955 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -37,8 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; -import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.AsyncReIndexService; import org.apache.usergrid.corepersistence.util.CpEntityMapUtils; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.AggregateCounter; @@ -49,7 +48,6 @@ import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.EntityFactory; import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.IndexBucketLocator; import org.apache.usergrid.persistence.Query; @@ -68,7 +66,6 @@ import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.util.TraceParticipant; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.FieldSet; -import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException; import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -81,7 +78,6 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; -import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.map.MapManager; @@ -179,7 +175,7 @@ public class CpEntityManager implements EntityManager { private final CounterUtils counterUtils; - private final AsyncIndexService indexService; + private final AsyncReIndexService indexService; private boolean skipAggregateCounters; private MetricsFactory metricsFactory; @@ -219,7 +215,7 @@ public class CpEntityManager implements EntityManager { * @param metricsFactory * @param applicationId */ - public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final AsyncIndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { + public CpEntityManager(final CassandraService cass, final CounterUtils counterUtils, final AsyncReIndexService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final UUID applicationId ) { Preconditions.checkNotNull( cass, "cass must not be null" ); Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" ); @@ -804,8 +800,8 @@ public class CpEntityManager implements EntityManager { StringField uniqueLookupRepairField = new StringField( propertyName, aliasType.toString()); - Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( Inflector.getInstance().singularize( collectionType ), - Arrays.<Field>asList( uniqueLookupRepairField ) ); + Observable<FieldSet> fieldSetObservable = ecm.getEntitiesFromFields( + Inflector.getInstance().singularize( collectionType ), Arrays.<Field>asList( uniqueLookupRepairField ) ); if(fieldSetObservable == null){ logger.debug( "Couldn't return the observable based on unique entities." ); @@ -1736,7 +1732,7 @@ public class CpEntityManager implements EntityManager { long timestamp = cass.createTimestamp(); Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be); CassandraPersistenceUtils.addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, - getRolePermissionsKey( roleName ), permission, timestamp ); + getRolePermissionsKey( roleName ), permission, timestamp ); //Adding graphite metrics Timer.Context timeRevokeRolePermission = entRevokeRolePermissionsTimer.time(); CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT ); @@ -1747,8 +1743,8 @@ public class CpEntityManager implements EntityManager { @Override public Set<String> getRolePermissions( String roleName ) throws Exception { roleName = roleName.toLowerCase(); - return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), - ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ) ); + return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES, + getRolePermissionsKey( roleName ) ); } //TODO: does this need graphite monitoring @@ -1830,8 +1826,8 @@ public class CpEntityManager implements EntityManager { @Override public Set<String> getGroupRolePermissions( UUID groupId, String roleName ) throws Exception { roleName = roleName.toLowerCase(); - return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), - ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( groupId, roleName ) ); + return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES, + getRolePermissionsKey( groupId, roleName ) ); } @@ -1840,7 +1836,7 @@ public class CpEntityManager implements EntityManager { roleName = roleName.toLowerCase(); removeFromDictionary( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ), DICTIONARY_ROLENAMES, roleName ); cass.deleteRow( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES, - SimpleRoleRef.getIdForGroupIdAndRoleName( groupId, roleName ) ); + SimpleRoleRef.getIdForGroupIdAndRoleName( groupId, roleName ) ); } @@ -1931,7 +1927,7 @@ public class CpEntityManager implements EntityManager { @Override public EntityRef getGroupRoleRef( UUID groupId, String roleName ) throws Exception { Results results = this.searchCollection( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ), - Schema.defaultCollectionName( Role.ENTITY_TYPE ), Query.fromQL( "roleName = '" + roleName + "'" ) ); + Schema.defaultCollectionName( Role.ENTITY_TYPE ), Query.fromQL( "roleName = '" + roleName + "'" ) ); Iterator<Entity> iterator = results.iterator(); EntityRef roleRef = null; while ( iterator.hasNext() ) { @@ -2830,64 +2826,7 @@ public class CpEntityManager implements EntityManager { - /** - * Completely reindex the named collection in the application associated with this EntityManager. - */ - @Override - public void reindexCollection( - final EntityManagerFactory.ProgressObserver po, String collectionName, boolean reverse) throws Exception { - - CpWalker walker = new CpWalker( ); - - walker.walkCollections( this, getApplication(), collectionName, reverse, new CpVisitor() { - - @Override - public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) { - - try { - em.update( entity ); - po.onProgress( entity ); - } - catch ( WriteOptimisticVerifyException wo ) { - // swallow this, it just means this was already updated, which accomplishes our task - logger.warn( "Someone beat us to updating entity {} in collection {}. Ignoring.", - entity.getName(), collName ); - } - catch ( Exception ex ) { - logger.error( "Error repersisting entity", ex ); - } - } - } ); - } - - - /** - * Completely reindex the application associated with this EntityManager. - */ - public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception { - - CpWalker walker = new CpWalker( ); - - walker.walkCollections( this, getApplication(), null, false, new CpVisitor() { - - @Override - public void visitCollectionEntry( EntityManager em, String collName, Entity entity ) { - try { - em.update( entity ); - po.onProgress( entity ); - } - catch ( WriteOptimisticVerifyException wo ) { - //swallow this, it just means this was already updated, which accomplishes our task. - logger.warn( "Someone beat us to updating entity {} in collection {}. Ignoring.", - entity.getName(), collName ); - } - catch ( Exception ex ) { - logger.error( "Error repersisting entity", ex ); - } - } - } ); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index b6d20b7..46db3f8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -15,24 +15,16 @@ */ package org.apache.usergrid.corepersistence; -import com.google.common.base.*; -import com.google.common.base.Optional; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.TypeLiteral; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.usergrid.persistence.index.IndexRefreshCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -41,11 +33,19 @@ import org.springframework.context.ApplicationContextAware; import org.apache.commons.lang.StringUtils; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; -import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.AsyncReIndexService; +import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.exception.ConflictException; -import org.apache.usergrid.persistence.*; +import org.apache.usergrid.persistence.AbstractEntity; +import org.apache.usergrid.persistence.Entity; +import org.apache.usergrid.persistence.EntityFactory; +import org.apache.usergrid.persistence.EntityManager; +import org.apache.usergrid.persistence.EntityManagerFactory; +import org.apache.usergrid.persistence.EntityRef; +import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.Results; +import org.apache.usergrid.persistence.SimpleEntityRef; import org.apache.usergrid.persistence.cassandra.CassandraService; import org.apache.usergrid.persistence.cassandra.CounterUtils; import org.apache.usergrid.persistence.cassandra.Setup; @@ -59,7 +59,6 @@ import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException; import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; -import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.SearchByEdgeType; @@ -67,17 +66,27 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.Query; +import org.apache.usergrid.persistence.index.IndexRefreshCommand; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import org.apache.usergrid.utils.UUIDUtils; -import rx.Observable; -import java.util.*; +import com.google.common.base.Optional; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.TypeLiteral; + +import rx.Observable; import static java.lang.String.CASE_INSENSITIVE_ORDER; -import static org.apache.usergrid.persistence.Schema.*; + +import static org.apache.usergrid.persistence.Schema.PROPERTY_APPLICATION_ID; +import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME; +import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; /** @@ -113,7 +122,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private Injector injector; private final EntityIndex entityIndex; private final MetricsFactory metricsFactory; - private final AsyncIndexService indexService; + private final AsyncReIndexService indexService; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector) { @@ -125,7 +134,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.entityIndexFactory = injector.getInstance(EntityIndexFactory.class); this.managerCache = injector.getInstance( ManagerCache.class ); this.metricsFactory = injector.getInstance( MetricsFactory.class ); - this.indexService = injector.getInstance( AsyncIndexService.class ); + this.indexService = injector.getInstance( AsyncReIndexService.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); @@ -307,14 +316,9 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application migrateAppInfo(applicationId,CpNamingUtils.DELETED_APPLICATION_INFO,CpNamingUtils.APPLICATION_INFO) .toBlocking().lastOrDefault(null); - this.rebuildApplicationIndexes(applicationId, new ProgressObserver() { - @Override - public void onProgress(EntityRef entity) { - logger.info("Restored entity {}:{}", entity.getType(), entity.getUuid()); - } - }); + throw new UnsupportedOperationException( "Implement index rebuild" ); - return managementEm.get(new SimpleEntityRef(CpNamingUtils.APPLICATION_INFO,applicationId)); +// return managementEm.get(new SimpleEntityRef(CpNamingUtils.APPLICATION_INFO,applicationId)); } @Override @@ -661,44 +665,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } - public void rebuildAllIndexes( ProgressObserver po ) throws Exception { - - logger.info("\n\nRebuilding all indexes\n"); - - rebuildInternalIndexes( po ); - - Map<String, UUID> appMap = getApplications(); - - logger.info("About to rebuild indexes for {} applications", appMap.keySet().size()); - - for ( UUID appUuid : appMap.values() ) { - try { - rebuildApplicationIndexes(appUuid, po); - } catch ( Exception e) { - logger.error("Error rebuilding index for app " + appUuid + " continuing...", e ); - } - } - } - - - @Override - public void rebuildInternalIndexes( ProgressObserver po ) throws Exception { - - // TODO: remove this after appinfo migration done - rebuildApplicationIndexes( CpNamingUtils.SYSTEM_APP_ID, po); - rebuildApplicationIndexes( CpNamingUtils.MANAGEMENT_APPLICATION_ID, po ); - } - - - @Override - public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception { - - EntityManager em = getEntityManager( appId ); - em.reindex( po ); - - logger.info("\n\nRebuilt index for applicationId {} \n", appId); - } - @Override @@ -716,18 +682,19 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application } @Override - public void rebuildCollectionIndex( - UUID appId, String collectionName, boolean reverse, ProgressObserver po ) throws Exception { - - EntityManager em = getEntityManager( appId ); - - //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild. - Application app = em.getApplication(); - - em.reindexCollection(po, collectionName, reverse); - - logger.info("\n\nRebuilt index for application {} id {} collection {}\n", - new Object[]{app.getName(), appId, collectionName}); + public ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection ) { + + throw new UnsupportedOperationException( "Implement me" ); +// +// EntityManager em = getEntityManager( appId ); +// +// //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild. +// Application app = em.getApplication(); +// +// em.reindexCollection(po, collectionName, reverse); +// +// logger.info("\n\nRebuilt index for application {} id {} collection {}\n", +// new Object[]{app.getName(), appId, collectionName}); } @Override http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index a3d8172..8f125ad 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -30,8 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.Assert; -import org.apache.usergrid.corepersistence.index.AsyncIndexService; -import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.AsyncReIndexService; import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl; import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor; @@ -43,7 +42,6 @@ import org.apache.usergrid.persistence.ConnectionRef; import org.apache.usergrid.persistence.Entity; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.EntityRef; -import org.apache.usergrid.persistence.IndexBucketLocator; import org.apache.usergrid.persistence.Query; import org.apache.usergrid.persistence.Query.Level; import org.apache.usergrid.persistence.RelationManager; @@ -80,7 +78,6 @@ import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import rx.Observable; -import rx.functions.Action1; import rx.functions.Func1; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionEdge; @@ -121,13 +118,13 @@ public class CpRelationManager implements RelationManager { private final ApplicationScope applicationScope; - private final AsyncIndexService indexService; + private final AsyncReIndexService indexService; private MetricsFactory metricsFactory; private Timer updateCollectionTimer; - public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final AsyncIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { + public CpRelationManager( final MetricsFactory metricsFactory, final ManagerCache managerCache, final AsyncReIndexService indexService, final EntityManager em, final UUID applicationId, final EntityRef headEntity) { Assert.notNull( em, "Entity manager cannot be null" ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java index d00ef8e..2c48c13 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexProvider.java @@ -35,7 +35,7 @@ import com.google.inject.Singleton; * A provider to allow users to configure their queue impl via properties */ @Singleton -public class AsyncIndexProvider implements Provider<AsyncIndexService> { +public class AsyncIndexProvider implements Provider<AsyncReIndexService> { private final QueryFig queryFig; @@ -46,7 +46,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { private final AllEntityIdsObservable allEntitiesObservable; private final EntityCollectionManagerFactory entityCollectionManagerFactory; - private AsyncIndexService asyncIndexService; + private AsyncReIndexService asyncIndexService; @Inject @@ -67,7 +67,7 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { @Override @Singleton - public AsyncIndexService get() { + public AsyncReIndexService get() { if ( asyncIndexService == null ) { asyncIndexService = getIndexService(); } @@ -77,17 +77,17 @@ public class AsyncIndexProvider implements Provider<AsyncIndexService> { } - private AsyncIndexService getIndexService() { + private AsyncReIndexService getIndexService() { final String value = queryFig.getQueueImplementation(); final Implementations impl = Implementations.valueOf( value ); switch ( impl ) { case LOCAL: - return new InMemoryAsyncIndexService( indexService, rxTaskScheduler, + return new InMemoryAsyncReIndexService( indexService, rxTaskScheduler, entityCollectionManagerFactory ); case SQS: - return new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); + return new SQSAsyncReIndexService( queueManagerFactory, queryFig, metricsFactory ); default: throw new IllegalArgumentException( "Configuration value of " + getErrorValues() + " are allowed" ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java deleted file mode 100644 index 8b5ced1..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncIndexService.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.util.UUID; - -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; - -import rx.Observable; - - -/** - * Low level queue service for indexing entities - */ -public interface AsyncIndexService extends ReIndexService.IndexAction { - - - /** - * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) - * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. - * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. - * @param applicationScope - * @param entity The entity to index - */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); - -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java new file mode 100644 index 0000000..c6eedd7 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/AsyncReIndexService.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; + + +/** + * Low level queue service for indexing entities + */ +public interface AsyncReIndexService extends ReIndexAction { + + + /** + * Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory) + * We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available. + * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. + * @param applicationScope + * @param entity The entity to index + */ + void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java deleted file mode 100644 index 0efb964..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncIndexService.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; -import org.apache.usergrid.persistence.collection.EntityCollectionManager; -import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.Singleton; - -import rx.Observable; - - -@Singleton -public class InMemoryAsyncIndexService implements AsyncIndexService { - - private static final Logger log = LoggerFactory.getLogger(InMemoryAsyncIndexService.class); - private final IndexService indexService; - private final RxTaskScheduler rxTaskScheduler; - private final EntityCollectionManagerFactory entityCollectionManagerFactory; - - - @Inject - public InMemoryAsyncIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, - final EntityCollectionManagerFactory entityCollectionManagerFactory ) { - this.indexService = indexService; - this.rxTaskScheduler = rxTaskScheduler; - this.entityCollectionManagerFactory = entityCollectionManagerFactory; - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity toIndex ) { - - //process the entity immediately - //only process the same version, otherwise ignore - - Observable.just( toIndex ).doOnNext( entity -> { - log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); - indexService.indexEntity( applicationScope, entity ); - } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); - } - - - - @Override - public void index( final EntityIdScope entityIdScope ) { - - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - - final Id entityId = entityIdScope.getId(); - - final Entity - entity = entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( - entityId ).toBlocking().lastOrDefault( null ); - - - if(entity == null){ - log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope ); - } - - indexService.indexEntity(applicationScope, entity ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java new file mode 100644 index 0000000..5ebda87 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/InMemoryAsyncReIndexService.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.entity.Id; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import rx.Observable; + + +@Singleton +public class InMemoryAsyncReIndexService implements AsyncReIndexService { + + private static final Logger log = LoggerFactory.getLogger(InMemoryAsyncReIndexService.class); + private final IndexService indexService; + private final RxTaskScheduler rxTaskScheduler; + private final EntityCollectionManagerFactory entityCollectionManagerFactory; + + + @Inject + public InMemoryAsyncReIndexService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler, + final EntityCollectionManagerFactory entityCollectionManagerFactory ) { + this.indexService = indexService; + this.rxTaskScheduler = rxTaskScheduler; + this.entityCollectionManagerFactory = entityCollectionManagerFactory; + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity toIndex ) { + + //process the entity immediately + //only process the same version, otherwise ignore + + Observable.just( toIndex ).doOnNext( entity -> { + log.debug( "Indexing entity {} in app scope {} ", entity, applicationScope ); + indexService.indexEntity( applicationScope, entity ); + } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); + } + + + + @Override + public void index( final EntityIdScope entityIdScope ) { + + final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); + + final Id entityId = entityIdScope.getId(); + + final Entity + entity = entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( + entityId ).toBlocking().lastOrDefault( null ); + + + if(entity == null){ + log.warn( "Could not find entity with id {} in app scope {} ", entityId, applicationScope ); + } + + indexService.indexEntity( applicationScope, entity ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java new file mode 100644 index 0000000..086b2aa --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; + + +/** + * Callback to perform an index operation based on an scope during bulk re-index operations + */ +@FunctionalInterface +public interface ReIndexAction { + + void index( final EntityIdScope entityIdScope ); +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index dca6cac..91409fe 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -20,10 +20,12 @@ package org.apache.usergrid.corepersistence.index; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -31,6 +33,7 @@ import com.google.common.base.Optional; import rx.Observable; import rx.Observer; +import rx.observables.ConnectableObservable; /** @@ -40,14 +43,14 @@ public interface ReIndexService { /** - * Reindex all applications using the cursor provided - * - * @param startTimestamp The timestamp to start seeking from - * - * @return a cursor that can be used to resume the operation on the next run + * Perform an index rebuild + * @param appId + * @param collection + * @return */ - IndexResponse reIndex( final rx.Observable<ApplicationScope> applicationScopes, final Optional<String> cursor, - final Optional<Long> startTimestamp, final IndexAction indexAction ); + IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> collectionName, final Optional<String> cursor, + final Optional<Long> startTimestamp ); + /** @@ -55,10 +58,10 @@ public interface ReIndexService { */ class IndexResponse { final String cursor; - final Observable<Long> indexedEdgecount; + final ConnectableObservable<EdgeScope> indexedEdgecount; - public IndexResponse( final String cursor, final Observable<Long> indexedEdgecount ) { + public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope> indexedEdgecount ) { this.cursor = cursor; this.indexedEdgecount = indexedEdgecount; } @@ -74,23 +77,13 @@ public interface ReIndexService { /** - * Return the observable long count of all edges indexed + * Return the observable of all edges to be indexed. + * + * Note that after subscribing "connect" will need to be called to ensure that processing begins * @return */ - public Observable<Long> getCount() { + public ConnectableObservable<EdgeScope> getCount() { return indexedEdgecount; } } - - - - - /** - * Callback to perform an index operation based on an scope during bulk re-index operations - */ - @FunctionalInterface - interface IndexAction { - - void index( final EntityIdScope entityIdScope ); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 5c022e1..3553c87 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -20,8 +20,10 @@ package org.apache.usergrid.corepersistence.index; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.corepersistence.util.CpNamingUtils; @@ -37,11 +39,16 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.util.UUIDGenerator; import com.google.common.base.Optional; +import com.google.inject.Inject; +import com.google.inject.Singleton; import rx.Observable; import rx.observables.ConnectableObservable; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope; + +@Singleton public class ReIndexServiceImpl implements ReIndexService { private static final MapScope RESUME_MAP_SCOPTE = @@ -51,48 +58,52 @@ public class ReIndexServiceImpl implements ReIndexService { private static final int INDEX_TTL = 60 * 60 * 24 * 10; + private final AllApplicationsObservable allApplicationsObservable; private final AllEntityIdsObservable allEntityIdsObservable; private final QueryFig queryFig; private final RxTaskScheduler rxTaskScheduler; private final MapManager mapManager; + private final AsyncReIndexService indexService; + @Inject public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable, - final MapManagerFactory mapManagerFactory, final QueryFig queryFig, - final RxTaskScheduler rxTaskScheduler ) { + final MapManagerFactory mapManagerFactory, + final AllApplicationsObservable allApplicationsObservable, final QueryFig queryFig, + final RxTaskScheduler rxTaskScheduler, final AsyncReIndexService indexService ) { this.allEntityIdsObservable = allEntityIdsObservable; + this.allApplicationsObservable = allApplicationsObservable; this.queryFig = queryFig; this.rxTaskScheduler = rxTaskScheduler; + this.indexService = indexService; this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE ); } - @Override - public IndexResponse reIndex( final Observable<ApplicationScope> applicationScopes, final Optional<String> cursor, - final Optional<Long> startTimestamp, final IndexAction indexAction ) { + @Override + public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, + final Optional<String> collectionName, final Optional<String> cursor, + final Optional<Long> startTimestamp ) { //load our last emitted Scope if a cursor is present if ( cursor.isPresent() ) { throw new UnsupportedOperationException( "Build this" ); } + + final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData(); + final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() ); //create an observable that loads each entity and indexes it, start it running with publish final ConnectableObservable<EdgeScope> runningReIndex = - allEntityIdsObservable.getEdgesToEntities( applicationScopes, startTimestamp ) + allEntityIdsObservable.getEdgesToEntities( applicationScopes, collectionName, startTimestamp ) //for each edge, create our scope and index on it - .doOnNext( edge -> indexAction - .index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ) + .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish(); - .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).publish(); - - - //count our longs - final Observable<Long> indexedCount = runningReIndex.countLong(); //start our sampler and state persistence @@ -107,7 +118,11 @@ public class ReIndexServiceImpl implements ReIndexService { } ).subscribe(); - return new IndexResponse( newCursor, indexedCount ); + //start pushing to both + runningReIndex.connect(); + + + return new IndexResponse( newCursor, runningReIndex ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java deleted file mode 100644 index 6d06637..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexService.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.index; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.metrics.MetricsFactory; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.model.entity.Entity; -import org.apache.usergrid.persistence.model.entity.Id; -import org.apache.usergrid.persistence.model.util.UUIDGenerator; -import org.apache.usergrid.persistence.queue.QueueManager; -import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.QueueMessage; -import org.apache.usergrid.persistence.queue.QueueScope; -import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.inject.Inject; -import com.google.inject.Singleton; - - -@Singleton -public class SQSAsyncIndexService implements AsyncIndexService { - - - private static final Logger logger = LoggerFactory.getLogger( SQSAsyncIndexService.class ); - - /** Hacky, copied from CPEntityManager b/c we can't access it here */ - public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); - - - /** - * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed - */ - public static final int TTL = 60 * 60 * 24 * 30; - - /** - * The name to put in the map - */ - public static final String MAP_NAME = "esqueuedata"; - - - private static final String QUEUE_NAME = "es_queue"; - - private static SmileFactory SMILE_FACTORY = new SmileFactory(); - - static { - SMILE_FACTORY.delegateToTextual( true ); - } - - - private final QueueManager queue; - private final QueryFig queryFig; - private final ObjectMapper mapper; - private final Meter readMeter; - private final Timer readTimer; - private final Meter writeMeter; - private final Timer writeTimer; - - - @Inject - public SQSAsyncIndexService( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, - final MetricsFactory metricsFactory ) { - final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); - - this.queue = queueManagerFactory.getQueueManager( queueScope ); - this.queryFig = queryFig; - - this.writeTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "write.timer" ); - this.writeMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, "write.meter" ); - - this.readTimer = metricsFactory.getTimer( SQSAsyncIndexService.class, "read.timer" ); - this.readMeter = metricsFactory.getMeter( SQSAsyncIndexService.class, "read.meter" ); - - this.mapper = new ObjectMapper( SMILE_FACTORY ); - //pretty print, disabling for speed - // mapper.enable(SerializationFeature.INDENT_OUTPUT); - - } - - - public void offer( final IndexEntityEvent operation ) { - final Timer.Context timer = this.writeTimer.time(); - this.writeMeter.mark(); - - final UUID identifier = UUIDGenerator.newTimeUUID(); - - try { - - final String payLoad = toString( operation ); - - //signal to SQS - this.queue.sendMessage( identifier ); - } - catch ( IOException e ) { - throw new RuntimeException( "Unable to queue message", e ); - } - finally { - timer.stop(); - } - } - - - public List<IndexEntityEvent> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { - - //SQS doesn't support more than 10 - - final int actualTake = Math.min( 10, takeSize ); - - final Timer.Context timer = this.readTimer.time(); - - try { - - List<QueueMessage> messages = queue - .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), - String.class ); - - - final List<IndexEntityEvent> response = new ArrayList<>( messages.size() ); - - final List<String> mapEntries = new ArrayList<>( messages.size() ); - - - if ( messages.size() == 0 ) { - return Collections.emptyList(); - } - - //add all our keys for a single round trip - for ( final QueueMessage message : messages ) { - mapEntries.add( message.getBody().toString() ); - } - - - //load them into our response - for ( final QueueMessage message : messages ) { - - final String payload = getBody( message ); - - //now see if the key was there - - - final IndexEntityEvent messageBody; - - try { - messageBody = fromString( payload ); - } - catch ( IOException e ) { - logger.error( "Unable to deserialize message from string. This is a bug", e ); - throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); - } - - SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); - - response.add( operation ); - } - - readMeter.mark( response.size() ); - return response; - } - //stop our timer - finally { - timer.stop(); - } - } - - - public void ack( final List<IndexEntityEvent> messages ) { - - //nothing to do - if ( messages.size() == 0 ) { - return; - } - - List<QueueMessage> toAck = new ArrayList<>( messages.size() ); - - for ( IndexEntityEvent ioe : messages ) { - - - final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; - - toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); - } - - queue.commitMessages( toAck ); - } - - - /** Read the object from Base64 string. */ - private IndexEntityEvent fromString( String s ) throws IOException { - IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class ); - return o; - } - - - /** Write the object to a Base64 string. */ - private String toString( IndexEntityEvent o ) throws IOException { - return mapper.writeValueAsString( o ); - } - - - private String getBody( final QueueMessage message ) { - return message.getBody().toString(); - } - - - @Override - public void index( final EntityIdScope entityIdScope ) { - - } - - - /** - * The message that subclasses our IndexOperationMessage. holds a pointer to the original message - */ - public class SqsIndexOperationMessage extends IndexEntityEvent { - - private final QueueMessage message; - - - public SqsIndexOperationMessage( final QueueMessage message, final IndexEntityEvent source ) { - super( source.getApplicationScope(), source.getEntityId(), source.getEntityVersion() ); - this.message = message; - } - - - /** - * Get the message from our queue - */ - public QueueMessage getMessage() { - return message; - } - } - - - @Override - public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { - throw new UnsupportedOperationException( "Implement me" ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java new file mode 100644 index 0000000..60a804c --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/SQSAsyncReIndexService.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.index; + + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.model.entity.Entity; +import org.apache.usergrid.persistence.model.util.UUIDGenerator; +import org.apache.usergrid.persistence.queue.QueueManager; +import org.apache.usergrid.persistence.queue.QueueManagerFactory; +import org.apache.usergrid.persistence.queue.QueueMessage; +import org.apache.usergrid.persistence.queue.QueueScope; +import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +@Singleton +public class SQSAsyncReIndexService implements AsyncReIndexService { + + + private static final Logger logger = LoggerFactory.getLogger( SQSAsyncReIndexService.class ); + + /** Hacky, copied from CPEntityManager b/c we can't access it here */ + public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" ); + + + /** + * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed + */ + public static final int TTL = 60 * 60 * 24 * 30; + + /** + * The name to put in the map + */ + public static final String MAP_NAME = "esqueuedata"; + + + private static final String QUEUE_NAME = "es_queue"; + + private static SmileFactory SMILE_FACTORY = new SmileFactory(); + + static { + SMILE_FACTORY.delegateToTextual( true ); + } + + + private final QueueManager queue; + private final QueryFig queryFig; + private final ObjectMapper mapper; + private final Meter readMeter; + private final Timer readTimer; + private final Meter writeMeter; + private final Timer writeTimer; + + + @Inject + public SQSAsyncReIndexService( final QueueManagerFactory queueManagerFactory, final QueryFig queryFig, + final MetricsFactory metricsFactory ) { + final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME ); + + this.queue = queueManagerFactory.getQueueManager( queueScope ); + this.queryFig = queryFig; + + this.writeTimer = metricsFactory.getTimer( SQSAsyncReIndexService.class, "write.timer" ); + this.writeMeter = metricsFactory.getMeter( SQSAsyncReIndexService.class, "write.meter" ); + + this.readTimer = metricsFactory.getTimer( SQSAsyncReIndexService.class, "read.timer" ); + this.readMeter = metricsFactory.getMeter( SQSAsyncReIndexService.class, "read.meter" ); + + this.mapper = new ObjectMapper( SMILE_FACTORY ); + //pretty print, disabling for speed + // mapper.enable(SerializationFeature.INDENT_OUTPUT); + + } + + + public void offer( final IndexEntityEvent operation ) { + final Timer.Context timer = this.writeTimer.time(); + this.writeMeter.mark(); + + final UUID identifier = UUIDGenerator.newTimeUUID(); + + try { + + final String payLoad = toString( operation ); + + //signal to SQS + this.queue.sendMessage( identifier ); + } + catch ( IOException e ) { + throw new RuntimeException( "Unable to queue message", e ); + } + finally { + timer.stop(); + } + } + + + public List<IndexEntityEvent> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) { + + //SQS doesn't support more than 10 + + final int actualTake = Math.min( 10, takeSize ); + + final Timer.Context timer = this.readTimer.time(); + + try { + + List<QueueMessage> messages = queue + .getMessages( actualTake, queryFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ), + String.class ); + + + final List<IndexEntityEvent> response = new ArrayList<>( messages.size() ); + + final List<String> mapEntries = new ArrayList<>( messages.size() ); + + + if ( messages.size() == 0 ) { + return Collections.emptyList(); + } + + //add all our keys for a single round trip + for ( final QueueMessage message : messages ) { + mapEntries.add( message.getBody().toString() ); + } + + + //load them into our response + for ( final QueueMessage message : messages ) { + + final String payload = getBody( message ); + + //now see if the key was there + + + final IndexEntityEvent messageBody; + + try { + messageBody = fromString( payload ); + } + catch ( IOException e ) { + logger.error( "Unable to deserialize message from string. This is a bug", e ); + throw new RuntimeException( "Unable to deserialize message from string. This is a bug", e ); + } + + SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody ); + + response.add( operation ); + } + + readMeter.mark( response.size() ); + return response; + } + //stop our timer + finally { + timer.stop(); + } + } + + + public void ack( final List<IndexEntityEvent> messages ) { + + //nothing to do + if ( messages.size() == 0 ) { + return; + } + + List<QueueMessage> toAck = new ArrayList<>( messages.size() ); + + for ( IndexEntityEvent ioe : messages ) { + + + final SqsIndexOperationMessage sqsIndexOperationMessage = ( SqsIndexOperationMessage ) ioe; + + toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() ); + } + + queue.commitMessages( toAck ); + } + + + /** Read the object from Base64 string. */ + private IndexEntityEvent fromString( String s ) throws IOException { + IndexEntityEvent o = mapper.readValue( s, IndexEntityEvent.class ); + return o; + } + + + /** Write the object to a Base64 string. */ + private String toString( IndexEntityEvent o ) throws IOException { + return mapper.writeValueAsString( o ); + } + + + private String getBody( final QueueMessage message ) { + return message.getBody().toString(); + } + + + @Override + public void index( final EntityIdScope entityIdScope ) { + + } + + + /** + * The message that subclasses our IndexOperationMessage. holds a pointer to the original message + */ + public class SqsIndexOperationMessage extends IndexEntityEvent { + + private final QueueMessage message; + + + public SqsIndexOperationMessage( final QueueMessage message, final IndexEntityEvent source ) { + super( source.getApplicationScope(), source.getEntityId(), source.getEntityVersion() ); + this.message = message; + } + + + /** + * Get the message from our queue + */ + public QueueMessage getMessage() { + return message; + } + } + + + @Override + public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity) { + throw new UnsupportedOperationException( "Implement index rebuild" ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java index c805a59..b9e5373 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java @@ -43,9 +43,10 @@ public interface AllEntityIdsObservable { /** * Get all edges that represent edges to entities in the system * @param appScopes - * @param startTime The time to + * @param edgeType The edge type to use (if specified) + * @param startTime The time to start with * @return */ - Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<Long> startTime); + Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java index f9df1f5..257fab1 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java @@ -81,12 +81,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable { @Override - public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<Long> startTime) { + public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime) { return appScopes.flatMap( applicationScope -> { final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); - return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(), startTime ).map( edge -> new EdgeScope(applicationScope, edge )); + return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType, startTime ).map( edge -> new EdgeScope(applicationScope, edge )); } ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 67cc0ca..c42ad10 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -205,9 +205,7 @@ public class CpNamingUtils { public static ApplicationScope getApplicationScope( UUID applicationId ) { // We can always generate a scope, it doesn't matter if the application exists yet or not. - final ApplicationScopeImpl scope = new ApplicationScopeImpl( generateApplicationId( applicationId ) ); - - return scope; + return new ApplicationScopeImpl( generateApplicationId( applicationId ) ); } @@ -229,6 +227,8 @@ public class CpNamingUtils { return generateApplicationId( MANAGEMENT_APPLICATION_ID ); } + + /** * Get the map scope for the applicationId to store entity uuid to type mapping */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java index fc8b3d5..23b6d6b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java @@ -692,11 +692,6 @@ public interface EntityManager { /** For testing purposes */ public void flushManagerCaches(); - void reindexCollection( - EntityManagerFactory.ProgressObserver po, String collectionName, boolean reverse) throws Exception; - - public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception; - public Entity getUniqueEntityFromAlias( String aliasType, String aliasValue ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java index e70cd0d..b3f4b62 100644 --- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.UUID; import com.google.common.base.Optional; + +import org.apache.usergrid.corepersistence.index.ReIndexService; import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.index.IndexRefreshCommand; import rx.Observable; @@ -163,12 +165,6 @@ public interface EntityManagerFactory { public IndexRefreshCommand.IndexRefreshCommandInfo refreshIndex(); - public void rebuildAllIndexes( ProgressObserver po ) throws Exception; - - public void rebuildInternalIndexes( ProgressObserver po ) throws Exception; - - public void rebuildApplicationIndexes( UUID appId, ProgressObserver po ) throws Exception; - /** * Perform a realtime count of every entity in the system. This can be slow as it traverses the entire system graph @@ -178,8 +174,7 @@ public interface EntityManagerFactory { /** For testing purposes */ public void flushEntityManagerCaches(); - void rebuildCollectionIndex( - UUID appId, String collection, boolean reverse, ProgressObserver po) throws Exception; + ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection ); /** * Add a new index to the application for scale http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java index 0977caa..5e9a5a1 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/SQSAsyncIndexServiceTest.java @@ -20,19 +20,9 @@ package org.apache.usergrid.corepersistence.index; -import java.util.*; -import java.util.concurrent.TimeUnit; - import org.apache.usergrid.corepersistence.TestIndexModule; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.index.SearchEdge; -import org.apache.usergrid.persistence.index.impl.DeIndexRequest; import org.apache.usergrid.persistence.index.impl.EsRunner; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; -import org.apache.usergrid.persistence.index.impl.IndexRequest; -import org.apache.usergrid.persistence.index.impl.SearchEdgeImpl; -import org.apache.usergrid.persistence.model.entity.SimpleId; + import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -41,10 +31,8 @@ import org.junit.runner.RunWith; import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.test.UseModules; -import org.apache.usergrid.persistence.map.MapManagerFactory; import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.queue.QueueManagerFactory; -import org.apache.usergrid.persistence.queue.impl.UsergridAwsCredentialsProvider; import com.google.inject.Inject; @@ -80,11 +68,11 @@ public class SQSAsyncIndexServiceTest { public MetricsFactory metricsFactory; - private SQSAsyncIndexService bufferQueueSQS; + private SQSAsyncReIndexService bufferQueueSQS; @Before public void setup(){ - bufferQueueSQS = new SQSAsyncIndexService( queueManagerFactory, queryFig, metricsFactory ); + bufferQueueSQS = new SQSAsyncReIndexService( queueManagerFactory, queryFig, metricsFactory ); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java index b0fc245..a293a0c 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java @@ -213,11 +213,12 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { try { - // do it forwards - setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), "catherders", false, po ); - - // and backwards, just to make sure both cases are covered - setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), "catherders", true, po ); + fail( "Implement index rebuild" ); +// // do it forwards +// setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), "catherders", false, po ); +// +// // and backwards, just to make sure both cases are covered +// setup.getEmf().rebuildCollectionIndex( em.getApplicationId(), "catherders", true, po ); reporter.report(); registry.remove( meterName ); @@ -354,9 +355,10 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT { try { - setup.getEmf().rebuildInternalIndexes( po ); - - setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po ); + fail( "Implement index rebuild" ); +// setup.getEmf().rebuildInternalIndexes( po ); +// +// setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po ); reporter.report(); registry.remove( meterName ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java index 1a0a13b..2452606 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java @@ -52,6 +52,8 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + @NotThreadSafe public class EntityManagerFactoryImplIT extends AbstractCoreIT { @@ -175,12 +177,13 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT { // restore the app emf.restoreApplication(deletedAppId); - emf.rebuildAllIndexes(new EntityManagerFactory.ProgressObserver() { - @Override - public void onProgress(EntityRef entity) { - logger.debug("Reindexing {}:{}", entity.getType(), entity.getUuid()); - } - }); + fail( "Implement index rebuild" ); +// emf.rebuildAllIndexes(new EntityManagerFactory.ProgressObserver() { +// @Override +// public void onProgress(EntityRef entity) { +// logger.debug("Reindexing {}:{}", entity.getType(), entity.getUuid()); +// } +// }); this.app.refreshIndex(); // test to see that app now works and is happy http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java index 9e7b8e6..9f0bd60 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java @@ -45,10 +45,11 @@ public interface EdgesObservable { * Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified * @param gm * @param sourceNode - * @param startTimestamp + * @param edgeType The edge type if specified. Otherwise all types will be used + * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used * @return */ - Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, + Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType, final Optional<Long> startTimestamp ); /** http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3480a363/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java index 859ca2e..ca9fb03 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java @@ -71,10 +71,12 @@ public class EdgesObservableImpl implements EdgesObservable { @Override - public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, + public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput, final Optional<Long> startTimestamp ) { - final Observable<String> edgeTypes = + + + final Observable<String> edgeTypes = edgeTypeInput.isPresent()? Observable.just( edgeTypeInput.get() ): gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );