Fixes incorrect Cassandra range scanning in serialization with min/max values
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6233bf0b Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6233bf0b Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6233bf0b Branch: refs/heads/usergrid-1007-shiro-cache Commit: 6233bf0b23e6f86b0d57607d2dd423599c27e726 Parents: 13d2594 Author: Todd Nine <[email protected]> Authored: Tue Sep 15 17:20:24 2015 -0600 Committer: Todd Nine <[email protected]> Committed: Tue Sep 15 17:20:24 2015 -0600 ---------------------------------------------------------------------- .../service/ConnectionServiceImpl.java | 30 +++++----- .../service/ConnectionServiceImplTest.java | 19 ++++--- .../core/astyanax/MultiRowColumnIterator.java | 2 +- .../graph/impl/GraphManagerImpl.java | 4 +- .../impl/shard/impl/EdgeSearcher.java | 14 ++++- .../impl/ShardedEdgeSerializationImpl.java | 59 ++++++++++++++------ .../shard/impl/serialize/EdgeSerializer.java | 21 ++++++- .../persistence/graph/GraphManagerIT.java | 4 +- 8 files changed, 105 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java index d60426c..1b36321 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImpl.java @@ -170,7 +170,7 @@ public class ConnectionServiceImpl implements ConnectionService { final SearchByEdgeType searchByEdge = new SimpleSearchByEdgeType( entityId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.ASCENDING, Optional.absent() ); + SearchByEdgeType.Order.DESCENDING, Optional.absent() ); //load edges from the source the with type specified return gm.loadEdgesFromSource( searchByEdge ); @@ -182,21 +182,25 @@ public class ConnectionServiceImpl implements ConnectionService { logger.debug( "Found edge {}, searching for multiple versions of edge", edge ); final SearchByEdge searchByEdge = - new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), - Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, Optional.absent() ); - return gm.loadEdgeVersions( searchByEdge ); - } ) + new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), 0, + SearchByEdgeType.Order.ASCENDING, Optional.absent() ); + return gm.loadEdgeVersions( searchByEdge ) + //skip the first version since it's the one we want to retain + .skip( 1 ) + //mark for deletion + .flatMap( edgeToDelete -> { - //skip the first version since it's the one we want to retain - // validate there is only 1 version of it, delete anything > than the min - .skip( 1 ).flatMap( edgeToDelete -> { + logger.debug( "Deleting edge {}", edgeToDelete ); - logger.debug( "Deleting edge {}", edgeToDelete ); + //mark the edge and ignore the cleanup result + return gm.markEdge( edgeToDelete ); + } ) + //mark all versions, then on the last, delete them from cass + .flatMap( lastMarkedEdge -> gm.deleteEdge( lastMarkedEdge ) ); + + + // validate there is only 1 version of it, delete anything > than the min - //mark the edge and ignore the cleanup result - return gm.markEdge( edgeToDelete ) - //delete the edge - .flatMap( edge -> gm.deleteEdge( edgeToDelete ) ); } ).map( deletedEdge -> new ConnectionScope( applicationScope, deletedEdge ) ); } ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java index 152721a..a2c18b1 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java @@ -49,6 +49,7 @@ import rx.Observable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; @RunWith( ITRunner.class ) @@ -179,19 +180,20 @@ public class ConnectionServiceImplTest { final Edge written3 = gm.writeEdge( connection3 ).toBlocking().last(); + logger.info( "Wrote edge 3 with edge {}", written3 ); + + assertTrue( "Expected edge timestamp to be in order", written1.getTimestamp() <= written2.getTimestamp() ); + assertTrue( "Expected edge timestamp to be in order", written2.getTimestamp() <= written3.getTimestamp() ); + //now run the cleanup final List<ConnectionScope> deletedConnections = connectionService.deDupeConnections( Observable.just( applicationScope ) ).toList().toBlocking().last(); -// assertEquals( "2 edges deleted", 2, deletedConnections.size() ); - //check our oldest was deleted first - - assertEdgeData( written2, deletedConnections.get( 0 ).getEdge() ); assertEdgeData( written3, deletedConnections.get( 1 ).getEdge() ); @@ -221,12 +223,13 @@ public class ConnectionServiceImplTest { * @param asserted */ private void assertEdgeData(final Edge expected, final Edge asserted){ - assertEquals("SourceId the same", expected.getSourceNode(), expected.getTargetNode()); - assertEquals("TargetId the same", expected.getTargetNode(), expected.getTargetNode()); + assertEquals("SourceId the same", expected.getSourceNode(), asserted.getSourceNode()); + + assertEquals("TargetId the same", expected.getTargetNode(), asserted.getTargetNode()); - assertEquals("Type the same", expected.getType(), expected.getType()); + assertEquals("Type the same", expected.getType(), asserted.getType()); - assertEquals("Timestamp the same", expected.getTimestamp(), expected.getTimestamp()); + assertEquals("Timestamp the same", expected.getTimestamp(), asserted.getTimestamp()); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java index 667992c..16249ae 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java @@ -201,7 +201,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> { final List<T> mergedResults; if ( containsSingleRowOnly( result ) ) { - mergedResults = singleRowResult( result ); + mergedResults = singleRowResult( result ); } else { mergedResults = mergeResults( result, selectSize ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index 8c64fa1..e119c59 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -217,7 +217,7 @@ public class GraphManagerImpl implements GraphManager { protected Iterator<MarkedEdge> getIterator() { return storageEdgeSerialization.getEdgeVersions( scope, new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), - Long.MAX_VALUE, SearchByEdgeType.Order.ASCENDING, Optional.absent() ) ); + Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); } } ).filter( markedEdge -> markedEdge.isDeleted() ).flatMap( marked -> //fire our delete listener and wait for the results @@ -458,7 +458,7 @@ public class GraphManagerImpl implements GraphManager { final long edgeTimestamp = edge.getTimestamp(); //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxVersion ) > 0 ) { + if ( edge.isDeleted() ) { return false; } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index 8bcefbb..4d02ba9 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -1,6 +1,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -13,6 +14,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge; import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard; import com.google.common.base.Optional; @@ -107,12 +109,12 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum if ( last.isPresent() ) { C sourceEdge = createColumn( last.get() ); - rangeBuilder.setStart( sourceEdge, getSerializer() ); + }else { + setTimeScan( rangeBuilder ); } - - setRangeOptions(rangeBuilder); + setRangeOptions( rangeBuilder ); } @@ -122,6 +124,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum final boolean reversed = order == SearchByEdgeType.Order.ASCENDING; rangeBuilder.setReversed( reversed ); + } @@ -153,6 +156,11 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum */ protected abstract C createColumn( final T last ); + /** + * Set the time scan into the range builder + * @param rangeBuilder + */ + protected abstract void setTimeScan(final RangeBuilder rangeBuilder); /** * Create an edge to return to the user based on the directed edge provided http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 13f7427..2ef50f5 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; +import java.nio.ByteBuffer; import java.util.Collection; -import java.util.Comparator; import java.util.Iterator; import java.util.UUID; @@ -54,14 +54,14 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdg import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators - .SourceDirectedEdgeDescendingComparator; + .SourceDirectedEdgeDescendingComparator; import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators - .TargetDirectedEdgeDescendingComparator; + .TargetDirectedEdgeDescendingComparator; +import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeSerializer; import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation; import org.apache.usergrid.persistence.model.entity.Id; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.inject.Singleton; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; @@ -388,19 +388,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { } - @Override - public void buildRange( final RangeBuilder builder ) { - - - if ( last.isPresent() ) { - super.buildRange( builder ); - return; - } - - //start seeking at a value < our max version - builder.setStart( maxTimestamp ); - } - @Override protected EdgeRowKey generateRowKey( long shard ) { @@ -415,6 +402,13 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override + protected void setTimeScan( final RangeBuilder rangeBuilder ) { + //start seeking at a value < our max version + rangeBuilder.setStart( maxTimestamp ); + } + + + @Override protected MarkedEdge createEdge( final Long column, final boolean marked ) { return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked ); } @@ -472,6 +466,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override + protected void setTimeScan( final RangeBuilder rangeBuilder ) { + final ByteBuffer buffer = EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp ); + + rangeBuilder.setStart( buffer ); + } + + + @Override protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) { return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked ); } @@ -526,6 +528,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override + protected void setTimeScan( final RangeBuilder rangeBuilder ) { + final ByteBuffer buffer = EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp ); + + rangeBuilder.setStart( buffer ); + } + + + @Override protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) { return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked ); } @@ -574,6 +584,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override + protected void setTimeScan( final RangeBuilder rangeBuilder ) { + final ByteBuffer buffer = EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp ); + + rangeBuilder.setStart( buffer ); + } + + + @Override protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) { return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked ); } @@ -627,6 +645,13 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { @Override + protected void setTimeScan( final RangeBuilder rangeBuilder ) { + final ByteBuffer buffer = EdgeSerializer.INSTANCE.fromTimeRange( maxTimestamp ); + + rangeBuilder.setStart( buffer ); + } + + @Override protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) { return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java index 590cf35..e652c73 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/EdgeSerializer.java @@ -35,14 +35,16 @@ import com.netflix.astyanax.serializers.LongSerializer; /** - * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization. Only the target - * Id and version. + * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization. Only the target Id + * and version. */ public class EdgeSerializer extends AbstractSerializer<DirectedEdge> { private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get(); private static final LongSerializer LONG_SERIALIZER = LongSerializer.get(); + public static final EdgeSerializer INSTANCE = new EdgeSerializer(); + @Override public ByteBuffer toByteBuffer( final DirectedEdge edge ) { @@ -74,4 +76,19 @@ public class EdgeSerializer extends AbstractSerializer<DirectedEdge> { return new DirectedEdge( id, timestamp ); } + + + /** + * Create a scan range that represents the timestamp of the edge + * @param timestamp + * @return + */ + public ByteBuffer fromTimeRange( final long timestamp ) { + DynamicComposite composite = new DynamicComposite(); + + composite.addComponent( timestamp, LONG_SERIALIZER ); + + + return composite.serialize(); + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/6233bf0b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index 4b2ffc8..eaded15 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -2353,7 +2353,7 @@ public class GraphManagerIT { final SearchByEdge searchDescending = - new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE, + new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ); final Observable<Edge> edgesDescending = gm.loadEdgeVersions( searchDescending ); @@ -2373,7 +2373,7 @@ public class GraphManagerIT { //now search ascending final SearchByEdge searchAscending = - new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(),Long.MAX_VALUE, + new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0, SearchByEdgeType.Order.ASCENDING, Optional.<Edge>absent() ); Observable<Edge> edgesAscending = gm.loadEdgeVersions( searchAscending );
