Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-608 630cb4a87 -> 00aa293d0
[USERGRID-608] Fixed signature to make method self-documenting. Added fix for UniqueCleanup.java where it was comparing things to the wrong standard. Added fixes for IndexServiceImpl so that it can properly follow the delete async process it is called from. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/00aa293d Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/00aa293d Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/00aa293d Branch: refs/heads/USERGRID-608 Commit: 00aa293d0c161a056c8f8e10d3f5880eeac2df5b Parents: 630cb4a Author: GERey <[email protected]> Authored: Fri May 22 12:49:01 2015 -0700 Committer: GERey <[email protected]> Committed: Fri May 22 12:49:01 2015 -0700 ---------------------------------------------------------------------- .../corepersistence/index/IndexServiceImpl.java | 82 +++++-------- .../usergrid/persistence/EntityManagerIT.java | 67 +++++------ .../mvcc/stage/delete/UniqueCleanup.java | 120 ++++++++----------- .../index/ApplicationEntityIndex.java | 2 +- .../impl/EsApplicationEntityIndexImpl.java | 2 +- .../persistence/index/impl/EntityIndexTest.java | 8 +- 6 files changed, 121 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 7d7d0d9..5e2a5ea 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ 
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.ObservableTimer; import org.apache.usergrid.persistence.core.scope.ApplicationScope; @@ -33,6 +34,7 @@ import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.serialization.EdgesObservable; import org.apache.usergrid.persistence.index.ApplicationEntityIndex; import org.apache.usergrid.persistence.index.CandidateResult; @@ -41,6 +43,7 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch; import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.IndexEdge; import org.apache.usergrid.persistence.index.IndexFig; +import org.apache.usergrid.persistence.index.SearchEdge; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -52,9 +55,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import rx.Observable; -import rx.functions.Func1; -import rx.observables.ConnectableObservable; +import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource; import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget; import static 
org.apache.usergrid.persistence.Schema.getDefaultSchema; @@ -188,14 +190,15 @@ public class IndexServiceImpl implements IndexService { CandidateResults targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, 0 ); - //Should loop thorugh and query for all documents and if there are no documents then the loop should exit. - do{ + //Should loop thorugh and query for all documents and if there are no documents then the loop should + // exit. + do { batch = deindexBatchIteratorResolver( fromSource, targetEdgesToBeDeindexed, batch ); - if(!targetEdgesToBeDeindexed.getOffset().isPresent()) - break; - targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, targetEdgesToBeDeindexed.getOffset().get() ); - }while(!targetEdgesToBeDeindexed.isEmpty()); - + if ( !targetEdgesToBeDeindexed.getOffset().isPresent() ) break; + targetEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromSource, targetId, 1000, + targetEdgesToBeDeindexed.getOffset().get() ); + } + while ( !targetEdgesToBeDeindexed.isEmpty() ); final IndexEdge fromTarget = generateScopeFromTarget( edge ); @@ -203,12 +206,13 @@ public class IndexServiceImpl implements IndexService { CandidateResults sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, 0 ); - do{ + do { batch = deindexBatchIteratorResolver( fromTarget, sourceEdgesToBeDeindexed, batch ); - if(!sourceEdgesToBeDeindexed.getOffset().isPresent()) - break; - sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, sourceEdgesToBeDeindexed.getOffset().get() ); - }while(!sourceEdgesToBeDeindexed.isEmpty()); + if ( !sourceEdgesToBeDeindexed.getOffset().isPresent() ) break; + sourceEdgesToBeDeindexed = ei.getAllEdgeDocuments( fromTarget, sourceId, 1000, + sourceEdgesToBeDeindexed.getOffset().get() ); + } + while ( !sourceEdgesToBeDeindexed.isEmpty() ); return batch.execute(); } ); @@ -216,52 +220,33 @@ public class IndexServiceImpl implements IndexService { return 
ObservableTimer.time( batches, addTimer ); } + //This should look up the entityId and delete any documents with a timestamp that comes before + //The edges that are connected will be compacted away from the graph. @Override public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope, final Id entityId, final UUID markedVersion ) { //bootstrap the lower modules from their caches - final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope ); final ApplicationEntityIndex ei = entityIndexFactory.createApplicationEntityIndex( applicationScope ); - //we always index in the target scope - final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId ); - - //we may have to index we're indexing from source->target here - final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) ); - - - //we might or might not need to index from target-> source - final Observable<IndexEdge> targetSizes = getIndexEdgesAsTarget( gm, entityId ); - - //merge the edges together - final Observable<IndexEdge> observable = Observable.merge( sourceEdgesToIndex, targetSizes); - //do our observable for batching - //try to send a whole batch if we can - + CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion ); + //not actually sure about the timestamp but ah well. works. + SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(), + CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId, + entityId.getUuid().timestamp() ) ); - //loop through candidateResults and deindex every single result that comeback. 
- //do our observable for batching - //try to send a whole batch if we can - final Observable<IndexOperationMessage> batches = observable.buffer( indexFig.getIndexBatchSize() ) - - //map into batches based on our buffer size - .flatMap( buffer -> Observable.from( buffer ) + final Observable<IndexOperationMessage> batches = Observable.from( crs ) //collect results into a single batch - .collect( () -> ei.createBatch(), ( batch, indexEdge ) -> { - //logger.debug( "adding edge {} to batch for entity {}", indexEdge, entity ); - //TODO: refactor into stages of observable, also need a loop to get entities until we recieve nothing back. - CandidateResults crs = ei.getAllEntityVersionBeforeMark( entityId, markedVersion, 1000, 0 ); - for(CandidateResult cr: crs){ - batch.deindex( indexEdge, cr); - } + .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> { + logger.debug( "Deindexing on edge {} for entity {} added to batch",searchEdge , entityId ); + batch.deindex( searchEdge, candidateResult ); } ) //return the future from the batch execution - .flatMap( batch -> batch.execute() ) ); + .flatMap( batch -> batch.execute() ); - return ObservableTimer.time( batches, indexTimer ); + return ObservableTimer.time(batches, indexTimer); } @@ -274,7 +259,7 @@ public class IndexServiceImpl implements IndexService { */ private Observable<IndexEdge> getIndexEdgesAsTarget( final GraphManager graphManager, final Id entityId ) { - final String collectionName = InflectionUtils.pluralize( entityId.getType() ); + final String collectionName = InflectionUtils.pluralize( entityId.getType() ); final CollectionInfo collection = getDefaultSchema().getCollection( Application.ENTITY_TYPE, collectionName ); @@ -311,8 +296,7 @@ public class IndexServiceImpl implements IndexService { public EntityIndexBatch deindexBatchIteratorResolver(IndexEdge edge,CandidateResults edgesToBeDeindexed, EntityIndexBatch batch){ Iterator itr = edgesToBeDeindexed.iterator(); while( itr.hasNext() ) { - 
CandidateResult cr = ( CandidateResult ) itr.next(); - batch.deindex( edge, cr.getId(), cr.getVersion() ); + batch.deindex( edge, ( CandidateResult ) itr.next()); } return batch; } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java index faf22e5..c70a141 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/EntityManagerIT.java @@ -46,7 +46,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; - public class EntityManagerIT extends AbstractCoreIT { private static final Logger LOG = LoggerFactory.getLogger( EntityManagerIT.class ); @@ -74,13 +73,12 @@ public class EntityManagerIT extends AbstractCoreIT { user = em.get( user ); assertNotNull( user ); - assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username")); + assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) ); assertEquals( "user.email not expected value", "[email protected]", user.getProperty( "email" ) ); app.refreshIndex(); - EntityRef userRef = em.getAlias( - new SimpleEntityRef("application", applicationId), "users", "edanuff" ); + EntityRef userRef = em.getAlias( new SimpleEntityRef( "application", applicationId ), "users", "edanuff" ); assertNotNull( userRef ); assertEquals( "userRef.id not expected value", user.getUuid(), userRef.getUuid() ); @@ -89,7 +87,7 @@ public class EntityManagerIT extends AbstractCoreIT { LOG.info( "user.username: " + user.getProperty( "username" ) ); LOG.info( "user.email: " + user.getProperty( "email" ) ); - final Query query = 
Query.fromQL("username = 'edanuff'"); + final Query query = Query.fromQL( "username = 'edanuff'" ); Results results = em.searchCollection( em.getApplicationRef(), "users", query ); assertNotNull( results ); @@ -109,8 +107,8 @@ public class EntityManagerIT extends AbstractCoreIT { assertEquals( 1, results.size() ); user = results.getEntity(); assertNotNull( user ); - assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username")); - assertEquals( "user.email not expected value", "[email protected]", user.getProperty( "email")); + assertEquals( "user.username not expected value", "edanuff", user.getProperty( "username" ) ); + assertEquals( "user.email not expected value", "[email protected]", user.getProperty( "email" ) ); LOG.info( "user.username: " + user.getProperty( "username" ) ); LOG.info( "user.email: " + user.getProperty( "email" ) ); @@ -141,7 +139,7 @@ public class EntityManagerIT extends AbstractCoreIT { i = 0; for ( Entity entity : things ) { - Entity thing = em.get( new SimpleEntityRef( "thing", entity.getUuid())); + Entity thing = em.get( new SimpleEntityRef( "thing", entity.getUuid() ) ); assertNotNull( "thing should not be null", thing ); assertFalse( "thing id not valid", thing.getUuid().equals( new UUID( 0, 0 ) ) ); assertEquals( "name not expected value", "thing" + i, thing.getProperty( "name" ) ); @@ -153,7 +151,7 @@ public class EntityManagerIT extends AbstractCoreIT { for ( Entity entity : things ) { ids.add( entity.getUuid() ); - Entity en = em.get( new SimpleEntityRef( "thing", entity.getUuid())); + Entity en = em.get( new SimpleEntityRef( "thing", entity.getUuid() ) ); String type = en.getType(); assertEquals( "type not expected value", "thing", type ); @@ -236,7 +234,7 @@ public class EntityManagerIT extends AbstractCoreIT { properties.put( "name", "testprop" ); Entity thing = em.create( "thing", properties ); - Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid())); + Entity entity = em.get( new 
SimpleEntityRef( "thing", thing.getUuid() ) ); assertNotNull( "entity should not be null", entity ); em.setProperty( entity, "alpha", 1L ); em.setProperty( entity, "beta", 2L ); @@ -246,9 +244,9 @@ public class EntityManagerIT extends AbstractCoreIT { assertNotNull( "properties should not be null", props ); assertEquals( "wrong number of properties", 8, props.size() ); - assertEquals( "wrong value for property alpha", (long) 1, props.get( "alpha" ) ); - assertEquals( "wrong value for property beta", (long) 2, props.get( "beta" ) ); - assertEquals( "wrong value for property gamma", (long) 3, props.get( "gamma" ) ); + assertEquals( "wrong value for property alpha", ( long ) 1, props.get( "alpha" ) ); + assertEquals( "wrong value for property beta", ( long ) 2, props.get( "beta" ) ); + assertEquals( "wrong value for property gamma", ( long ) 3, props.get( "gamma" ) ); for ( Entry<String, Object> entry : props.entrySet() ) { LOG.info( entry.getKey() + " : " + entry.getValue() ); @@ -277,6 +275,8 @@ public class EntityManagerIT extends AbstractCoreIT { Entity thing = em.create( "thing", properties ); LOG.info( "Entity created" ); + app.refreshIndex(); + LOG.info( "Starting entity delete" ); em.delete( thing ); LOG.info( "Entity deleted" ); @@ -286,11 +286,10 @@ public class EntityManagerIT extends AbstractCoreIT { // now search by username, no results should be returned - final Query emailQuery = Query.fromQL( "name = '" + name +"'" ); + final Query emailQuery = Query.fromQL( "name = '" + name + "'" ); - Results r = em.searchCollection( em.getApplicationRef(), "thing", - emailQuery ); + Results r = em.searchCollection( em.getApplicationRef(), "thing", emailQuery ); assertEquals( 0, r.size() ); } @@ -323,8 +322,7 @@ public class EntityManagerIT extends AbstractCoreIT { // now search by username, no results should be returned final Query query = Query.fromQL( "username = '" + name + "'" ); - Results r = em.searchCollection( em.getApplicationRef(), "users", - query ); + 
Results r = em.searchCollection( em.getApplicationRef(), "users", query ); assertEquals( 0, r.size() ); @@ -342,7 +340,7 @@ public class EntityManagerIT extends AbstractCoreIT { final Query userNameQuery = Query.fromQL( "username = '" + name + "'" ); - r = em.searchCollection( em.getApplicationRef(), "users", userNameQuery); + r = em.searchCollection( em.getApplicationRef(), "users", userNameQuery ); assertEquals( 1, r.size() ); @@ -350,7 +348,7 @@ public class EntityManagerIT extends AbstractCoreIT { } - @SuppressWarnings("unchecked") + @SuppressWarnings( "unchecked" ) @Test public void testJson() throws Exception { LOG.info( "EntityDaoTest.testProperties" ); @@ -361,7 +359,7 @@ public class EntityManagerIT extends AbstractCoreIT { properties.put( "name", "testprop" ); Entity thing = em.create( "thing", properties ); - Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid())); + Entity entity = em.get( new SimpleEntityRef( "thing", thing.getUuid() ) ); assertNotNull( "entity should not be null", entity ); Map<String, Object> json = new LinkedHashMap<String, Object>(); @@ -384,7 +382,7 @@ public class EntityManagerIT extends AbstractCoreIT { @Test - @Ignore("There is a concurrency issue due to counters not being thread safe: see USERGRID-1753") + @Ignore( "There is a concurrency issue due to counters not being thread safe: see USERGRID-1753" ) public void testEntityCounters() throws Exception { LOG.info( "EntityManagerIT#testEntityCounters" ); EntityManager em = app.getEntityManager(); @@ -394,17 +392,16 @@ public class EntityManagerIT extends AbstractCoreIT { organizationEntity.setProperty( "name", "testCounterOrg" ); organizationEntity = em.create( organizationEntity ); - Entity appInfo = setup.getEmf().createApplicationV2( - "testCounterOrg", "testEntityCounters" + UUIDGenerator.newTimeUUID() ); - UUID applicationId = UUIDUtils.tryExtractUUID( - appInfo.getProperty(PROPERTY_APPLICATION_ID).toString()); + Entity appInfo = + 
setup.getEmf().createApplicationV2( "testCounterOrg", "testEntityCounters" + UUIDGenerator.newTimeUUID() ); + UUID applicationId = UUIDUtils.tryExtractUUID( appInfo.getProperty( PROPERTY_APPLICATION_ID ).toString() ); Map<String, Object> properties = new LinkedHashMap<String, Object>(); properties.put( "name", "testEntityCounters" ); Entity applicationEntity = em.create( applicationId, CpNamingUtils.APPLICATION_INFO, properties ); em.createConnection( new SimpleEntityRef( "group", organizationEntity.getUuid() ), "owns", - new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) ); + new SimpleEntityRef( CpNamingUtils.APPLICATION_INFO, applicationId ) ); em = setup.getEmf().getEntityManager( applicationId ); properties = new LinkedHashMap<String, Object>(); @@ -457,7 +454,7 @@ public class EntityManagerIT extends AbstractCoreIT { // now search by username, no results should be returned - EntityRef appRef = em.get( new SimpleEntityRef("application", app.getId() ) ); + EntityRef appRef = em.get( new SimpleEntityRef( "application", app.getId() ) ); app.refreshIndex(); @@ -481,7 +478,7 @@ public class EntityManagerIT extends AbstractCoreIT { properties.put( "email", "[email protected]" ); Entity created = em.create( "user", properties ); - Entity returned = em.get( new SimpleEntityRef( "user", created.getUuid())); + Entity returned = em.get( new SimpleEntityRef( "user", created.getUuid() ) ); assertNotNull( created ); assertNotNull( returned ); @@ -503,13 +500,13 @@ public class EntityManagerIT extends AbstractCoreIT { properties.put( "name", "one" ); Entity saved = em.create( "thing", properties ); - Entity thingOne = em.get( new SimpleEntityRef("thing", saved.getUuid())); + Entity thingOne = em.get( new SimpleEntityRef( "thing", saved.getUuid() ) ); assertNotNull( "entity should not be null", thingOne ); assertEquals( "one", thingOne.getProperty( "name" ).toString() ); em.setProperty( thingOne, "name", "two", true ); - Entity thingTwo = em.get( new 
SimpleEntityRef("thing",saved.getUuid())); + Entity thingTwo = em.get( new SimpleEntityRef( "thing", saved.getUuid() ) ); assertEquals( "two", thingTwo.getProperty( "name" ) ); } @@ -527,7 +524,7 @@ public class EntityManagerIT extends AbstractCoreIT { userProps.put( "email", "[email protected]" ); Entity createdUser = em.create( "user", userProps ); - Entity returnedUser = em.get( new SimpleEntityRef("user",createdUser.getUuid())); + Entity returnedUser = em.get( new SimpleEntityRef( "user", createdUser.getUuid() ) ); assertNotNull( createdUser ); assertNotNull( returnedUser ); @@ -539,7 +536,7 @@ public class EntityManagerIT extends AbstractCoreIT { userProps2.put( "email", "[email protected]" ); Entity createdUser2 = em.create( "user", userProps2 ); - Entity returnedUser2 = em.get( new SimpleEntityRef("user",createdUser2.getUuid())); + Entity returnedUser2 = em.get( new SimpleEntityRef( "user", createdUser2.getUuid() ) ); assertNotNull( createdUser2 ); assertNotNull( returnedUser2 ); @@ -553,7 +550,7 @@ public class EntityManagerIT extends AbstractCoreIT { app.refreshIndex(); - Entity returnedDevice = em.get( new SimpleEntityRef("device", createdDevice.getUuid())); + Entity returnedDevice = em.get( new SimpleEntityRef( "device", createdDevice.getUuid() ) ); assertNotNull( createdDevice ); assertNotNull( returnedDevice ); @@ -585,6 +582,6 @@ public class EntityManagerIT extends AbstractCoreIT { app.refreshIndex(); - assertNotNull( em.get( user.getUuid() )); + assertNotNull( em.get( user.getUuid() ) ); } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java index 3e3e531..affd82c 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java @@ -20,8 +20,10 @@ package org.apache.usergrid.persistence.collection.mvcc.stage.delete; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.UUID; import org.slf4j.Logger; @@ -82,84 +84,62 @@ public class UniqueCleanup final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) { final Observable<CollectionIoEvent<MvccEntity>> outputObservable = - collectionIoEventObservable.doOnNext( mvccEntityCollectionIoEvent -> { + collectionIoEventObservable.flatMap( mvccEntityCollectionIoEvent -> { final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId(); final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection(); final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion(); - Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields( - applicationScope, entityId ); - - final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); - - - while(uniqueFields.hasNext()){ - UniqueValue uniqueValue = uniqueFields.next(); - final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); - //TODO: should this be equals? 
That way we clean up the one marked as well - if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion ) >= 0){ - logger - .debug( "Deleting value:{} from application scope: {} ", uniqueValue, applicationScope ); - uniqueCleanupBatch.mergeShallow( - uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue )); - - } - } - - try { - uniqueCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - - //TODO Refactor this logic into a a class that can be invoked from anywhere -// //iterate all unique values -// final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = -// Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { -// @Override -// protected Iterator<UniqueValue> getIterator() { -// return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); -// } -// } ) -// -// //skip versions > the specified version -// //TODO: does this emit for every version before the staticComparator? -// .skipWhile( uniqueValue -> { -// -// logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion ); -// final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); -// //TODO: should this be equals? 
That way we clean up the one marked as well -// return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion ) > 0; -// } ) -// -// //buffer our buffer size, then roll them all up in a single batch mutation -// .buffer( serializationFig.getBufferSize() ) -// -// //roll them up -// .doOnNext( uniqueValues -> { -// final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); -// -// -// for ( UniqueValue value : uniqueValues ) { -// logger -// .debug( "Deleting value:{} from application scope: {} ", value, applicationScope ); -// uniqueCleanupBatch -// .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); -// } -// -// try { -// uniqueCleanupBatch.execute(); -// } -// catch ( ConnectionException e ) { -// throw new RuntimeException( "Unable to execute batch mutation", e ); -// } -// } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); + //iterate all unique values + final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = + Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { + @Override + protected Iterator<UniqueValue> getIterator() { + return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); + } + } ) + + //skip versions > the specified version + //TODO: does this emit for every version before the staticComparator? + .skipWhile( uniqueValue -> { + + logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion ); + final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); + //TODO: should this be equals? 
That way we clean up the one marked as well + return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0; + } ) + + //buffer our buffer size, then roll them all up in a single batch mutation + .buffer( serializationFig.getBufferSize() ) + + //roll them up + + .doOnNext( uniqueValues -> { + final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); + + + for ( UniqueValue value : uniqueValues ) { + logger + .debug( "Deleting value:{} from application scope: {} ", value, applicationScope ); + uniqueCleanupBatch + .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); + } + + try { + uniqueCleanupBatch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute batch mutation", e ); + } + } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); + + return ObservableTimer.time( uniqueValueCleanup, uniqueCleanupTimer ); } ); - return ObservableTimer.time( outputObservable, uniqueCleanupTimer ); + + + return outputObservable; } } http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java index 0e0e033..9ce65e9 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java @@ -68,7 +68,7 @@ public interface ApplicationEntityIndex { * @param offset The offset to page the query on. 
* @return */ - CandidateResults getAllEntityVersionBeforeMark(final Id entityId, final UUID markedVersion ,final int limit, final int offset); + CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ); /** * delete all application records http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java index ad47348..99e5525 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java @@ -228,7 +228,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex { @Override - public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion) { + public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) { Preconditions.checkNotNull( entityId, "entityId cannot be null" ); //TODO: check to see if there is some version verifcation. I know there is but i forget where. 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00aa293d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java index 79fc14a..00ddc17 100644 --- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java +++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java @@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory; import org.apache.commons.lang3.time.StopWatch; -import org.apache.usergrid.persistence.core.guice.MigrationManagerRule; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; import org.apache.usergrid.persistence.core.test.UseModules; @@ -385,13 +384,13 @@ public class EntityIndexTest extends BaseIT { Entity entity = EntityIndexMapUtils.fromMap( entityMap ); EntityUtils.setId( entity, new SimpleId( "fastcar" ) ); EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() ); - entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID())); + entity.setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, UUID.randomUUID() ) ); entityIndex.createBatch().index( searchEdge, entity ).execute().toBlocking().last(); ei.refreshAsync().toBlocking().first(); CandidateResults candidateResults = entityIndex - .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 ); + .search( searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 ); assertEquals( 1, candidateResults.size() ); 
EntityIndexBatch batch = entityIndex.createBatch(); @@ -444,7 +443,8 @@ public class EntityIndexTest extends BaseIT { ei.refreshAsync().toBlocking().first(); CandidateResults candidateResults = entityIndex - .getAllEntityVersionBeforeMark( entity[versionToSearchFor].getId(), entity[versionToSearchFor].getVersion()); + .getAllEntityVersionsBeforeMarkedVersion( entity[versionToSearchFor].getId(), + entity[versionToSearchFor].getVersion() ); assertEquals( 501, candidateResults.size() ); }
