Repository: incubator-usergrid Updated Branches: refs/heads/USERGRID-608 ed317b295 -> 630cb4a87
Removed inner observable and converted it into regular code that functions. Will convert back in next push. Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/11648ab1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/11648ab1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/11648ab1 Branch: refs/heads/USERGRID-608 Commit: 11648ab1c84be349a45020e0cc4a9cafd60d005b Parents: ed317b2 Author: GERey <[email protected]> Authored: Mon May 18 15:08:15 2015 -0700 Committer: GERey <[email protected]> Committed: Mon May 18 15:08:15 2015 -0700 ---------------------------------------------------------------------- .../mvcc/stage/delete/UniqueCleanup.java | 106 ++++++++++++------- 1 file changed, 69 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/11648ab1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java index 0034f03..3e3e531 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java @@ -88,44 +88,76 @@ public class UniqueCleanup final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection(); final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion(); + Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields( + applicationScope, entityId ); + + final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); + + + while(uniqueFields.hasNext()){ + UniqueValue uniqueValue = uniqueFields.next(); + final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); + //TODO: should this be equals? That way we clean up the one marked as well + if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion ) >= 0){ + logger + .debug( "Deleting value:{} from application scope: {} ", uniqueValue, applicationScope ); + uniqueCleanupBatch.mergeShallow( + uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue )); + + } + } + + try { + uniqueCleanupBatch.execute(); + } + catch ( ConnectionException e ) { + throw new RuntimeException( "Unable to execute batch mutation", e ); + } + + + //TODO Refactor this logic into a a class that can be invoked from anywhere - //iterate all unique values - final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = - Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { - @Override - protected Iterator<UniqueValue> getIterator() { - return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); - } - } ) - - //skip versions > the specified version - .skipWhile( uniqueValue -> { - - final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); - - return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0; - } ) - - //buffer our buffer size, then roll them all up in a single batch mutation - .buffer( serializationFig.getBufferSize() ) - - //roll them up - .doOnNext( uniqueValues -> { - final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); - - - for ( UniqueValue value : uniqueValues ) { - uniqueCleanupBatch - .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); - } - - try { - uniqueCleanupBatch.execute(); - } - catch ( ConnectionException e ) { - throw new RuntimeException( "Unable to execute batch mutation", e ); - } - } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); +// //iterate all unique values +// final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup = +// Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) { +// @Override +// protected Iterator<UniqueValue> getIterator() { +// return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId ); +// } +// } ) +// +// //skip versions > the specified version +// //TODO: does this emit for every version before the staticComparator? +// .skipWhile( uniqueValue -> { +// +// logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion ); +// final UUID uniqueValueVersion = uniqueValue.getEntityVersion(); +// //TODO: should this be equals? That way we clean up the one marked as well +// return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion ) > 0; +// } ) +// +// //buffer our buffer size, then roll them all up in a single batch mutation +// .buffer( serializationFig.getBufferSize() ) +// +// //roll them up +// .doOnNext( uniqueValues -> { +// final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch(); +// +// +// for ( UniqueValue value : uniqueValues ) { +// logger +// .debug( "Deleting value:{} from application scope: {} ", value, applicationScope ); +// uniqueCleanupBatch +// .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) ); +// } +// +// try { +// uniqueCleanupBatch.execute(); +// } +// catch ( ConnectionException e ) { +// throw new RuntimeException( "Unable to execute batch mutation", e ); +// } +// } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent ); } ); return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
