Refactored index scope generation to be more consistent and clean

Moved some newly private methods to test utils

Added onStart event to the observable iterator


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fbce62df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fbce62df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fbce62df

Branch: refs/heads/USERGRID-509
Commit: fbce62df84323ea19dce8a00fb6e52aa245e7bb5
Parents: cf3f7ab
Author: Todd Nine <[email protected]>
Authored: Mon Mar 30 09:45:46 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Mon Mar 30 12:03:10 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  29 +---
 .../corepersistence/CpRelationManager.java      |  61 +++------
 .../usergrid/corepersistence/CpWalker.java      |   4 +-
 .../events/EntityDeletedHandler.java            |  31 ++---
 .../events/EntityVersionCreatedHandler.java     |   2 +-
 .../events/EntityVersionDeletedHandler.java     |  87 +++++++++---
 .../results/FilteringLoader.java                |   3 +-
 .../corepersistence/util/CpNamingUtils.java     | 134 ++++++++++++++++---
 .../corepersistence/StaleIndexCleanupTest.java  |  10 +-
 .../rx/EdgesFromSourceObservableIT.java         |   9 +-
 .../rx/EdgesToTargetObservableIT.java           |  53 ++++----
 .../apache/usergrid/utils/EdgeTestUtils.java    |  50 +++++++
 .../persistence/core/rx/ObservableIterator.java |   2 +
 13 files changed, 307 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index c45c390..a7dda13 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -69,7 +69,6 @@ import 
org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif
 import 
org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.Event;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,10 +79,8 @@ import 
org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
 
 import me.prettyprint.hector.api.Keyspace;
 import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -129,7 +125,7 @@ import static 
me.prettyprint.hector.api.factory.HFactory.createMutator;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
 import static 
org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
 import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
-
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException 
) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-
-            throw hre;
-        }
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( 
entity );
@@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager {
     @Override
     public void deleteProperty( EntityRef entityRef, String propertyName ) 
throws Exception {
 
-        IndexScope defaultIndexScope = new IndexScopeImpl( 
getApplicationScope().getApplication(),
-                getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-
-
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager {
         catch ( WriteUniqueVerifyException wuve ) {
             handleWriteUniqueVerifyException( entity, wuve );
         }
-        catch ( HystrixRuntimeException hre ) {
 
-            if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
-                WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException 
) hre.getCause();
-                handleWriteUniqueVerifyException( entity, wuve );
-            }
-        }
 
         // Index CP entity into default collection scope
         //        IndexScope defaultIndexScope = new IndexScopeImpl(
@@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager {
         final EntityIndexBatch batch = aie.createBatch();
 
         // index member into entity collection | type scope
-        IndexScope collectionIndexScope = new IndexScopeImpl( 
collectionEntity.getId(),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( 
collName ) );
-
+        IndexScope collectionIndexScope = generateScopeFromCollection( 
collectionEntity.getId(), collName );
         batch.index( collectionIndexScope, memberEntity );
 
         //TODO REMOVE INDEX CODE

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3427684..b76f38f 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -79,7 +79,6 @@ import 
org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -118,6 +117,11 @@ import rx.functions.Func1;
 import static java.util.Arrays.asList;
 
 import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
 import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
 import static 
org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
 import static 
org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager {
                 final EntityRef eref =
                     new SimpleEntityRef( edge.getSourceNode().getType(), 
edge.getSourceNode().getUuid() );
 
-                String name;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
-                    name = CpNamingUtils.getConnectionType( edge.getType() );
-                }
-                else {
-                    name = CpNamingUtils.getCollectionName( edge.getType() );
-                }
+                String name = getNameFromEdgeType(edge.getType());
                 addMapSet( entityRefSetMap, eref, name );
             }
          ).toBlocking().last();
@@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager 
{
                     @Override
                     public void call( final Edge edge ) {
 
-                        EntityRef sourceEntity =
-                                new SimpleEntityRef( 
edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
                         // reindex the entity in the source entity's 
collection or connection index
 
-                        IndexScope indexScope;
-                        if ( CpNamingUtils.isCollectionEdgeType( 
edge.getType() ) ) {
-
-                            String collName = CpNamingUtils.getCollectionName( 
edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), 
sourceEntity.getType()),
-                                
CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
-                        }
-                        else {
+                        IndexScope indexScope = generateScopeFromSource(edge);
 
-                            String connName = CpNamingUtils.getConnectionType( 
edge.getType() );
-                            indexScope = new IndexScopeImpl(
-                                new SimpleId( sourceEntity.getUuid(), 
sourceEntity.getType() ),
-                                CpNamingUtils.getConnectionScopeName( connName 
) );
-                        }
 
                         entityIndexBatch.index( indexScope, cpEntity );
 
@@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager {
         Iterator<String> iter = str.toBlocking().getIterator();
         while ( iter.hasNext() ) {
             String edgeType = iter.next();
-            indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
+            indexes.add( getNameFromEdgeType( edgeType ) );
         }
 
         return indexes;
@@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager 
{
         final EntityIndexBatch batch = ei.createBatch();
 
         // remove item from collection index
-        IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) 
);
+        IndexScope indexScope = generateScopeFromCollection( 
cpHeadEntity.getId(), collName );
 
         batch.deindex( indexScope, memberEntity );
 
         // remove collection from item index
-        IndexScope itemScope = new IndexScopeImpl(
-            memberEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName(
-                    Schema.defaultCollectionName( 
cpHeadEntity.getId().getType() ) ) );
+        IndexScope itemScope = generateScopeFromCollection( 
memberEntity.getId(),
+            Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
 
 
         batch.deindex( itemScope, cpHeadEntity );
 
-        BetterFuture future = batch.execute();
+        batch.execute();
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager {
                     + "' of " + headEntity.getType() + ":" + headEntity 
.getUuid() );
         }
 
-        final IndexScope indexScope = new IndexScopeImpl(
-            cpHeadEntity.getId(),
-            CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) 
);
+        final IndexScope indexScope = generateScopeFromCollection( 
cpHeadEntity.getId(), collName );
 
         final ApplicationEntityIndex ei = managerCache.getEntityIndex( 
applicationScope );
 
@@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager {
         EntityIndexBatch batch = ei.createBatch();
 
         // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        IndexScope indexScope = generateScopeFromConnection( 
cpHeadEntity.getId(), connectionType );
 
         batch.index( indexScope, targetEntity );
 
@@ -1208,10 +1184,8 @@ public class CpRelationManager implements 
RelationManager {
         final EntityIndexBatch batch = ei.createBatch();
 
         // Deindex the connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-            new SimpleId( connectingEntityRef.getUuid(),
-                connectingEntityRef.getType() ),
-                CpNamingUtils.getConnectionScopeName( connectionType ) );
+        final Id cpId =  createId( connectingEntityRef );
+        IndexScope indexScope = generateScopeFromConnection( cpId, 
connectionType );
         batch.deindex( indexScope, targetEntity );
 
         // Deindex the connection in app|source|type context
@@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager 
{
 
         headEntity = em.validate( headEntity );
 
-        final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( connection ) );
+        final IndexScope indexScope = generateScopeFromConnection( 
cpHeadEntity.getId(), connection );
 
         final SearchTypes searchTypes = SearchTypes.fromNullableTypes( 
query.getEntityType() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index c14447d..3f2c9d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -36,6 +36,8 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+
 
 /**
  * Takes a visitor to all collections and entities.
@@ -128,7 +130,7 @@ public class CpWalker {
                 if ( entity == null ) {
                     return;
                 }
-                String collName = CpNamingUtils.getCollectionName( 
edgeValue.getType() );
+                String collName = getNameFromEdgeType( edgeValue.getType() );
                 visitor.visitCollectionEntry( em, collName, entity );
             } ).subscribeOn( Schedulers.io() );
         }, 100 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 78c1ca7..57d69bc 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted {
             return;
         }
 
-
-
-        if(logger.isDebugEnabled()) {
-            logger.debug(
-                "Handling deleted event for entity {}:{} v {} " + " app: {}",
-                new Object[] {
-                    entityId.getType(), entityId.getUuid(), version,
-                    scope.getApplication()
-                } );
-        }
-
-        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
-        final ApplicationEntityIndex ei = 
cpemf.getManagerCache().getEntityIndex( scope );
-
-        throw new NotImplementedException( "Fix this" );
+//        This is a NO-OP now, it's handled by the EntityVersionDeletedHandler
+
+//
+//        if(logger.isDebugEnabled()) {
+//            logger.debug(
+//                "Handling deleted event for entity {}:{} v {} " + " app: {}",
+//                new Object[] {
+//                    entityId.getType(), entityId.getUuid(), version,
+//                    scope.getApplication()
+//                } );
+//        }
+//
+//        CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+//        final ApplicationEntityIndex ei = 
cpemf.getManagerCache().getEntityIndex( scope );
+
+//        throw new NotImplementedException( "Fix this" );
 
         //read all edges to this node and de-index them
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index c000500..0163fc2 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements 
EntityVersionCreated {
 
     @Override
     public void versionCreated( final ApplicationScope scope, final Entity 
entity ) {
-        //not op, we're not migrating properly to this.  Make this an event
+        //no-op, we're not migrating properly to this.  Make this an event.  At 
the moment this is happening on write
 
 //        // This check is for testing purposes and for a test that to be able 
to dynamically turn
 //        // off and on delete previous versions so that it can test clean-up 
on read.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 4fa5ce1..a2e9b30 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,18 +24,30 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import 
org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import rx.Observable;
+
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static 
org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
 
 
 /**
@@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements 
EntityVersionDeleted {
 
 
     private final EntityManagerFactory emf;
+    private final EdgesObservable edgesObservable;
+    private final SerializationFig serializationFig;
 
 
     @Inject
-    public EntityVersionDeletedHandler( final EntityManagerFactory emf ) 
{this.emf = emf;}
+    public EntityVersionDeletedHandler( final EntityManagerFactory emf, final 
EdgesObservable edgesObservable,
+                                        final SerializationFig 
serializationFig ) {
+        this.emf = emf;
+        this.edgesObservable = edgesObservable;
+        this.serializationFig = serializationFig;
+    }
 
 
     @Override
@@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements 
EntityVersionDeleted {
         }
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Handling versionDeleted count={} event for entity 
{}:{} v {} "
-                    + "  app: {}", new Object[] {
-                    entityVersions.size(), entityId.getType(), 
entityId.getUuid(),
-                    scope.getApplication()
-                } );
+            logger.debug( "Handling versionDeleted count={} event for entity 
{}:{} v {} " + "  app: {}", new Object[] {
+                entityVersions.size(), entityId.getType(), entityId.getUuid(), 
scope.getApplication()
+            } );
         }
 
         CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
 
         final ApplicationEntityIndex ei = 
cpemf.getManagerCache().getEntityIndex( scope );
+        final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope 
);
+
+
+        //create an observable of all scopes to deIndex
+        //remove all indexes pointing to this
+        final Observable<IndexScope> targetScopes =  
edgesObservable.edgesToTarget( gm, entityId ).map(
+            edge -> generateScopeFromSource( edge) );
+
+
+        //Remove all double indexes
+        final Observable<IndexScope> sourceScopes = 
edgesObservable.edgesFromSource( gm, entityId ).map(
+                    edge -> generateScopeToTarget( edge ) );
+
+
+        //create a stream of scopes
+        final Observable<IndexScopeVersion> versions = Observable.merge( 
targetScopes, sourceScopes ).flatMap(
+            indexScope -> Observable.from( entityVersions )
+                                    .map( version -> new IndexScopeVersion( 
indexScope, version ) ) );
+
+        //create a set of batches
+        final Observable<EntityIndexBatch> batches = versions.buffer( 
serializationFig.getBufferSize() ).flatMap(
+            bufferedVersions -> Observable.from( bufferedVersions ).collect( 
() -> ei.createBatch(),
+                ( EntityIndexBatch batch, IndexScopeVersion version ) -> {
+                    //deindex in this batch
+                    batch.deindex( version.scope, 
version.version.getEntityId(), version.version.getVersion() );
+                } ) );
 
 
-        throw new NotImplementedException( "Fix this" );
 
+        //execute the batches
+        batches.doOnNext( batch -> batch.execute() ).toBlocking().last();
 
-//        final IndexScope indexScope =
-//            new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), 
scope.getOwner().getType() ),
-//                scope.getName() );
-//
-//        //create our batch, and then collect all of them into a single batch
-//        Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( 
entityIndexBatch, mvccLogEntry ) -> {
-//            entityIndexBatch.deindex( indexScope, 
mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-//        } )
-//            //after our batch is collected, execute it
-//            .doOnNext( entityIndexBatch -> {
-//                entityIndexBatch.execute();
-//            } ).toBlocking().last();
+    }
+
+
+
+
+
+    private static final class IndexScopeVersion{
+        private final IndexScope scope;
+        private final MvccLogEntry version;
+
+
+        private IndexScopeVersion( final IndexScope scope, final MvccLogEntry 
version ) {
+            this.scope = scope;
+            this.version = version;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 2eb6675..566656b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -119,8 +119,7 @@ public class FilteringLoader implements ResultsLoader {
 
             final CandidateResult currentCandidate = iter.next();
 
-            final String collectionType = 
CpNamingUtils.getCollectionScopeNameFromEntityType(
-                    currentCandidate.getId().getType() );
+            final String collectionType = currentCandidate.getId().getType() ;
 
             final Id entityId = currentCandidate.getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 2e9fb55..652742b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util;
 
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
+import com.clearspring.analytics.util.Preconditions;
+
 
 /**
  * Utilises for constructing standard naming conventions for collections and 
connections
@@ -80,66 +86,150 @@ public class CpNamingUtils {
      * @param type
      * @return
      */
-    public static String getCollectionScopeNameFromEntityType( String type ) {
+    private static String getCollectionScopeNameFromEntityType( String type ) {
         String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
         return csn.toLowerCase();
     }
 
 
-    public static String getCollectionScopeNameFromCollectionName( String name 
) {
+    private static String getCollectionScopeNameFromCollectionName( String 
name ) {
         String csn = EDGE_COLL_SUFFIX + name;
         return csn.toLowerCase();
     }
 
 
-    public static String getConnectionScopeName( String connectionType ) {
+    private static String getConnectionScopeName( String connectionType ) {
         String csn = EDGE_CONN_SUFFIX + connectionType ;
         return csn.toLowerCase();
     }
 
 
-    public static boolean isCollectionEdgeType( String type ) {
+    /**
+     * Get the index scope for the edge from the source
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeFromSource(final Edge edge ){
+
+
+        final Id nodeId = edge.getSourceNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+
+
+
+    /**
+     * Get the index scope for the edge to the target
+     * @param edge
+     * @return
+     */
+    public static IndexScope generateScopeToTarget(final Edge edge ){
+
+
+
+        final Id nodeId = edge.getTargetNode();
+        final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+        return new IndexScopeImpl( nodeId, scopeName );
+
+    }
+
+
+    /**
+     * Generate either the collection name or connection name from the edgeName
+     * @param edgeName
+     * @return
+     */
+    public static String getNameFromEdgeType(final String edgeName){
+
+
+        if(isCollectionEdgeType( edgeName )){
+           return 
getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
+        }
+
+        return getConnectionScopeName(getConnectionType( edgeName )  );
+
+    }
+
+
+    /**
+     * Get the index scope from the collection name
+     * @param nodeId The source or target node id
+     * @param collectionName The name of the collection.  Ex "users"
+     * @return
+     */
+    public static IndexScope generateScopeFromCollection( final Id nodeId, 
final String collectionName ){
+        return new IndexScopeImpl( nodeId, 
getCollectionScopeNameFromCollectionName( collectionName ) );
+    }
+
+
+    /**
+     * Get the scope from the connection
+     * @param nodeId
+     * @param connectionName
+     * @return
+     */
+    public static IndexScope generateScopeFromConnection( final Id nodeId, 
final String connectionName ){
+        return new IndexScopeImpl( nodeId, getConnectionScopeName( 
connectionName ) );
+    }
+
+
+    /**
+     * Create an Id object from the entity ref
+     * @param entityRef
+     * @return
+     */
+    public static Id createId(final EntityRef entityRef){
+      return new SimpleId( entityRef.getUuid(), entityRef.getType() );
+    }
+
+    private static boolean isCollectionEdgeType( String type ) {
         return type.startsWith( EDGE_COLL_SUFFIX );
     }
 
 
-    public static boolean isConnectionEdgeType( String type ) {
+    private static boolean isConnectionEdgeType( String type ) {
         return type.startsWith( EDGE_CONN_SUFFIX );
     }
 
 
-    static public String  getConnectionType( String edgeType ) {
+
+    private static  String  getConnectionType( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
-    static public String getCollectionName( String edgeType ) {
+    private static String getCollectionName( String edgeType ) {
         String[] parts = edgeType.split( "\\|" );
         return parts[1];
     }
 
 
+    /**
+     * Generate a standard edge name for our graph using the connection name
+     * @param connectionType The type of connection made
+     * @return
+     */
     public static String getEdgeTypeFromConnectionType( String connectionType 
) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
+        return  (EDGE_CONN_SUFFIX  + "|" + connectionType).toLowerCase();
     }
 
 
+    /**
+     * Generate a standard edge type for a collection
+     * @param collectionName
+     * @return
+     */
     public static String getEdgeTypeFromCollectionName( String collectionName 
) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
+        return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5c166c5..f743f0b 100644
--- 
a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ 
b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
 import com.fasterxml.uuid.UUIDComparator;
@@ -60,7 +61,9 @@ import com.google.inject.Injector;
 import net.jcip.annotations.NotThreadSafe;
 
 import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         EntityManager em = app.getEntityManager();
 
-        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class );
+        EntityIndexFactory eif =  SpringResource.getInstance().getBean( Injector.class ).getInstance(
+            EntityIndexFactory.class );
 
         ApplicationScope as = new ApplicationScopeImpl(
             new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) );
         ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as);
 
-        IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
-                CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+        final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION);
+        IndexScope is = generateScopeFromCollection(rootId, collName );
         Query rcq = Query.fromQL( query );
 
         // TODO: why does this have no effect; max we ever get is 1000 entities

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 50c2cd9..3bfe460 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
                 final Id source = edge.getSourceNode();
 
                 //test if we're a collection, if so
-                if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    final String collectionName = CpNamingUtils.getCollectionName( edgeType );
+                if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                    final String collectionName = EdgeTestUtils.getNameForEdge( edgeType );
 
                     assertEquals("application source returned", createdApplication.getUuid(), source.getUuid());
 
@@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
 
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
+                if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
                     fail( "Only connection edges should be encountered" );
                 }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+                final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
                 assertEquals( "Same connection type expected", "likes", connectionType );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9f1bb17..6d228b2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.EdgeTestUtils;
 
 import com.google.inject.Injector;
 
@@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                //test if we're a collection, if so remove ourselves fro the types
-                if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
-                    fail( "Connections should be the only type encountered" );
-                }
+            //test if we're a collection, if so remove ourselves fro the types
+            if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+                fail( "Connections should be the only type encountered" );
+            }
 
 
-                final String collectionType = CpNamingUtils.getCollectionName( edgeType );
+            final String collectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                if ( collectionType.equals( type1 ) ) {
-                    assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
-                }
-                else if ( collectionType.equals( type2 ) ) {
-                    assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
-                }
+            if ( collectionType.equals( type1 ) ) {
+                assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
+            }
+            else if ( collectionType.equals( type2 ) ) {
+                assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
+            }
 
 
-            }
         } ).toBlocking().lastOrDefault( null );
 
 
@@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
-            @Override
-            public void call( final Edge edge ) {
-                final String edgeType = edge.getType();
-                final Id target = edge.getTargetNode();
+        edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> {
+            final String edgeType = edge.getType();
+            final Id target = edge.getTargetNode();
 
-                if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
-                    fail( "Only connection edges should be encountered" );
-                }
+            if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
+                fail( "Only connection edges should be encountered" );
+            }
 
-                final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+            final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
 
-                assertEquals( "Same connection type expected", "likes", connectionType );
+            assertEquals( "Same connection type expected", "likes", connectionType );
 
 
-                assertTrue( "Element should be present on removal", connections.remove( target ) );
-            }
+            assertTrue( "Element should be present on removal", connections.remove( target ) );
         } ).toBlocking().lastOrDefault( null );
 
         assertEquals( "Every connection should have been encountered", 0, connections.size() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
new file mode 100644
index 0000000..f217338
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.utils;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EdgeTestUtils {
+
+    /**
+     * Get the name for an edge
+     */
+    public static String getNameForEdge( final String edgeName ) {
+        final String[] parts = edgeName.split( "\\|" );
+
+        assertEquals( "there should be 2 parts", parts.length, 2 );
+
+        return parts[1];
+    }
+
+
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX );
+    }
+
+
+    public static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 2bd1edb..84a7fc3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
 
 
         try {
+            subscriber.onStart();
+
             //get our iterator and push data to the observer
             final Iterator<T> itr = getIterator();
 

Reply via email to