Fixes incorrect Cassandra range scanning in serialization with min/max values


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6233bf0b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6233bf0b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6233bf0b

Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 6233bf0b23e6f86b0d57607d2dd423599c27e726
Parents: 13d2594
Author: Todd Nine <[email protected]>
Authored: Tue Sep 15 17:20:24 2015 -0600
Committer: Todd Nine <[email protected]>
Committed: Tue Sep 15 17:20:24 2015 -0600

----------------------------------------------------------------------
 .../service/ConnectionServiceImpl.java          | 30 +++++-----
 .../service/ConnectionServiceImplTest.java      | 19 ++++---
 .../core/astyanax/MultiRowColumnIterator.java   |  2 +-
 .../graph/impl/GraphManagerImpl.java            |  4 +-
 .../impl/shard/impl/EdgeSearcher.java           | 14 ++++-
 .../impl/ShardedEdgeSerializationImpl.java      | 59 ++++++++++++++------
 .../shard/impl/serialize/EdgeSerializer.java    | 21 ++++++-
 .../persistence/graph/GraphManagerIT.java       |  4 +-
 8 files changed, 105 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
index d60426c..1b36321 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java
@@ -170,7 +170,7 @@ public class ConnectionServiceImpl implements 
ConnectionService {
 
                     final SearchByEdgeType searchByEdge =
                         new SimpleSearchByEdgeType( entityId, edgeType, 
Long.MAX_VALUE,
-                            SearchByEdgeType.Order.ASCENDING, 
Optional.absent() );
+                            SearchByEdgeType.Order.DESCENDING, 
Optional.absent() );
 
                     //load edges from the source the with type specified
                     return gm.loadEdgesFromSource( searchByEdge );
@@ -182,21 +182,25 @@ public class ConnectionServiceImpl implements 
ConnectionService {
                     logger.debug( "Found edge {}, searching for multiple 
versions of edge", edge );
 
                     final SearchByEdge searchByEdge =
-                        new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(),
-                            Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, 
Optional.absent() );
-                    return gm.loadEdgeVersions( searchByEdge );
-                } )
+                        new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(), 0,
+                            SearchByEdgeType.Order.ASCENDING, 
Optional.absent() );
+                    return gm.loadEdgeVersions( searchByEdge )
+                        //skip the first version since it's the one we want to 
retain
+                        .skip( 1 )
+                            //mark for deletion
+                        .flatMap( edgeToDelete -> {
 
-                    //skip the first version since it's the one we want to 
retain
-                    // validate there is only 1 version of it, delete anything 
> than the min
-                .skip( 1 ).flatMap( edgeToDelete -> {
+                            logger.debug( "Deleting edge {}", edgeToDelete );
 
-                    logger.debug( "Deleting edge {}", edgeToDelete );
+                            //mark the edge and ignore the cleanup result
+                            return gm.markEdge( edgeToDelete );
+                        } )
+                            //mark all versions, then on the last, delete them 
from cass
+                        .flatMap( lastMarkedEdge -> gm.deleteEdge( 
lastMarkedEdge ) );
+
+
+                    // validate there is only 1 version of it, delete anything 
> than the min
 
-                    //mark the edge and ignore the cleanup result
-                    return gm.markEdge( edgeToDelete )
-                        //delete the edge
-                        .flatMap( edge -> gm.deleteEdge( edgeToDelete ) );
                 } ).map( deletedEdge -> new ConnectionScope( applicationScope, 
deletedEdge ) );
         } );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
index 152721a..a2c18b1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java
@@ -49,6 +49,7 @@ import rx.Observable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 
 @RunWith( ITRunner.class )
@@ -179,19 +180,20 @@ public class ConnectionServiceImplTest {
 
         final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last();
 
+
         logger.info( "Wrote edge 3 with edge {}", written3 );
 
 
+
+        assertTrue( "Expected edge timestamp to be in order", 
written1.getTimestamp() <= written2.getTimestamp() );
+        assertTrue( "Expected edge timestamp to be in order", 
written2.getTimestamp() <= written3.getTimestamp() );
+
         //now run the cleanup
 
         final List<ConnectionScope> deletedConnections =
             connectionService.deDupeConnections( Observable.just( 
applicationScope ) ).toList().toBlocking().last();
 
-//        assertEquals( "2 edges deleted", 2, deletedConnections.size() );
-
         //check our oldest was deleted first
-
-
         assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() );
 
         assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() );
@@ -221,12 +223,13 @@ public class ConnectionServiceImplTest {
      * @param asserted
      */
     private void assertEdgeData(final Edge expected, final Edge asserted){
-        assertEquals("SourceId the same", expected.getSourceNode(), 
expected.getTargetNode());
-        assertEquals("TargetId the same", expected.getTargetNode(), 
expected.getTargetNode());
+        assertEquals("SourceId the same", expected.getSourceNode(), 
asserted.getSourceNode());
+
+        assertEquals("TargetId the same", expected.getTargetNode(), 
asserted.getTargetNode());
 
-        assertEquals("Type the same", expected.getType(), expected.getType());
+        assertEquals("Type the same", expected.getType(), asserted.getType());
 
-        assertEquals("Timestamp the same", expected.getTimestamp(), 
expected.getTimestamp());
+        assertEquals("Timestamp the same", expected.getTimestamp(), 
asserted.getTimestamp());
 
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 667992c..16249ae 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -201,7 +201,7 @@ public class MultiRowColumnIterator<R, C, T> implements 
Iterator<T> {
         final List<T> mergedResults;
 
         if ( containsSingleRowOnly( result ) ) {
-            mergedResults = singleRowResult( result );
+               mergedResults = singleRowResult( result );
         }
         else {
             mergedResults = mergeResults( result, selectSize );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 8c64fa1..e119c59 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -217,7 +217,7 @@ public class GraphManagerImpl implements GraphManager {
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgeVersions( scope,
                         new SimpleSearchByEdge( edge.getSourceNode(), 
edge.getType(), edge.getTargetNode(),
-                            Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, 
Optional.absent() ) );
+                            Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, 
Optional.absent() ) );
                 }
             } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked 
->
                 //fire our delete listener and wait for the results
@@ -458,7 +458,7 @@ public class GraphManagerImpl implements GraphManager {
                     final long edgeTimestamp = edge.getTimestamp();
 
                     //our edge needs to not be deleted and have a version 
that's > max Version
-                    if ( edge.isDeleted() || Long.compare( edgeTimestamp, 
maxVersion ) > 0 ) {
+                    if ( edge.isDeleted() ) {
                         return false;
                     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 8bcefbb..4d02ba9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -1,6 +1,7 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -13,6 +14,7 @@ import 
org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 import com.google.common.base.Optional;
@@ -107,12 +109,12 @@ public abstract class EdgeSearcher<R, C, T> implements 
ColumnParser<C, T>, Colum
         if ( last.isPresent() ) {
             C sourceEdge = createColumn( last.get() );
 
-
             rangeBuilder.setStart( sourceEdge, getSerializer() );
+        }else {
+            setTimeScan( rangeBuilder );
         }
 
-
-        setRangeOptions(rangeBuilder);
+        setRangeOptions( rangeBuilder );
 
 
     }
@@ -122,6 +124,7 @@ public abstract class EdgeSearcher<R, C, T> implements 
ColumnParser<C, T>, Colum
         final boolean reversed = order == SearchByEdgeType.Order.ASCENDING;
 
         rangeBuilder.setReversed( reversed );
+
     }
 
 
@@ -153,6 +156,11 @@ public abstract class EdgeSearcher<R, C, T> implements 
ColumnParser<C, T>, Colum
      */
     protected abstract C createColumn( final T last );
 
+    /**
+     * Set the time scan into the range builder
+     * @param rangeBuilder
+     */
+    protected abstract void setTimeScan(final RangeBuilder rangeBuilder);
 
     /**
      * Create an edge to return to the user based on the directed edge provided

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 13f7427..2ef50f5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -21,8 +21,8 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.UUID;
 
@@ -54,14 +54,14 @@ import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdg
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
-        .SourceDirectedEdgeDescendingComparator;
+    .SourceDirectedEdgeDescendingComparator;
 import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
-        .TargetDirectedEdgeDescendingComparator;
+    .TargetDirectedEdgeDescendingComparator;
+import 
org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeSerializer;
 import 
org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Function;
-import com.google.common.base.Optional;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
@@ -388,19 +388,6 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
                     }
 
 
-                    @Override
-                    public void buildRange( final RangeBuilder builder ) {
-
-
-                        if ( last.isPresent() ) {
-                            super.buildRange( builder );
-                            return;
-                        }
-
-                        //start seeking at a value < our max version
-                        builder.setStart( maxTimestamp );
-                    }
-
 
                     @Override
                     protected EdgeRowKey generateRowKey( long shard ) {
@@ -415,6 +402,13 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
 
 
                     @Override
+                    protected void setTimeScan( final RangeBuilder 
rangeBuilder ) {
+                          //start seeking at a value < our max version
+                        rangeBuilder.setStart( maxTimestamp );
+                    }
+
+
+                    @Override
                     protected MarkedEdge createEdge( final Long column, final 
boolean marked ) {
                         return new SimpleMarkedEdge( sourceId, type, targetId, 
column.longValue(), marked );
                     }
@@ -472,6 +466,14 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
 
 
                     @Override
+                    protected void setTimeScan( final RangeBuilder 
rangeBuilder ) {
+                        final ByteBuffer buffer = 
EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp );
+
+                        rangeBuilder.setStart( buffer );
+                    }
+
+
+                    @Override
                     protected MarkedEdge createEdge( final DirectedEdge edge, 
final boolean marked ) {
                         return new SimpleMarkedEdge( sourceId, type, edge.id, 
edge.timestamp, marked );
                     }
@@ -526,6 +528,14 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
 
 
                     @Override
+                    protected void setTimeScan( final RangeBuilder 
rangeBuilder ) {
+                        final ByteBuffer buffer = 
EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp );
+
+                        rangeBuilder.setStart( buffer );
+                    }
+
+
+                    @Override
                     protected MarkedEdge createEdge( final DirectedEdge edge, 
final boolean marked ) {
                         return new SimpleMarkedEdge( targetId, type, edge.id, 
edge.timestamp, marked );
                     }
@@ -574,6 +584,14 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
 
 
                     @Override
+                    protected void setTimeScan( final RangeBuilder 
rangeBuilder ) {
+                        final ByteBuffer buffer = 
EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp );
+
+                        rangeBuilder.setStart( buffer );
+                    }
+
+
+                    @Override
                     protected MarkedEdge createEdge( final DirectedEdge edge, 
final boolean marked ) {
                         return new SimpleMarkedEdge( edge.id, type, targetId, 
edge.timestamp, marked );
                     }
@@ -627,6 +645,13 @@ public class ShardedEdgeSerializationImpl implements 
ShardedEdgeSerialization {
 
 
                     @Override
+                    protected void setTimeScan( final RangeBuilder 
rangeBuilder ) {
+                        final ByteBuffer buffer = 
EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp );
+
+                        rangeBuilder.setStart( buffer );
+                    }
+
+                    @Override
                     protected MarkedEdge createEdge( final DirectedEdge edge, 
final boolean marked ) {
                         return new SimpleMarkedEdge( edge.id, type, targetId, 
edge.timestamp, marked );
                     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
index 590cf35..e652c73 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java
@@ -35,14 +35,16 @@ import com.netflix.astyanax.serializers.LongSerializer;
 
 
 /**
- * Serializes to a source->target edge Note that we cannot set the edge type 
on de-serialization.  Only the target
- * Id and version.
+ * Serializes to a source->target edge Note that we cannot set the edge type 
on de-serialization.  Only the target Id
+ * and version.
  */
 public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
 
     private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = 
IdColDynamicCompositeSerializer.get();
     private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
 
+    public static final EdgeSerializer INSTANCE = new EdgeSerializer();
+
 
     @Override
     public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
@@ -74,4 +76,19 @@ public class EdgeSerializer extends 
AbstractSerializer<DirectedEdge> {
 
         return new DirectedEdge( id, timestamp );
     }
+
+
+    /**
+     * Create a scan range that represents the timestamp of the edge
+     * @param timestamp
+     * @return
+     */
+    public ByteBuffer fromTimeRange( final long timestamp ) {
+        DynamicComposite composite = new DynamicComposite();
+
+        composite.addComponent( timestamp, LONG_SERIALIZER );
+
+
+        return composite.serialize();
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 4b2ffc8..eaded15 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -2353,7 +2353,7 @@ public class GraphManagerIT {
 
 
         final SearchByEdge searchDescending =
-            new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), 
edge1.getTargetNode(),Long.MAX_VALUE,
+            new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), 
edge1.getTargetNode(), Long.MAX_VALUE,
                 SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()   );
 
         final Observable<Edge> edgesDescending = gm.loadEdgeVersions( 
searchDescending );
@@ -2373,7 +2373,7 @@ public class GraphManagerIT {
         //now search ascending
 
         final SearchByEdge searchAscending =
-                    new SimpleSearchByEdge( edge1.getSourceNode(), 
edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE,
+                    new SimpleSearchByEdge( edge1.getSourceNode(), 
edge1.getType(), edge1.getTargetNode(), 0,
                         SearchByEdgeType.Order.ASCENDING, 
Optional.<Edge>absent()   );
 
         Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending 
);

Reply via email to