Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-608 [created] 50f8a56fd
[USERGRID-608] Copied method indexEntity to deleteEntityIndexes and made it a deindex call that requires version. Added version to method signature. Created Observable to get first marked entry. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/50f8a56f Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/50f8a56f Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/50f8a56f Branch: refs/heads/USERGRID-608 Commit: 50f8a56fdfa558f9204c348bec59c938347c7485 Parents: 79aea6a Author: GERey <[email protected]> Authored: Thu May 14 13:59:30 2015 -0700 Committer: GERey <[email protected]> Committed: Thu May 14 13:59:30 2015 -0700 ---------------------------------------------------------------------- .../asyncevents/EventBuilderImpl.java | 13 +++++- .../corepersistence/index/IndexService.java | 5 ++- .../corepersistence/index/IndexServiceImpl.java | 42 +++++++++++++++++--- 3 files changed, 53 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index c0d82d2..d678a18 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -123,10 +123,21 @@ public class EventBuilderImpl implements EventBuilder { final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + //needs get versions here. + + //TODO: change this to be an observable + //so we get these versions and loop through them until we find the MvccLogEntry that is marked as delete. + //TODO: evauluate this to possibly be an observable to pass to the nextmethod. + MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking() + .first( mvccLogEntry -> mvccLogEntry.getState()== MvccLogEntry.State.DELETED ); + //observable of index operation messages + //this method will need the most recent version. + //When we go to compact the graph make sure you turn on the debugging mode for the deleted nodes so + //we can verify that we mark them. That said that part seems kinda done. as we also delete the mvcc buffers. final Observable<IndexOperationMessage> edgeObservable = - indexService.deleteEntityIndexes( applicationScope, entityId ); + indexService.deleteEntityIndexes( applicationScope, entityId,mostRecentlyMarked.getVersion() ); //observable of entries as the batches are deleted http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java index 30e4dad..47601fb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.index; +import java.util.UUID; + import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.index.IndexEdge; @@ -78,7 +80,8 @@ public interface IndexService { * @param entityId * @return */ - Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId); + Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId, + final UUID version); http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/50f8a56f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index d616090..8700c9b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -21,11 +21,11 @@ package org.apache.usergrid.corepersistence.index; import java.util.Iterator; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.exception.NotImplementedException; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -41,7 +41,6 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.IndexFig; -import org.apache.usergrid.persistence.index.SearchTypes; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -219,10 +218,43 @@ public class IndexServiceImpl implements IndexService { @Override public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, - final Id entityId ) { + final Id entityId, final UUID version ) { - //TODO query ES and remove this entityId - throw new NotImplementedException( "Implement me" ); + //bootstrap the lower modules from their caches + final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); + final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); + + //we always index in the target scope + final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId ); + + //we may have to index we're indexing from source->target here + final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) ); + + + //we might or might not need to index from target-> source + final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId ); + + //merge the edges together + final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes); + //do our observable for batching + //try to send a whole batch if we can + version. + + //do our observable for batching + //try to send a whole batch if we can + final Observable<IndexOperationMessage> batches = observable.buffer( indexFig.getIndexBatchSize() ) + + //map into batches based on our buffer size + .flatMap( buffer -> Observable.from( buffer ) + //collect results into a single batch + .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> { + //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity ); + batch.deindex( indexEdge, entityId, version ); + } ) + //return the future from the batch execution + .flatMap( batch -> batch.execute() ) ); + + return ObservableTimer.time( batches, indexTimer ); }
