Refactored index scope generation to be more consistent and clean. Moved some newly private methods to test utils.
Added onStart event to the observable iterator Removes group by collection type from filtering loader Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7c356d8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7c356d8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7c356d8b Branch: refs/heads/two-dot-o-dev Commit: 7c356d8b2ab256bb20c498c11924820441f5495f Parents: cf3f7ab Author: Todd Nine <[email protected]> Authored: Mon Mar 30 09:45:46 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Mar 30 12:11:44 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 29 +--- .../corepersistence/CpRelationManager.java | 61 +++------ .../usergrid/corepersistence/CpWalker.java | 4 +- .../events/EntityDeletedHandler.java | 31 ++--- .../events/EntityVersionCreatedHandler.java | 2 +- .../events/EntityVersionDeletedHandler.java | 87 +++++++++--- .../results/FilteringLoader.java | 108 ++++++--------- .../corepersistence/util/CpNamingUtils.java | 134 ++++++++++++++++--- .../corepersistence/StaleIndexCleanupTest.java | 10 +- .../rx/EdgesFromSourceObservableIT.java | 9 +- .../rx/EdgesToTargetObservableIT.java | 53 ++++---- .../apache/usergrid/utils/EdgeTestUtils.java | 50 +++++++ .../persistence/core/rx/ObservableIterator.java | 2 + 13 files changed, 347 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 
c45c390..a7dda13 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.entities.Event; import org.apache.usergrid.persistence.entities.Group; @@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.index.query.Query; @@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; -import com.netflix.hystrix.exception.HystrixRuntimeException; import me.prettyprint.hector.api.Keyspace; import me.prettyprint.hector.api.beans.ColumnSlice; @@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator; import static 
org.apache.commons.lang.StringUtils.capitalize; import static org.apache.commons.lang.StringUtils.isBlank; import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS; import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS; @@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager { catch ( WriteUniqueVerifyException wuve ) { handleWriteUniqueVerifyException( entity, wuve ); } - catch ( HystrixRuntimeException hre ) { - - if ( hre.getCause() instanceof WriteUniqueVerifyException ) { - WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause(); - handleWriteUniqueVerifyException( entity, wuve ); - } - - throw hre; - } // update in all containing collections and connection indexes CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity ); @@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager { @Override public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception { - IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(), - getCollectionScopeNameFromEntityType( entityRef.getType() ) ); - - Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) { @@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager { catch ( WriteUniqueVerifyException wuve ) { handleWriteUniqueVerifyException( entity, wuve ); } - catch ( HystrixRuntimeException hre ) { - if ( hre.getCause() instanceof WriteUniqueVerifyException ) { - WriteUniqueVerifyException wuve = ( 
WriteUniqueVerifyException ) hre.getCause(); - handleWriteUniqueVerifyException( entity, wuve ); - } - } // Index CP entity into default collection scope // IndexScope defaultIndexScope = new IndexScopeImpl( @@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager { final EntityIndexBatch batch = aie.createBatch(); // index member into entity collection | type scope - IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); - + IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName ); batch.index( collectionIndexScope, memberEntity ); //TODO REMOVE INDEX CODE http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 3427684..b76f38f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.index.query.Query; import org.apache.usergrid.persistence.index.query.Query.Level; @@ -118,6 +117,11 @@ import rx.functions.Func1; import static java.util.Arrays.asList; import static 
me.prettyprint.hector.api.factory.HFactory.createMutator; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES; import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES; @@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager { final EntityRef eref = new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); - String name; - if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) { - name = CpNamingUtils.getConnectionType( edge.getType() ); - } - else { - name = CpNamingUtils.getCollectionName( edge.getType() ); - } + String name = getNameFromEdgeType(edge.getType()); addMapSet( entityRefSetMap, eref, name ); } ).toBlocking().last(); @@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager { @Override public void call( final Edge edge ) { - EntityRef sourceEntity = - new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); // reindex the entity in the source entity's collection or connection index - IndexScope indexScope; - if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) { - - String collName = CpNamingUtils.getCollectionName( edge.getType() ); - indexScope = new IndexScopeImpl( - new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName )); - } - else { + IndexScope indexScope = 
generateScopeFromSource(edge); - String connName = CpNamingUtils.getConnectionType( edge.getType() ); - indexScope = new IndexScopeImpl( - new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ), - CpNamingUtils.getConnectionScopeName( connName ) ); - } entityIndexBatch.index( indexScope, cpEntity ); @@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager { Iterator<String> iter = str.toBlocking().getIterator(); while ( iter.hasNext() ) { String edgeType = iter.next(); - indexes.add( CpNamingUtils.getCollectionName( edgeType ) ); + indexes.add( getNameFromEdgeType( edgeType ) ); } return indexes; @@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager { final EntityIndexBatch batch = ei.createBatch(); // remove item from collection index - IndexScope indexScope = new IndexScopeImpl( - cpHeadEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName ); batch.deindex( indexScope, memberEntity ); // remove collection from item index - IndexScope itemScope = new IndexScopeImpl( - memberEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( - Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) ); + IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(), + Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ); batch.deindex( itemScope, cpHeadEntity ); - BetterFuture future = batch.execute(); + batch.execute(); // remove edge from collection to item GraphManager gm = managerCache.getGraphManager( applicationScope ); @@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager { + "' of " + headEntity.getType() + ":" + headEntity .getUuid() ); } - final IndexScope indexScope = new IndexScopeImpl( - cpHeadEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + final IndexScope indexScope = 
generateScopeFromCollection( cpHeadEntity.getId(), collName ); final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope ); @@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager { EntityIndexBatch batch = ei.createBatch(); // Index the new connection in app|source|type context - IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), - CpNamingUtils.getConnectionScopeName( connectionType ) ); + IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType ); batch.index( indexScope, targetEntity ); @@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager { final EntityIndexBatch batch = ei.createBatch(); // Deindex the connection in app|source|type context - IndexScope indexScope = new IndexScopeImpl( - new SimpleId( connectingEntityRef.getUuid(), - connectingEntityRef.getType() ), - CpNamingUtils.getConnectionScopeName( connectionType ) ); + final Id cpId = createId( connectingEntityRef ); + IndexScope indexScope = generateScopeFromConnection( cpId, connectionType ); batch.deindex( indexScope, targetEntity ); // Deindex the connection in app|source|type context @@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager { headEntity = em.validate( headEntity ); - final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), - CpNamingUtils.getConnectionScopeName( connection ) ); + final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection ); final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java index c14447d..3f2c9d6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java @@ -36,6 +36,8 @@ import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; + /** * Takes a visitor to all collections and entities. @@ -128,7 +130,7 @@ public class CpWalker { if ( entity == null ) { return; } - String collName = CpNamingUtils.getCollectionName( edgeValue.getType() ); + String collName = getNameFromEdgeType( edgeValue.getType() ); visitor.visitCollectionEntry( em, collName, entity ); } ).subscribeOn( Schedulers.io() ); }, 100 ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java index 78c1ca7..57d69bc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java @@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted { return; } - - - if(logger.isDebugEnabled()) { - logger.debug( - "Handling deleted event for entity {}:{} v {} " + " app: {}", - new Object[] { - entityId.getType(), entityId.getUuid(), version, - scope.getApplication() - } ); - } - - CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; - final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); - - throw new 
NotImplementedException( "Fix this" ); +// This is a NO-OP now, it's handled by the EntityVersionDeletedHandler + +// +// if(logger.isDebugEnabled()) { +// logger.debug( +// "Handling deleted event for entity {}:{} v {} " + " app: {}", +// new Object[] { +// entityId.getType(), entityId.getUuid(), version, +// scope.getApplication() +// } ); +// } +// +// CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; +// final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); + +// throw new NotImplementedException( "Fix this" ); //read all edges to this node and de-index them http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java index c000500..0163fc2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java @@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated { @Override public void versionCreated( final ApplicationScope scope, final Entity entity ) { - //not op, we're not migrating properly to this. Make this an event + //not op, we're not migrating properly to this. Make this an event At the moment this is happening on write // // This check is for testing purposes and for a test that to be able to dynamically turn // // off and on delete previous versions so that it can test clean-up on read. 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java index 4fa5ce1..a2e9b30 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java @@ -24,18 +24,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.CpEntityManagerFactory; -import org.apache.usergrid.exception.NotImplementedException; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.IndexBatchBuffer; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; import com.google.inject.Singleton; +import rx.Observable; + import 
static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget; /** @@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { private final EntityManagerFactory emf; + private final EdgesObservable edgesObservable; + private final SerializationFig serializationFig; @Inject - public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;} + public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable, + final SerializationFig serializationFig ) { + this.emf = emf; + this.edgesObservable = edgesObservable; + this.serializationFig = serializationFig; + } @Override @@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { } if ( logger.isDebugEnabled() ) { - logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " - + " app: {}", new Object[] { - entityVersions.size(), entityId.getType(), entityId.getUuid(), - scope.getApplication() - } ); + logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + " app: {}", new Object[] { + entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication() + } ); } CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); + final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope ); + + + //create an observable of all scopes to deIndex + //remove all indexes pointing to this + final Observable<IndexScope> targetScopes = edgesObservable.edgesToTarget( gm, entityId ).map( + edge -> generateScopeFromSource( edge) ); + + + //Remove all double indexes + final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId 
).map( + edge -> generateScopeToTarget( edge ) ); + + + //create a stream of scopes + final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap( + indexScope -> Observable.from( entityVersions ) + .map( version -> new IndexScopeVersion( indexScope, version ) ) ); + + //create a set of batches + final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap( + bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(), + ( EntityIndexBatch batch, IndexScopeVersion version ) -> { + //deindex in this batch + batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() ); + } ) ); - throw new NotImplementedException( "Fix this" ); + //execute the batches + batches.doOnNext( batch -> batch.execute() ).toBlocking().last(); -// final IndexScope indexScope = -// new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ), -// scope.getName() ); -// -// //create our batch, and then collect all of them into a single batch -// Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> { -// entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() ); -// } ) -// //after our batch is collected, execute it -// .doOnNext( entityIndexBatch -> { -// entityIndexBatch.execute(); -// } ).toBlocking().last(); + } + + + + + + private static final class IndexScopeVersion{ + private final IndexScope scope; + private final MvccLogEntry version; + + + private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) { + this.scope = scope; + this.version = version; + } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java index 2eb6675..c12bb2c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -34,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.ManagerCache; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.Results; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -48,7 +46,6 @@ import org.apache.usergrid.persistence.model.entity.Id; import com.fasterxml.uuid.UUIDComparator; import com.google.common.base.Function; import com.google.common.collect.Collections2; -import com.google.common.collect.HashMultimap; public class FilteringLoader implements ResultsLoader { @@ -64,16 +61,14 @@ public class FilteringLoader implements ResultsLoader { /** * Create an instance of a filter loader + * * @param managerCache The manager cache to load - * @param resultsVerifier The verifier to verify the candidate results + * @param resultsVerifier The verifier to verify the candidate results * @param applicationScope The application scope to perform the load * @param indexScope The index scope used in the search */ - protected FilteringLoader( - final ManagerCache managerCache, - final ResultsVerifier resultsVerifier, - final ApplicationScope applicationScope, - final IndexScope indexScope ) { + protected FilteringLoader( final ManagerCache managerCache, final 
ResultsVerifier resultsVerifier, + final ApplicationScope applicationScope, final IndexScope indexScope ) { this.managerCache = managerCache; this.resultsVerifier = resultsVerifier; @@ -90,7 +85,7 @@ public class FilteringLoader implements ResultsLoader { public Results loadResults( final CandidateResults crs ) { - if(crs.size() == 0){ + if ( crs.size() == 0 ) { return new Results(); } @@ -101,11 +96,6 @@ public class FilteringLoader implements ResultsLoader { // Maps the entity ids to our candidates final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() ); - // Groups all candidate results by types. When search connections there will be multiple - // types, so we want to batch fetch them more efficiently - - final HashMultimap<String, CandidateResult> groupedByScopes = - HashMultimap.create( crs.size(), crs.size() ); final Iterator<CandidateResult> iter = crs.iterator(); @@ -119,9 +109,6 @@ public class FilteringLoader implements ResultsLoader { final CandidateResult currentCandidate = iter.next(); - final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( - currentCandidate.getId().getType() ); - final Id entityId = currentCandidate.getId(); //check if we've seen this candidate by id @@ -131,7 +118,6 @@ public class FilteringLoader implements ResultsLoader { if ( previousMax == null ) { maxCandidateMapping.put( entityId, currentCandidate ); orderIndex.put( entityId, i ); - groupedByScopes.put( collectionType, currentCandidate ); continue; } @@ -146,12 +132,12 @@ public class FilteringLoader implements ResultsLoader { final CandidateResult toKeep; //current is newer than previous. Remove previous and keep current - if(UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ){ + if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) { toRemove = previousMax; toKeep = currentCandidate; } //previously seen value is newer than current. 
Remove the current and keep the previously seen value - else{ + else { toRemove = currentCandidate; toKeep = previousMax; } @@ -160,17 +146,13 @@ public class FilteringLoader implements ResultsLoader { //de-index it - logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", - new Object[] { - entityId.getUuid(), - entityId.getType(), - toRemove.getVersion(), - toKeep.getVersion() } ); + logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] { + entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion() + } ); //deindex this document, and remove the previous maxVersion //we have to deindex this from our ownerId, since this is what gave us the reference indexBatch.deindex( indexScope, toRemove ); - groupedByScopes.remove( collectionType, toRemove ); //TODO, fire the entity repair cleanup task here instead of de-indexing @@ -178,7 +160,6 @@ public class FilteringLoader implements ResultsLoader { //replace the value with a more current version maxCandidateMapping.put( entityId, toKeep ); orderIndex.put( entityId, i ); - groupedByScopes.put( collectionType, toKeep ); } @@ -187,57 +168,52 @@ public class FilteringLoader implements ResultsLoader { final TreeMap<Integer, Id> sortedResults = new TreeMap<>(); - for ( final String scopeName : groupedByScopes.keySet() ) { - - final Set<CandidateResult> candidateResults = groupedByScopes.get( scopeName ); - - final Collection<Id> idsToLoad = - Collections2.transform( candidateResults, new Function<CandidateResult, Id>() { - @Nullable - @Override - public Id apply( @Nullable final CandidateResult input ) { - //NOTE this is never null, we won't need to check - return input.getId(); - } - } ); + final Collection<Id> idsToLoad = + Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() { + @Nullable + @Override + public Id apply( @Nullable final CandidateResult input ) { + //NOTE this is never null, we won't need 
to check + return input.getId(); + } + } ); - //now using the scope, load the collection + //now using the scope, load the collection - // Get the collection scope and batch load all the versions. We put all entities in - // app/app for easy retrieval/ unless persistence changes, we never want to read from - // any scope other than the app, app, scope name scope -// final CollectionScope collScope = new CollectionScopeImpl( -// applicationScope.getApplication(), applicationScope.getApplication(), scopeName); + // Get the collection scope and batch load all the versions. We put all entities in + // app/app for easy retrieval/ unless persistence changes, we never want to read from + // any scope other than the app, app, scope name scope + // final CollectionScope collScope = new CollectionScopeImpl( + // applicationScope.getApplication(), applicationScope.getApplication(), scopeName); - final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope); + final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope ); - //load the results into the loader for this scope for validation - resultsVerifier.loadResults( idsToLoad, ecm ); + //load the results into the loader for this scope for validation + resultsVerifier.loadResults( idsToLoad, ecm ); - //now let the loader validate each candidate. For instance, the "max" in this candidate - //could still be a stale result, so it needs validated - for ( final Id requestedId : idsToLoad ) { + //now let the loader validate each candidate. 
For instance, the "max" in this candidate + //could still be a stale result, so it needs validated + for ( final Id requestedId : idsToLoad ) { - final CandidateResult cr = maxCandidateMapping.get( requestedId ); + final CandidateResult cr = maxCandidateMapping.get( requestedId ); - //ask the loader if this is valid, if not discard it and de-index it - if ( !resultsVerifier.isValid( cr ) ) { - indexBatch.deindex( indexScope, cr ); - continue; - } + //ask the loader if this is valid, if not discard it and de-index it + if ( !resultsVerifier.isValid( cr ) ) { + indexBatch.deindex( indexScope, cr ); + continue; + } - //if we get here we're good, we need to add this to our results - final int candidateIndex = orderIndex.get( requestedId ); + //if we get here we're good, we need to add this to our results + final int candidateIndex = orderIndex.get( requestedId ); - sortedResults.put( candidateIndex, requestedId ); - } + sortedResults.put( candidateIndex, requestedId ); } - // NOTE DO NOT execute the batch here. + // NOTE DO NOT execute the batch here. 
// It changes the results and we need consistent paging until we aggregate all results return resultsVerifier.getResults( sortedResults.values() ); } @@ -247,6 +223,4 @@ public class FilteringLoader implements ResultsLoader { public void postProcess() { this.indexBatch.execute(); } - - } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 2e9fb55..652742b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util; import java.util.UUID; +import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.entities.Application; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import com.clearspring.analytics.util.Preconditions; + /** * Utilises for constructing standard naming conventions for collections and connections @@ -80,66 +86,150 @@ public class CpNamingUtils { * @param type * @return */ - public static String 
getCollectionScopeNameFromEntityType( String type ) { + private static String getCollectionScopeNameFromEntityType( String type ) { String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type ); return csn.toLowerCase(); } - public static String getCollectionScopeNameFromCollectionName( String name ) { + private static String getCollectionScopeNameFromCollectionName( String name ) { String csn = EDGE_COLL_SUFFIX + name; return csn.toLowerCase(); } - public static String getConnectionScopeName( String connectionType ) { + private static String getConnectionScopeName( String connectionType ) { String csn = EDGE_CONN_SUFFIX + connectionType ; return csn.toLowerCase(); } - public static boolean isCollectionEdgeType( String type ) { + /** + * Get the index scope for the edge from the source + * @param edge + * @return + */ + public static IndexScope generateScopeFromSource(final Edge edge ){ + + + final Id nodeId = edge.getSourceNode(); + final String scopeName = getNameFromEdgeType( edge.getType() ); + + + return new IndexScopeImpl( nodeId, scopeName ); + + } + + + + + + /** + * Get the index scope for the edge from the source + * @param edge + * @return + */ + public static IndexScope generateScopeToTarget(final Edge edge ){ + + + + final Id nodeId = edge.getTargetNode(); + final String scopeName = getNameFromEdgeType( edge.getType() ); + + + return new IndexScopeImpl( nodeId, scopeName ); + + } + + + /** + * Generate either the collection name or connection name from the edgeName + * @param edgeName + * @return + */ + public static String getNameFromEdgeType(final String edgeName){ + + + if(isCollectionEdgeType( edgeName )){ + return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) ); + } + + return getConnectionScopeName(getConnectionType( edgeName ) ); + + } + + + /** + * Get the index scope from the colleciton name + * @param nodeId The source or target node id + * @param collectionName The name of the collection. 
Ex "users" + * @return + */ + public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){ + return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) ); + } + + + /** + * Get the scope from the connection + * @param nodeId + * @param connectionName + * @return + */ + public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){ + return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) ); + } + + + /** + * Create an Id object from the entity ref + * @param entityRef + * @return + */ + public static Id createId(final EntityRef entityRef){ + return new SimpleId( entityRef.getUuid(), entityRef.getType() ); + } + + private static boolean isCollectionEdgeType( String type ) { return type.startsWith( EDGE_COLL_SUFFIX ); } - public static boolean isConnectionEdgeType( String type ) { + private static boolean isConnectionEdgeType( String type ) { return type.startsWith( EDGE_CONN_SUFFIX ); } - static public String getConnectionType( String edgeType ) { + + private static String getConnectionType( String edgeType ) { String[] parts = edgeType.split( "\\|" ); return parts[1]; } - static public String getCollectionName( String edgeType ) { + private static String getCollectionName( String edgeType ) { String[] parts = edgeType.split( "\\|" ); return parts[1]; } + /** + * Generate a standard edge name for our graph using the connection name + * @param connectionType The type of connection made + * @return + */ public static String getEdgeTypeFromConnectionType( String connectionType ) { - - if ( connectionType != null ) { - String csn = EDGE_CONN_SUFFIX + "|" + connectionType; - return csn; - } - - return null; + return (EDGE_CONN_SUFFIX + "|" + connectionType).toLowerCase(); } + /** + * Generate a standard edges from for a collection + * @param collectionName + * @return + */ public static String getEdgeTypeFromCollectionName( String 
collectionName ) { - - if ( collectionName != null ) { - String csn = EDGE_COLL_SUFFIX + "|" + collectionName; - return csn; - } - - - return null; + return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index 5c166c5..f743f0b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import com.fasterxml.uuid.UUIDComparator; @@ -60,7 +61,9 @@ import com.google.inject.Injector; import net.jcip.annotations.NotThreadSafe; import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { EntityManager em = app.getEntityManager(); - EntityIndexFactory eif = SpringResource.getInstance().getBean( 
Injector.class ).getInstance( EntityIndexFactory.class ); + EntityIndexFactory eif = SpringResource.getInstance().getBean( Injector.class ).getInstance( + EntityIndexFactory.class ); ApplicationScope as = new ApplicationScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) ); ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as); - IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION); + IndexScope is = generateScopeFromCollection(rootId, collName ); Query rcq = Query.fromQL( query ); // TODO: why does this have no effect; max we ever get is 1000 entities http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java index 50c2cd9..3bfe460 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java @@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.utils.EdgeTestUtils; import com.google.inject.Injector; @@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT { final Id source = edge.getSourceNode(); //test if we're a collection, if so - if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) { 
- final String collectionName = CpNamingUtils.getCollectionName( edgeType ); + if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) { + final String collectionName = EdgeTestUtils.getNameForEdge( edgeType ); assertEquals("application source returned", createdApplication.getUuid(), source.getUuid()); @@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT { - if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) { + if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) { fail( "Only connection edges should be encountered" ); } - final String connectionType = CpNamingUtils.getConnectionType( edgeType ); + final String connectionType = EdgeTestUtils.getNameForEdge( edgeType ); assertEquals( "Same connection type expected", "likes", connectionType ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java index 9f1bb17..6d228b2 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java @@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.utils.EdgeTestUtils; import com.google.inject.Injector; @@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { final GraphManager gm = managerCache.getGraphManager( scope ); - edgesFromSourceObservable.edgesFromSource( gm, 
applicationId ).doOnNext( new Action1<Edge>() { - @Override - public void call( final Edge edge ) { - final String edgeType = edge.getType(); - final Id target = edge.getTargetNode(); + edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> { + final String edgeType = edge.getType(); + final Id target = edge.getTargetNode(); - //test if we're a collection, if so remove ourselves fro the types - if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) { - fail( "Connections should be the only type encountered" ); - } + //test if we're a collection, if so remove ourselves fro the types + if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) { + fail( "Connections should be the only type encountered" ); + } - final String collectionType = CpNamingUtils.getCollectionName( edgeType ); + final String collectionType = EdgeTestUtils.getNameForEdge( edgeType ); - if ( collectionType.equals( type1 ) ) { - assertTrue( "Element should be present on removal", type1Identities.remove( target ) ); - } - else if ( collectionType.equals( type2 ) ) { - assertTrue( "Element should be present on removal", type2Identities.remove( target ) ); - } + if ( collectionType.equals( type1 ) ) { + assertTrue( "Element should be present on removal", type1Identities.remove( target ) ); + } + else if ( collectionType.equals( type2 ) ) { + assertTrue( "Element should be present on removal", type2Identities.remove( target ) ); + } - } } ).toBlocking().lastOrDefault( null ); @@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { //test connections - edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() { - @Override - public void call( final Edge edge ) { - final String edgeType = edge.getType(); - final Id target = edge.getTargetNode(); + edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> { + final String edgeType = edge.getType(); + final Id target = edge.getTargetNode(); - if ( 
!CpNamingUtils.isConnectionEdgeType( edgeType ) ) { - fail( "Only connection edges should be encountered" ); - } + if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) { + fail( "Only connection edges should be encountered" ); + } - final String connectionType = CpNamingUtils.getConnectionType( edgeType ); + final String connectionType = EdgeTestUtils.getNameForEdge( edgeType ); - assertEquals( "Same connection type expected", "likes", connectionType ); + assertEquals( "Same connection type expected", "likes", connectionType ); - assertTrue( "Element should be present on removal", connections.remove( target ) ); - } + assertTrue( "Element should be present on removal", connections.remove( target ) ); } ).toBlocking().lastOrDefault( null ); assertEquals( "Every connection should have been encountered", 0, connections.size() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java new file mode 100644 index 0000000..f217338 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.utils; + + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; + +import static org.junit.Assert.assertEquals; + + +public class EdgeTestUtils { + + /** + * Get the name for an edge + */ + public static String getNameForEdge( final String edgeName ) { + final String[] parts = edgeName.split( "\\|" ); + + assertEquals( "there should be 2 parts", parts.length, 2 ); + + return parts[1]; + } + + + public static boolean isCollectionEdgeType( String type ) { + return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX ); + } + + + public static boolean isConnectionEdgeType( String type ) { + return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7c356d8b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java index 2bd1edb..84a7fc3 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java @@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> try { + subscriber.onStart(); + //get 
our iterator and push data to the observer final Iterator<T> itr = getIterator();
