Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 50f8a56fd -> ed317b295


[USERGRID-608] Added initial version of delete flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ed317b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ed317b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ed317b29

Branch: refs/heads/USERGRID-608
Commit: ed317b295d96c150693d441ba7d306d0ef3ff40b
Parents: 50f8a56
Author: GERey <[email protected]>
Authored: Fri May 15 12:23:28 2015 -0700
Committer: GERey <[email protected]>
Committed: Fri May 15 12:23:28 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 84 ++++++++++++++------
 .../corepersistence/CpEntityManagerFactory.java |  6 +-
 .../asyncevents/EventBuilderImpl.java           | 13 ++-
 .../corepersistence/index/IndexService.java     |  2 +-
 .../corepersistence/index/IndexServiceImpl.java | 13 ++-
 .../index/ApplicationEntityIndex.java           | 21 ++++-
 .../impl/EsApplicationEntityIndexImpl.java      | 10 +++
 7 files changed, 112 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 9430c4e..407ae94 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -65,6 +65,8 @@ import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
 import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import 
org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.map.MapManager;
@@ -167,6 +169,8 @@ public class CpEntityManager implements EntityManager {
 
     private PipelineBuilderFactory pipelineBuilderFactory;
 
+    private final GraphManagerFactory graphManagerFactory;
+
     private boolean skipAggregateCounters;
     private MetricsFactory metricsFactory;
     private Timer aggCounterTimer;
@@ -207,7 +211,8 @@ public class CpEntityManager implements EntityManager {
      */
     public CpEntityManager( final CassandraService cass, final CounterUtils 
counterUtils, final AsyncEventService indexService, final ManagerCache 
managerCache,
                             final MetricsFactory metricsFactory, final 
EntityManagerFig entityManagerFig,
-                            final PipelineBuilderFactory 
pipelineBuilderFactory , final UUID applicationId ) {
+                            final PipelineBuilderFactory 
pipelineBuilderFactory ,
+                            final GraphManagerFactory 
graphManagerFactory,final UUID applicationId ) {
         this.entityManagerFig = entityManagerFig;
 
 
@@ -217,7 +222,9 @@ public class CpEntityManager implements EntityManager {
         Preconditions.checkNotNull( applicationId, "applicationId must not be 
null" );
         Preconditions.checkNotNull( indexService, "indexService must not be 
null" );
         Preconditions.checkNotNull( pipelineBuilderFactory, 
"pipelineBuilderFactory must not be null" );
+        Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory 
must not be null" );
         this.pipelineBuilderFactory = pipelineBuilderFactory;
+        this.graphManagerFactory = graphManagerFactory;
 
 
         this.managerCache = managerCache;
@@ -407,10 +414,10 @@ public class CpEntityManager implements EntityManager {
         if(entity == null) {
             return null;
         }
-        Class clazz = 
Schema.getDefaultSchema().getEntityClass(entity.getId().getType());
+        Class clazz = Schema.getDefaultSchema().getEntityClass( 
entity.getId().getType() );
 
-        Entity oldFormatEntity = 
EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), 
clazz);
-        oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity));
+        Entity oldFormatEntity = EntityFactory.newEntity( 
entity.getId().getUuid(), entity.getId().getType(), clazz );
+        oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) );
 
         return oldFormatEntity;
     }
@@ -615,47 +622,72 @@ public class CpEntityManager implements EntityManager {
     }
 
 
+    /**
+     * There are a series of steps that are kicked off by a delete
+     * 1. Mark the entity in the entity collection manager as deleted
+     * 2. Mark entity as deleted in the graph
+     * 3. Kick off async process
+     * 4. Delete all entity documents out of elasticsearch.
+     * 5. Compact Graph so that it deletes the marked values.
+     * 6. Delete entity from cassandra using the map manager.
+     *
+     * @param entityRef an entity reference
+     *
+     * @throws Exception
+     */
     @Override
     public void delete( EntityRef entityRef ) throws Exception {
-        deleteAsync( entityRef ).toBlocking().lastOrDefault( null );
-        //delete from our UUID index
-        MapManager mm = getMapManagerForTypes();
-        mm.delete( entityRef.getUuid().toString() );
+        //TODO: since we want the user to mark it and we sweep it later. It 
should be marked by the graph manager here.
+        //Step 1 & 2 Currently to block so we ensure that marking is done 
immediately
+        //If this returns null then nothing was marked null so the entity 
doesn't exist
+        markEntity( entityRef ).toBlocking().lastOrDefault( null );
 
-    }
+        //TODO: figure out how to return async call to service tier? Do I not 
need to?
+        //Step 3
+        deleteAsync( entityRef );
 
+    }
 
-    private Observable deleteAsync( EntityRef entityRef ) throws Exception {
 
+    /**
+     * Marks entity for deletion in entity collection manager and graph.
+     * Convert this method to return a list of observables that we can crunch 
through on return.
+     * Returns merged obversable that will mark the edges in the ecm and the 
graph manager.
+     * @param entityRef
+     * @return
+     */
+    private Observable markEntity(EntityRef entityRef){
         if(applicationScope == null || entityRef == null){
             return Observable.empty();
         }
+        GraphManager gm =  graphManagerFactory.createEdgeManager( 
applicationScope );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( 
applicationScope );
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-        //        if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
-        //            throw new IllegalArgumentException(
-        //                "Entity Id " + entityId.getType() + ":"+ 
entityId.getUuid() +" uuid not time based");
-        //        }
+        //Step 1 & 2 of delete
+        return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, 
entityRef.getUuid().timestamp() ) );
 
-        org.apache.usergrid.persistence.model.entity.Entity entity =
-                load(entityId);
+    }
 
-        if ( entity != null ) {
+    /**
+    * 4. Delete all entity documents out of elasticsearch.
+    * 5. Compact Graph so that it deletes the marked values.
+    * 6. Delete entity from cassandra using the map manager.
+     **/
+    private void deleteAsync( EntityRef entityRef ) throws Exception {
 
-            decrementEntityCollection( Schema.defaultCollectionName( 
entityId.getType() ) );
-            // and finally...
+        Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
-            //delete it asynchronously
-            indexService.queueEntityDelete( applicationScope, entityId );
+        //Step 4 && 5
+        indexService.queueEntityDelete( applicationScope, entityId );
+
+        //Step 6
+        //delete from our UUID index
+        MapManager mm = getMapManagerForTypes();
+        mm.delete( entityRef.getUuid().toString() );
 
-            return ecm.mark( entityId );
-        }
-        else {
-            return Observable.empty();
-        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e796545..918d3d4 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -62,6 +62,7 @@ import 
org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import 
org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
@@ -126,6 +127,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
     private final MetricsFactory metricsFactory;
     private final AsyncEventService indexService;
     private final PipelineBuilderFactory pipelineBuilderFactory;
+    private final GraphManagerFactory graphManagerFactory;
 
     public CpEntityManagerFactory( final CassandraService cassandraService, 
final CounterUtils counterUtils,
                                    final Injector injector ) {
@@ -140,6 +142,7 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
         this.pipelineBuilderFactory = injector.getInstance( 
PipelineBuilderFactory.class );
+        this.graphManagerFactory = injector.getInstance( 
GraphManagerFactory.class );
         this.applicationIdCache = 
injector.getInstance(ApplicationIdCacheFactory.class).getInstance(
             getManagementEntityManager() );
 
@@ -198,7 +201,8 @@ public class CpEntityManagerFactory implements 
EntityManagerFactory, Application
 
 
     private EntityManager _getEntityManager( UUID applicationId ) {
-        EntityManager em = new CpEntityManager(cassandraService, counterUtils, 
indexService, managerCache, metricsFactory, entityManagerFig, 
pipelineBuilderFactory,  applicationId );
+        EntityManager em = new CpEntityManager(cassandraService, counterUtils, 
indexService, managerCache,
+            metricsFactory, entityManagerFig, pipelineBuilderFactory, 
graphManagerFactory, applicationId );
         return em;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index d678a18..4e83e0b 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -115,6 +115,9 @@ public class EventBuilderImpl implements EventBuilder {
     }
 
 
+    //Does the queue entityDelete mark the entity then immediately does to the 
deleteEntityIndex. seems like
+    //it'll need to be pushed up higher so we can do the marking that isn't 
async or does it not matter?
+
     @Override
     public EntityDeleteResults queueEntityDelete( final ApplicationScope 
applicationScope, final Id entityId ) {
         log.debug( "Deleting entity id from index in app scope {} with 
entityId {} }", applicationScope, entityId );
@@ -125,19 +128,25 @@ public class EventBuilderImpl implements EventBuilder {
 
         //needs get versions here.
 
+
         //TODO: change this to be an observable
         //so we get these versions and loop through them until we find the 
MvccLogEntry that is marked as delete.
         //TODO: evauluate this to possibly be an observable to pass to the 
nextmethod.
         MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId 
).toBlocking()
-                                             .first( mvccLogEntry -> 
mvccLogEntry.getState()== MvccLogEntry.State.DELETED );
+                                             .firstOrDefault( null,
+                                                 mvccLogEntry -> 
mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
 
+        //If there is nothing marked then we shouldn't return any results.
+        //TODO: evaluate if we want to return null or return empty observable 
when we don't have any results marked as deleted.
+        if(mostRecentlyMarked == null)
+            return null;
 
         //observable of index operation messages
         //this method will need the most recent version.
         //When we go to compact the graph make sure you turn on the debugging 
mode for the deleted nodes so
         //we can verify that we mark them. That said that part seems kinda 
done. as we also delete the mvcc buffers.
         final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteEntityIndexes( applicationScope, 
entityId,mostRecentlyMarked.getVersion() );
+            indexService.deleteEntityIndexes( applicationScope, entityId, 
mostRecentlyMarked.getVersion() );
 
 
         //observable of entries as the batches are deleted

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 47601fb..54eb464 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -81,7 +81,7 @@ public interface IndexService {
      * @return
      */
     Observable<IndexOperationMessage> deleteEntityIndexes(final 
ApplicationScope applicationScope, final Id entityId,
-                                                         final UUID version);
+                                                         final UUID 
markedVersion);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8700c9b..7d7d0d9 100644
--- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -218,7 +218,7 @@ public class IndexServiceImpl implements IndexService {
 
     @Override
     public Observable<IndexOperationMessage> deleteEntityIndexes( final 
ApplicationScope applicationScope,
-                                                                  final Id 
entityId, final UUID version ) {
+                                                                  final Id 
entityId, final UUID markedVersion ) {
 
         //bootstrap the lower modules from their caches
         final GraphManager gm = graphManagerFactory.createEdgeManager( 
applicationScope );
@@ -238,7 +238,10 @@ public class IndexServiceImpl implements IndexService {
         final Observable<IndexEdge> observable = Observable.merge( 
sourceEdgesToIndex, targetSizes);
         //do our observable for batching
         //try to send a whole batch if we can
-        version.
+
+
+
+        //loop through candidateResults and deindex every single result that 
comeback.
 
         //do our observable for batching
         //try to send a whole batch if we can
@@ -249,7 +252,11 @@ public class IndexServiceImpl implements IndexService {
                 //collect results into a single batch
                 .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> {
                     //logger.debug( "adding edge {} to batch for entity {}", 
indexEdge, entity );
-                    batch.deindex( indexEdge, entityId, version );
+                    //TODO: refactor into stages of observable, also need a 
loop to get entities until we recieve nothing back.
+                    CandidateResults crs = ei.getAllEntityVersionBeforeMark( 
entityId, markedVersion, 1000, 0 );
+                    for(CandidateResult cr: crs){
+                        batch.deindex( indexEdge, cr);
+                    }
                 } )
                     //return the future from the batch execution
                 .flatMap( batch -> batch.execute() ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 20c3f12..0e0e033 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.index;
 
 
+import java.util.UUID;
+
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -49,15 +51,26 @@ public interface ApplicationEntityIndex {
 
 
     /**
-     * Same as search, just iterates all documents that match the index edge 
exactly
-     * @param edge
-     * @param limit
-     * @param offset
+     * Same as search, just iterates all documents that match the index edge 
exactly.
+     * @param edge The edge to search on
+     * @param entityId The entity that the searchEdge is connected to.
+     * @param limit The limit of the values to return per search.
+     * @param offset The offset to page the query on.
      * @return
      */
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id 
entityId,  final int limit, final int offset);
 
     /**
+     * Returns all entity documents that match the entityId and come before 
the marked version
+     * @param entityId The entityId to match when searching
+     * @param markedVersion The version that has been marked for deletion. All 
version before this one must be deleted.
+     * @param limit The limit of the values to return per search.
+     * @param offset The offset to page the query on.
+     * @return
+     */
+    CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final 
UUID markedVersion ,final int limit, final int offset);
+
+    /**
      * delete all application records
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2256769..2709569 100644
--- 
a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ 
b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ListenableActionFuture;
@@ -40,6 +41,8 @@ import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.NotImplementedException;
+
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -221,6 +224,13 @@ public class EsApplicationEntityIndexImpl implements 
ApplicationEntityIndex {
     }
 
 
+    @Override
+    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, 
final UUID markedVersion, final int limit,
+                                                           final int offset ) {
+        throw new NotImplementedException( "Implement me or else I won't 
work." );
+    }
+
+
     /**
      * Completely delete an index.
      */

Reply via email to