Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 ed317b295 -> 630cb4a87


Removed inner observable and converted it into regular code that functions. 
Will convert back in next push.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/11648ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/11648ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/11648ab1

Branch: refs/heads/USERGRID-608
Commit: 11648ab1c84be349a45020e0cc4a9cafd60d005b
Parents: ed317b2
Author: GERey <[email protected]>
Authored: Mon May 18 15:08:15 2015 -0700
Committer: GERey <[email protected]>
Committed: Mon May 18 15:08:15 2015 -0700

----------------------------------------------------------------------
 .../mvcc/stage/delete/UniqueCleanup.java        | 106 ++++++++++++-------
 1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/11648ab1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 0034f03..3e3e531 100644
--- 
a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ 
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -88,44 +88,76 @@ public class UniqueCleanup
                 final ApplicationScope applicationScope = 
mvccEntityCollectionIoEvent.getEntityCollection();
                 final UUID entityVersion = 
mvccEntityCollectionIoEvent.getEvent().getVersion();
 
+                Iterator<UniqueValue> uniqueFields = 
uniqueValueSerializationStrategy.getAllUniqueFields(
+                    applicationScope, entityId );
+
+                final MutationBatch uniqueCleanupBatch = 
keyspace.prepareMutationBatch();
+
+
+                while(uniqueFields.hasNext()){
+                    UniqueValue uniqueValue = uniqueFields.next();
+                    final UUID uniqueValueVersion = 
uniqueValue.getEntityVersion();
+                    //TODO: should this be equals? That way we clean up the 
one marked as well
+                    if(UUIDComparator.staticCompare( entityVersion, 
uniqueValueVersion ) >= 0){
+                        logger
+                            .debug( "Deleting value:{} from application scope: 
{} ", uniqueValue, applicationScope );
+                        uniqueCleanupBatch.mergeShallow(
+                            uniqueValueSerializationStrategy.delete( 
applicationScope,uniqueValue ));
+
+                    }
+                }
+
+                try {
+                    uniqueCleanupBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute batch 
mutation", e );
+                }
+
+
+
                 //TODO Refactor this logic into a a class that can be invoked 
from anywhere
-                //iterate all unique values
-                final Observable<CollectionIoEvent<MvccEntity>> 
uniqueValueCleanup =
-                    Observable.create( new ObservableIterator<UniqueValue>( 
"Unique value load" ) {
-                        @Override
-                        protected Iterator<UniqueValue> getIterator() {
-                            return 
uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId 
);
-                        }
-                    } )
-
-                        //skip  versions > the specified version
-                        .skipWhile( uniqueValue -> {
-
-                            final UUID uniqueValueVersion = 
uniqueValue.getEntityVersion();
-
-                            return UUIDComparator.staticCompare( 
uniqueValueVersion, entityVersion ) > 0;
-                        } )
-
-                            //buffer our buffer size, then roll them all up in 
a single batch mutation
-                        .buffer( serializationFig.getBufferSize() )
-
-                            //roll them up
-                        .doOnNext( uniqueValues -> {
-                            final MutationBatch uniqueCleanupBatch = 
keyspace.prepareMutationBatch();
-
-
-                            for ( UniqueValue value : uniqueValues ) {
-                                uniqueCleanupBatch
-                                    .mergeShallow( 
uniqueValueSerializationStrategy.delete( applicationScope, value ) );
-                            }
-
-                            try {
-                                uniqueCleanupBatch.execute();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new RuntimeException( "Unable to execute 
batch mutation", e );
-                            }
-                        } ).lastOrDefault( Collections.emptyList() ).map( list 
-> mvccEntityCollectionIoEvent );
+//                //iterate all unique values
+//                final Observable<CollectionIoEvent<MvccEntity>> 
uniqueValueCleanup =
+//                    Observable.create( new ObservableIterator<UniqueValue>( 
"Unique value load" ) {
+//                        @Override
+//                        protected Iterator<UniqueValue> getIterator() {
+//                            return 
uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId 
);
+//                        }
+//                    } )
+//
+//                        //skip  versions > the specified version
+//                        //TODO: does this emit for every version before the 
staticComparator?
+//                        .skipWhile( uniqueValue -> {
+//
+//                            logger.debug( "Cleaning up version:{} in 
UniqueCleanup", entityVersion );
+//                            final UUID uniqueValueVersion = 
uniqueValue.getEntityVersion();
+//                            //TODO: should this be equals? That way we clean 
up the one marked as well
+//                            return UUIDComparator.staticCompare( 
entityVersion,uniqueValueVersion ) > 0;
+//                        } )
+//
+//                            //buffer our buffer size, then roll them all up 
in a single batch mutation
+//                        .buffer( serializationFig.getBufferSize() )
+//
+//                            //roll them up
+//                        .doOnNext( uniqueValues -> {
+//                            final MutationBatch uniqueCleanupBatch = 
keyspace.prepareMutationBatch();
+//
+//
+//                            for ( UniqueValue value : uniqueValues ) {
+//                                logger
+//                                    .debug( "Deleting value:{} from 
application scope: {} ", value, applicationScope );
+//                                uniqueCleanupBatch
+//                                    .mergeShallow( 
uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+//                            }
+//
+//                            try {
+//                                uniqueCleanupBatch.execute();
+//                            }
+//                            catch ( ConnectionException e ) {
+//                                throw new RuntimeException( "Unable to 
execute batch mutation", e );
+//                            }
+//                        } ).lastOrDefault( Collections.emptyList() ).map( 
list -> mvccEntityCollectionIoEvent );
             } );
 
         return ObservableTimer.time( outputObservable, uniqueCleanupTimer );

Reply via email to