Refactored index scope generation to be more consistent and clean. Moved some newly private methods to test utils.
Added onStart event to the observable iterator Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fbce62df Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fbce62df Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fbce62df Branch: refs/heads/USERGRID-509 Commit: fbce62df84323ea19dce8a00fb6e52aa245e7bb5 Parents: cf3f7ab Author: Todd Nine <[email protected]> Authored: Mon Mar 30 09:45:46 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Mon Mar 30 12:03:10 2015 -0600 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 29 +--- .../corepersistence/CpRelationManager.java | 61 +++------ .../usergrid/corepersistence/CpWalker.java | 4 +- .../events/EntityDeletedHandler.java | 31 ++--- .../events/EntityVersionCreatedHandler.java | 2 +- .../events/EntityVersionDeletedHandler.java | 87 +++++++++--- .../results/FilteringLoader.java | 3 +- .../corepersistence/util/CpNamingUtils.java | 134 ++++++++++++++++--- .../corepersistence/StaleIndexCleanupTest.java | 10 +- .../rx/EdgesFromSourceObservableIT.java | 9 +- .../rx/EdgesToTargetObservableIT.java | 53 ++++---- .../apache/usergrid/utils/EdgeTestUtils.java | 50 +++++++ .../persistence/core/rx/ObservableIterator.java | 2 + 13 files changed, 307 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index c45c390..a7dda13 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.util.Health; import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.entities.Event; import org.apache.usergrid.persistence.entities.Group; @@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; -import org.apache.usergrid.persistence.index.EntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; -import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.index.query.Query; @@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; -import com.netflix.hystrix.exception.HystrixRuntimeException; import me.prettyprint.hector.api.Keyspace; import me.prettyprint.hector.api.beans.ColumnSlice; @@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator; import static org.apache.commons.lang.StringUtils.capitalize; import static 
org.apache.commons.lang.StringUtils.isBlank; import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity; -import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS; import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS; @@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager { catch ( WriteUniqueVerifyException wuve ) { handleWriteUniqueVerifyException( entity, wuve ); } - catch ( HystrixRuntimeException hre ) { - - if ( hre.getCause() instanceof WriteUniqueVerifyException ) { - WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause(); - handleWriteUniqueVerifyException( entity, wuve ); - } - - throw hre; - } // update in all containing collections and connection indexes CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity ); @@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager { @Override public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception { - IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(), - getCollectionScopeNameFromEntityType( entityRef.getType() ) ); - - Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) { @@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager { catch ( WriteUniqueVerifyException wuve ) { handleWriteUniqueVerifyException( entity, wuve ); } - catch ( HystrixRuntimeException hre ) { - if ( hre.getCause() instanceof WriteUniqueVerifyException ) { - WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause(); - 
handleWriteUniqueVerifyException( entity, wuve ); - } - } // Index CP entity into default collection scope // IndexScope defaultIndexScope = new IndexScopeImpl( @@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager { final EntityIndexBatch batch = aie.createBatch(); // index member into entity collection | type scope - IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); - + IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName ); batch.index( collectionIndexScope, memberEntity ); //TODO REMOVE INDEX CODE http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 3427684..b76f38f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.IndexScope; import org.apache.usergrid.persistence.index.SearchTypes; -import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.index.query.Query; import org.apache.usergrid.persistence.index.query.Query.Level; @@ -118,6 +117,11 @@ import rx.functions.Func1; import static java.util.Arrays.asList; import static me.prettyprint.hector.api.factory.HFactory.createMutator; +import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.createId; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES; import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES; import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES; @@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager { final EntityRef eref = new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); - String name; - if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) { - name = CpNamingUtils.getConnectionType( edge.getType() ); - } - else { - name = CpNamingUtils.getCollectionName( edge.getType() ); - } + String name = getNameFromEdgeType(edge.getType()); addMapSet( entityRefSetMap, eref, name ); } ).toBlocking().last(); @@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager { @Override public void call( final Edge edge ) { - EntityRef sourceEntity = - new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() ); // reindex the entity in the source entity's collection or connection index - IndexScope indexScope; - if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) { - - String collName = CpNamingUtils.getCollectionName( edge.getType() ); - indexScope = new IndexScopeImpl( - new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName )); - } - else { + IndexScope indexScope = generateScopeFromSource(edge); - String connName = CpNamingUtils.getConnectionType( 
edge.getType() ); - indexScope = new IndexScopeImpl( - new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ), - CpNamingUtils.getConnectionScopeName( connName ) ); - } entityIndexBatch.index( indexScope, cpEntity ); @@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager { Iterator<String> iter = str.toBlocking().getIterator(); while ( iter.hasNext() ) { String edgeType = iter.next(); - indexes.add( CpNamingUtils.getCollectionName( edgeType ) ); + indexes.add( getNameFromEdgeType( edgeType ) ); } return indexes; @@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager { final EntityIndexBatch batch = ei.createBatch(); // remove item from collection index - IndexScope indexScope = new IndexScopeImpl( - cpHeadEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName ); batch.deindex( indexScope, memberEntity ); // remove collection from item index - IndexScope itemScope = new IndexScopeImpl( - memberEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( - Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) ); + IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(), + Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ); batch.deindex( itemScope, cpHeadEntity ); - BetterFuture future = batch.execute(); + batch.execute(); // remove edge from collection to item GraphManager gm = managerCache.getGraphManager( applicationScope ); @@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager { + "' of " + headEntity.getType() + ":" + headEntity .getUuid() ); } - final IndexScope indexScope = new IndexScopeImpl( - cpHeadEntity.getId(), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName ); final ApplicationEntityIndex 
ei = managerCache.getEntityIndex( applicationScope ); @@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager { EntityIndexBatch batch = ei.createBatch(); // Index the new connection in app|source|type context - IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), - CpNamingUtils.getConnectionScopeName( connectionType ) ); + IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType ); batch.index( indexScope, targetEntity ); @@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager { final EntityIndexBatch batch = ei.createBatch(); // Deindex the connection in app|source|type context - IndexScope indexScope = new IndexScopeImpl( - new SimpleId( connectingEntityRef.getUuid(), - connectingEntityRef.getType() ), - CpNamingUtils.getConnectionScopeName( connectionType ) ); + final Id cpId = createId( connectingEntityRef ); + IndexScope indexScope = generateScopeFromConnection( cpId, connectionType ); batch.deindex( indexScope, targetEntity ); // Deindex the connection in app|source|type context @@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager { headEntity = em.validate( headEntity ); - final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), - CpNamingUtils.getConnectionScopeName( connection ) ); + final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection ); final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java index c14447d..3f2c9d6 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java @@ -36,6 +36,8 @@ import rx.functions.Action1; import rx.functions.Func1; import rx.schedulers.Schedulers; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType; + /** * Takes a visitor to all collections and entities. @@ -128,7 +130,7 @@ public class CpWalker { if ( entity == null ) { return; } - String collName = CpNamingUtils.getCollectionName( edgeValue.getType() ); + String collName = getNameFromEdgeType( edgeValue.getType() ); visitor.visitCollectionEntry( em, collName, entity ); } ).subscribeOn( Schedulers.io() ); }, 100 ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java index 78c1ca7..57d69bc 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java @@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted { return; } - - - if(logger.isDebugEnabled()) { - logger.debug( - "Handling deleted event for entity {}:{} v {} " + " app: {}", - new Object[] { - entityId.getType(), entityId.getUuid(), version, - scope.getApplication() - } ); - } - - CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; - final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); - - throw new NotImplementedException( "Fix this" ); +// This is a NO-OP now, it's handled by the EntityVersionDeletedHandler + +// +// 
if(logger.isDebugEnabled()) { +// logger.debug( +// "Handling deleted event for entity {}:{} v {} " + " app: {}", +// new Object[] { +// entityId.getType(), entityId.getUuid(), version, +// scope.getApplication() +// } ); +// } +// +// CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; +// final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); + +// throw new NotImplementedException( "Fix this" ); //read all edges to this node and de-index them http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java index c000500..0163fc2 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java @@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated { @Override public void versionCreated( final ApplicationScope scope, final Entity entity ) { - //not op, we're not migrating properly to this. Make this an event + //not op, we're not migrating properly to this. Make this an event At the moment this is happening on write // // This check is for testing purposes and for a test that to be able to dynamically turn // // off and on delete previous versions so that it can test clean-up on read. 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java index 4fa5ce1..a2e9b30 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java @@ -24,18 +24,30 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.CpEntityManagerFactory; -import org.apache.usergrid.exception.NotImplementedException; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.EntityManagerFactory; import org.apache.usergrid.persistence.collection.MvccLogEntry; import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted; +import org.apache.usergrid.persistence.collection.serialization.SerializationFig; import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexBatch; +import org.apache.usergrid.persistence.index.IndexBatchBuffer; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.model.entity.Id; import com.google.inject.Inject; import com.google.inject.Singleton; +import rx.Observable; + import 
static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget; /** @@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { private final EntityManagerFactory emf; + private final EdgesObservable edgesObservable; + private final SerializationFig serializationFig; @Inject - public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;} + public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable, + final SerializationFig serializationFig ) { + this.emf = emf; + this.edgesObservable = edgesObservable; + this.serializationFig = serializationFig; + } @Override @@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted { } if ( logger.isDebugEnabled() ) { - logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " - + " app: {}", new Object[] { - entityVersions.size(), entityId.getType(), entityId.getUuid(), - scope.getApplication() - } ); + logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + " app: {}", new Object[] { + entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication() + } ); } CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf; final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope ); + final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope ); + + + //create an observable of all scopes to deIndex + //remove all indexes pointing to this + final Observable<IndexScope> targetScopes = edgesObservable.edgesToTarget( gm, entityId ).map( + edge -> generateScopeFromSource( edge) ); + + + //Remove all double indexes + final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId 
).map( + edge -> generateScopeToTarget( edge ) ); + + + //create a stream of scopes + final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap( + indexScope -> Observable.from( entityVersions ) + .map( version -> new IndexScopeVersion( indexScope, version ) ) ); + + //create a set of batches + final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap( + bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(), + ( EntityIndexBatch batch, IndexScopeVersion version ) -> { + //deindex in this batch + batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() ); + } ) ); - throw new NotImplementedException( "Fix this" ); + //execute the batches + batches.doOnNext( batch -> batch.execute() ).toBlocking().last(); -// final IndexScope indexScope = -// new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ), -// scope.getName() ); -// -// //create our batch, and then collect all of them into a single batch -// Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> { -// entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() ); -// } ) -// //after our batch is collected, execute it -// .doOnNext( entityIndexBatch -> { -// entityIndexBatch.execute(); -// } ).toBlocking().last(); + } + + + + + + private static final class IndexScopeVersion{ + private final IndexScope scope; + private final MvccLogEntry version; + + + private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) { + this.scope = scope; + this.version = version; + } } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java ---------------------------------------------------------------------- diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java index 2eb6675..566656b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java @@ -119,8 +119,7 @@ public class FilteringLoader implements ResultsLoader { final CandidateResult currentCandidate = iter.next(); - final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( - currentCandidate.getId().getType() ); + final String collectionType = currentCandidate.getId().getType() ; final Id entityId = currentCandidate.getId(); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java index 2e9fb55..652742b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java @@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util; import java.util.UUID; +import org.apache.usergrid.persistence.EntityRef; import org.apache.usergrid.persistence.Schema; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.entities.Application; +import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.IndexScope; +import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import 
org.apache.usergrid.persistence.map.MapScope; import org.apache.usergrid.persistence.map.impl.MapScopeImpl; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import com.clearspring.analytics.util.Preconditions; + /** * Utilises for constructing standard naming conventions for collections and connections @@ -80,66 +86,150 @@ public class CpNamingUtils { * @param type * @return */ - public static String getCollectionScopeNameFromEntityType( String type ) { + private static String getCollectionScopeNameFromEntityType( String type ) { String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type ); return csn.toLowerCase(); } - public static String getCollectionScopeNameFromCollectionName( String name ) { + private static String getCollectionScopeNameFromCollectionName( String name ) { String csn = EDGE_COLL_SUFFIX + name; return csn.toLowerCase(); } - public static String getConnectionScopeName( String connectionType ) { + private static String getConnectionScopeName( String connectionType ) { String csn = EDGE_CONN_SUFFIX + connectionType ; return csn.toLowerCase(); } - public static boolean isCollectionEdgeType( String type ) { + /** + * Get the index scope for the edge from the source + * @param edge + * @return + */ + public static IndexScope generateScopeFromSource(final Edge edge ){ + + + final Id nodeId = edge.getSourceNode(); + final String scopeName = getNameFromEdgeType( edge.getType() ); + + + return new IndexScopeImpl( nodeId, scopeName ); + + } + + + + + + /** + * Get the index scope for the edge from the source + * @param edge + * @return + */ + public static IndexScope generateScopeToTarget(final Edge edge ){ + + + + final Id nodeId = edge.getTargetNode(); + final String scopeName = getNameFromEdgeType( edge.getType() ); + + + return new IndexScopeImpl( nodeId, scopeName ); + + } + + + /** + * Generate either the collection name or connection name from the edgeName + * @param 
edgeName + * @return + */ + public static String getNameFromEdgeType(final String edgeName){ + + + if(isCollectionEdgeType( edgeName )){ + return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) ); + } + + return getConnectionScopeName(getConnectionType( edgeName ) ); + + } + + + /** + * Get the index scope from the colleciton name + * @param nodeId The source or target node id + * @param collectionName The name of the collection. Ex "users" + * @return + */ + public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){ + return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) ); + } + + + /** + * Get the scope from the connection + * @param nodeId + * @param connectionName + * @return + */ + public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){ + return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) ); + } + + + /** + * Create an Id object from the entity ref + * @param entityRef + * @return + */ + public static Id createId(final EntityRef entityRef){ + return new SimpleId( entityRef.getUuid(), entityRef.getType() ); + } + + private static boolean isCollectionEdgeType( String type ) { return type.startsWith( EDGE_COLL_SUFFIX ); } - public static boolean isConnectionEdgeType( String type ) { + private static boolean isConnectionEdgeType( String type ) { return type.startsWith( EDGE_CONN_SUFFIX ); } - static public String getConnectionType( String edgeType ) { + + private static String getConnectionType( String edgeType ) { String[] parts = edgeType.split( "\\|" ); return parts[1]; } - static public String getCollectionName( String edgeType ) { + private static String getCollectionName( String edgeType ) { String[] parts = edgeType.split( "\\|" ); return parts[1]; } + /** + * Generate a standard edge name for our graph using the connection name + * @param connectionType The type of connection made + 
* @return + */ public static String getEdgeTypeFromConnectionType( String connectionType ) { - - if ( connectionType != null ) { - String csn = EDGE_CONN_SUFFIX + "|" + connectionType; - return csn; - } - - return null; + return (EDGE_CONN_SUFFIX + "|" + connectionType).toLowerCase(); } + /** + * Generate a standard edges from for a collection + * @param collectionName + * @return + */ public static String getEdgeTypeFromCollectionName( String collectionName ) { - - if ( collectionName != null ) { - String csn = EDGE_COLL_SUFFIX + "|" + collectionName; - return csn; - } - - - return null; + return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase(); } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java index 5c166c5..f743f0b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java @@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.impl.IndexScopeImpl; import org.apache.usergrid.persistence.index.query.CandidateResults; import org.apache.usergrid.persistence.index.query.Query; +import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; import com.fasterxml.uuid.UUIDComparator; @@ -60,7 +61,9 @@ import com.google.inject.Injector; import net.jcip.annotations.NotThreadSafe; import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED; +import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection; import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION; +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT { EntityManager em = app.getEntityManager(); - EntityIndexFactory eif = SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class ); + EntityIndexFactory eif = SpringResource.getInstance().getBean( Injector.class ).getInstance( + EntityIndexFactory.class ); ApplicationScope as = new ApplicationScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) ); ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as); - IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), - CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) ); + final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION); + IndexScope is = generateScopeFromCollection(rootId, collName ); Query rcq = Query.fromQL( query ); // TODO: why does this have no effect; max we ever get is 1000 entities http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java index 50c2cd9..3bfe460 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java @@ -41,6 +41,7 @@ import 
org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; +import org.apache.usergrid.utils.EdgeTestUtils; import com.google.inject.Injector; @@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT { final Id source = edge.getSourceNode(); //test if we're a collection, if so - if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) { - final String collectionName = CpNamingUtils.getCollectionName( edgeType ); + if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) { + final String collectionName = EdgeTestUtils.getNameForEdge( edgeType ); assertEquals("application source returned", createdApplication.getUuid(), source.getUuid()); @@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT { - if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) { + if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) { fail( "Only connection edges should be encountered" ); } - final String connectionType = CpNamingUtils.getConnectionType( edgeType ); + final String connectionType = EdgeTestUtils.getNameForEdge( edgeType ); assertEquals( "Same connection type expected", "likes", connectionType ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java index 9f1bb17..6d228b2 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java @@ -38,6 
+38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.model.entity.Id; +import org.apache.usergrid.utils.EdgeTestUtils; import com.google.inject.Injector; @@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { final GraphManager gm = managerCache.getGraphManager( scope ); - edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() { - @Override - public void call( final Edge edge ) { - final String edgeType = edge.getType(); - final Id target = edge.getTargetNode(); + edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> { + final String edgeType = edge.getType(); + final Id target = edge.getTargetNode(); - //test if we're a collection, if so remove ourselves fro the types - if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) { - fail( "Connections should be the only type encountered" ); - } + //test if we're a collection, if so remove ourselves fro the types + if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) { + fail( "Connections should be the only type encountered" ); + } - final String collectionType = CpNamingUtils.getCollectionName( edgeType ); + final String collectionType = EdgeTestUtils.getNameForEdge( edgeType ); - if ( collectionType.equals( type1 ) ) { - assertTrue( "Element should be present on removal", type1Identities.remove( target ) ); - } - else if ( collectionType.equals( type2 ) ) { - assertTrue( "Element should be present on removal", type2Identities.remove( target ) ); - } + if ( collectionType.equals( type1 ) ) { + assertTrue( "Element should be present on removal", type1Identities.remove( target ) ); + } + else if ( collectionType.equals( type2 ) ) { + assertTrue( "Element should be present on removal", type2Identities.remove( target ) ); + } - } } 
).toBlocking().lastOrDefault( null ); @@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT { //test connections - edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() { - @Override - public void call( final Edge edge ) { - final String edgeType = edge.getType(); - final Id target = edge.getTargetNode(); + edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> { + final String edgeType = edge.getType(); + final Id target = edge.getTargetNode(); - if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) { - fail( "Only connection edges should be encountered" ); - } + if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) { + fail( "Only connection edges should be encountered" ); + } - final String connectionType = CpNamingUtils.getConnectionType( edgeType ); + final String connectionType = EdgeTestUtils.getNameForEdge( edgeType ); - assertEquals( "Same connection type expected", "likes", connectionType ); + assertEquals( "Same connection type expected", "likes", connectionType ); - assertTrue( "Element should be present on removal", connections.remove( target ) ); - } + assertTrue( "Element should be present on removal", connections.remove( target ) ); } ).toBlocking().lastOrDefault( null ); assertEquals( "Every connection should have been encountered", 0, connections.size() ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java new file mode 100644 index 0000000..f217338 --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.utils; + + +import org.apache.usergrid.corepersistence.util.CpNamingUtils; + +import static org.junit.Assert.assertEquals; + + +public class EdgeTestUtils { + + /** + * Get the name for an edge + */ + public static String getNameForEdge( final String edgeName ) { + final String[] parts = edgeName.split( "\\|" ); + + assertEquals( "there should be 2 parts", parts.length, 2 ); + + return parts[1]; + } + + + public static boolean isCollectionEdgeType( String type ) { + return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX ); + } + + + public static boolean isConnectionEdgeType( String type ) { + return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java index 2bd1edb..84a7fc3 100644 --- 
a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java @@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> try { + subscriber.onStart(); + //get our iterator and push data to the observer final Iterator<T> itr = getIterator();
