Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-608 50f8a56fd -> ed317b295
[USERGRID-608] Added initial version of delete flow Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ed317b29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ed317b29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ed317b29 Branch: refs/heads/USERGRID-608 Commit: ed317b295d96c150693d441ba7d306d0ef3ff40b Parents: 50f8a56 Author: GERey <[email protected]> Authored: Fri May 15 12:23:28 2015 -0700 Committer: GERey <[email protected]> Committed: Fri May 15 12:23:28 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 84 ++++++++++++++------ .../corepersistence/CpEntityManagerFactory.java | 6 +- .../asyncevents/EventBuilderImpl.java | 13 ++- .../corepersistence/index/IndexService.java | 2 +- .../corepersistence/index/IndexServiceImpl.java | 13 ++- .../index/ApplicationEntityIndex.java | 21 ++++- .../impl/EsApplicationEntityIndexImpl.java | 10 +++ 7 files changed, 112 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 9430c4e..407ae94 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -65,6 +65,8 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException; import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException; +import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.index.query.CounterResolution; import org.apache.usergrid.persistence.index.query.Identifier; import org.apache.usergrid.persistence.map.MapManager; @@ -167,6 +169,8 @@ public class CpEntityManager implements EntityManager { private PipelineBuilderFactory pipelineBuilderFactory; + private final GraphManagerFactory graphManagerFactory; + private boolean skipAggregateCounters; private MetricsFactory metricsFactory; private Timer aggCounterTimer; @@ -207,7 +211,8 @@ public class CpEntityManager implements EntityManager { */ public CpEntityManager( final CassandraService cass, final CounterUtils counterUtils, final AsyncEventService indexService, final ManagerCache managerCache, final MetricsFactory metricsFactory, final EntityManagerFig entityManagerFig, - final PipelineBuilderFactory pipelineBuilderFactory , final UUID applicationId ) { + final PipelineBuilderFactory pipelineBuilderFactory , + final GraphManagerFactory graphManagerFactory,final UUID applicationId ) { this.entityManagerFig = entityManagerFig; @@ -217,7 +222,9 @@ public class CpEntityManager implements EntityManager { Preconditions.checkNotNull( applicationId, "applicationId must not be null" ); Preconditions.checkNotNull( indexService, "indexService must not be null" ); Preconditions.checkNotNull( pipelineBuilderFactory, "pipelineBuilderFactory must not be null" ); + Preconditions.checkNotNull( graphManagerFactory, "graphManagerFactory must not be null" ); this.pipelineBuilderFactory = pipelineBuilderFactory; + this.graphManagerFactory = graphManagerFactory; this.managerCache = managerCache; @@ -407,10 +414,10 @@ public class CpEntityManager implements EntityManager { if(entity == null) { return null; } - Class clazz = Schema.getDefaultSchema().getEntityClass(entity.getId().getType()); + Class clazz = Schema.getDefaultSchema().getEntityClass( entity.getId().getType() ); - Entity oldFormatEntity = EntityFactory.newEntity(entity.getId().getUuid(), entity.getId().getType(), clazz); - oldFormatEntity.setProperties(CpEntityMapUtils.toMap(entity)); + Entity oldFormatEntity = EntityFactory.newEntity( entity.getId().getUuid(), entity.getId().getType(), clazz ); + oldFormatEntity.setProperties( CpEntityMapUtils.toMap( entity ) ); return oldFormatEntity; } @@ -615,47 +622,72 @@ public class CpEntityManager implements EntityManager { } + /** + * There are a series of steps that are kicked off by a delete + * 1. Mark the entity in the entity collection manager as deleted + * 2. Mark entity as deleted in the graph + * 3. Kick off async process + * 4. Delete all entity documents out of elasticsearch. + * 5. Compact Graph so that it deletes the marked values. + * 6. Delete entity from cassandra using the map manager. + * + * @param entityRef an entity reference + * + * @throws Exception + */ @Override public void delete( EntityRef entityRef ) throws Exception { - deleteAsync( entityRef ).toBlocking().lastOrDefault( null ); - //delete from our UUID index - MapManager mm = getMapManagerForTypes(); - mm.delete( entityRef.getUuid().toString() ); + //TODO: since we want the user to mark it and we sweep it later. It should be marked by the graph manager here. + //Step 1 & 2 Currently to block so we ensure that marking is done immediately + //If this returns null then nothing was marked null so the entity doesn't exist + markEntity( entityRef ).toBlocking().lastOrDefault( null ); - } + //TODO: figure out how to return async call to service tier? Do I not need to? + //Step 3 + deleteAsync( entityRef ); + } - private Observable deleteAsync( EntityRef entityRef ) throws Exception { + /** + * Marks entity for deletion in entity collection manager and graph. + * Convert this method to return a list of observables that we can crunch through on return. + * Returns merged obversable that will mark the edges in the ecm and the graph manager. + * @param entityRef + * @return + */ + private Observable markEntity(EntityRef entityRef){ if(applicationScope == null || entityRef == null){ return Observable.empty(); } + GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); EntityCollectionManager ecm = managerCache.getEntityCollectionManager( applicationScope ); Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); - // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) { - // throw new IllegalArgumentException( - // "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based"); - // } + //Step 1 & 2 of delete + return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, entityRef.getUuid().timestamp() ) ); - org.apache.usergrid.persistence.model.entity.Entity entity = - load(entityId); + } - if ( entity != null ) { + /** + * 4. Delete all entity documents out of elasticsearch. + * 5. Compact Graph so that it deletes the marked values. + * 6. Delete entity from cassandra using the map manager. + **/ + private void deleteAsync( EntityRef entityRef ) throws Exception { - decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) ); - // and finally... + Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() ); - //delete it asynchronously - indexService.queueEntityDelete( applicationScope, entityId ); + //Step 4 && 5 + indexService.queueEntityDelete( applicationScope, entityId ); + + //Step 6 + //delete from our UUID index + MapManager mm = getMapManagerForTypes(); + mm.delete( entityRef.getUuid().toString() ); - return ecm.mark( entityId ); - } - else { - return Observable.empty(); - } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index e796545..918d3d4 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -62,6 +62,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.GraphManagerFactory; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; @@ -126,6 +127,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private final MetricsFactory metricsFactory; private final AsyncEventService indexService; private final PipelineBuilderFactory pipelineBuilderFactory; + private final GraphManagerFactory graphManagerFactory; public CpEntityManagerFactory( final CassandraService cassandraService, final CounterUtils counterUtils, final Injector injector ) { @@ -140,6 +142,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application this.metricsFactory = injector.getInstance( MetricsFactory.class ); this.indexService = injector.getInstance( AsyncEventService.class ); this.pipelineBuilderFactory = injector.getInstance( PipelineBuilderFactory.class ); + this.graphManagerFactory = injector.getInstance( GraphManagerFactory.class ); this.applicationIdCache = injector.getInstance(ApplicationIdCacheFactory.class).getInstance( getManagementEntityManager() ); @@ -198,7 +201,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application private EntityManager _getEntityManager( UUID applicationId ) { - EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, metricsFactory, entityManagerFig, pipelineBuilderFactory, applicationId ); + EntityManager em = new CpEntityManager(cassandraService, counterUtils, indexService, managerCache, + metricsFactory, entityManagerFig, pipelineBuilderFactory, graphManagerFactory, applicationId ); return em; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index d678a18..4e83e0b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -115,6 +115,9 @@ public class EventBuilderImpl implements EventBuilder { } + //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like + //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? + @Override public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId ); @@ -125,19 +128,25 @@ public class EventBuilderImpl implements EventBuilder { //needs get versions here. + //TODO: change this to be an observable //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete. //TODO: evauluate this to possibly be an observable to pass to the nextmethod. MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking() - .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED ); + .firstOrDefault( null, + mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED ); + //If there is nothing marked then we shouldn't return any results. + //TODO: evaluate if we want to return null or return empty observable when we don't have any results marked as deleted. + if(mostRecentlyMarked == null) + return null; //observable of index operation messages //this method will need the most recent version. //When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so //we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers. final Observable<IndexOperationMessage> edgeObservable = - indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() ); + indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() ); //observable of entries as the batches are deleted http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java index 47601fb..54eb464 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -81,7 +81,7 @@ public interface IndexService { * @return */ Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId, - final UUID version); + final UUID markedVersion); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 8700c9b..7d7d0d9 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -218,7 +218,7 @@ public class IndexServiceImpl implements IndexService { @Override public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, - final Id entityId, final UUID version ) { + final Id entityId, final UUID markedVersion ) { //bootstrap the lower modules from their caches final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); @@ -238,7 +238,10 @@ public class IndexServiceImpl implements IndexService { final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes); //do our observable for batching //try to send a whole batch if we can - version. + + + + //loop through candidateResults and deindex every single result that comeback. //do our observable for batching //try to send a whole batch if we can @@ -249,7 +252,11 @@ public class IndexServiceImpl implements IndexService { //collect results into a single batch .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> { //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity ); - batch.deindex( indexEdge, entityId, version ); + //TODO: refactor into stages of observable, also need a loop to get entities until we recieve nothing back. + CandidateResults crs = ei.getAllEntityVersionBeforeMark( entityId, markedVersion, 1000, 0 ); + for(CandidateResult cr: crs){ + batch.deindex( indexEdge, cr); + } } ) //return the future from the batch execution .flatMap( batch -> batch.execute() ) ); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java index 20c3f12..0e0e033 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java @@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.index; +import java.util.UUID; + import org.apache.usergrid.persistence.model.entity.Id; import rx.Observable; @@ -49,15 +51,26 @@ public interface ApplicationEntityIndex { /** - * Same as search, just iterates all documents that match the index edge exactly - * @param edge - * @param limit - * @param offset + * Same as search, just iterates all documents that match the index edge exactly. + * @param edge The edge to search on + * @param entityId The entity that the searchEdge is connected to. + * @param limit The limit of the values to return per search. + * @param offset The offset to page the query on. * @return */ CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId, final int limit, final int offset); /** + * Returns all entity documents that match the entityId and come before the marked version + * @param entityId The entityId to match when searching + * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted. + * @param limit The limit of the values to return per search. + * @param offset The offset to page the query on. + * @return + */ + CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final UUID markedVersion ,final int limit, final int offset); + + /** * delete all application records * @return */ http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ed317b29/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index 2256769..2709569 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ListenableActionFuture; @@ -40,6 +41,8 @@ import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang.NotImplementedException; + import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.util.ValidationUtils; @@ -221,6 +224,13 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { } + @Override + public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit, + final int offset ) { + throw new NotImplementedException( "Implement me or else I won't work." ); + } + + /** * Completely delete an index. */
