Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 879cdeffc -> cd0b2e183
Fix graph cursor paging with new shard iterator.

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/cd0b2e18
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/cd0b2e18
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/cd0b2e18

Branch: refs/heads/release-2.1.1
Commit: cd0b2e183bf43db9483f3842a2ab5d36f63bf933
Parents: 879cdef
Author: Michael Russo <[email protected]>
Authored: Fri Mar 25 14:57:22 2016 -0700
Committer: Michael Russo <[email protected]>
Committed: Fri Mar 25 14:57:22 2016 -0700

----------------------------------------------------------------------
 .../astyanax/MultiRowShardColumnIterator.java | 36 +++++++++-
 .../persistence/core/shard/SmartShard.java | 11 ++-
 .../usergrid/persistence/graph/GraphFig.java | 2 +-
 .../impl/shard/impl/EdgeSearcher.java | 12 +++-
 .../impl/ShardedEdgeSerializationImpl.java | 32 +++++++--
 .../impl/shard/impl/ShardsColumnIterator.java | 2 +-
 .../collection/paging/PagingResourceIT.java | 70 +++++++++++++++++---
 .../resources/usergrid-custom-test.properties | 4 ++
 8 files changed, 148 insertions(+), 21 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index 86e3b4d..f3e8d4c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ 
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java @@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.core.astyanax; import java.util.*; +import com.google.common.base.Optional; import org.apache.usergrid.persistence.core.shard.SmartShard; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,12 +80,14 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { private boolean ascending = false; + private Optional<Long> lastTimestamp; + public MultiRowShardColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf, final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser, final ColumnSearch<T> columnSearch, final Comparator<T> comparator, final int pageSize, final List<SmartShard> rowKeysWithShardEnd, - final boolean ascending) { + final boolean ascending, final Optional<Long> lastTimestamp) { this.cf = cf; this.pageSize = pageSize; this.columnParser = columnParser; @@ -96,6 +99,8 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { this.rowKeysWithShardEnd = rowKeysWithShardEnd; this.resultsTracking = new ArrayList<>(); this.ascending = ascending; + this.lastTimestamp = lastTimestamp; + } @@ -130,7 +135,6 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { advance(); } - return currentColumnIterator.hasNext(); } @@ -168,16 +172,37 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { final RangeBuilder rangeBuilder = new RangeBuilder(); + SmartShard startShard = null; if(currentShardIterator == null){ + // create a copy that we use to search for our 'starting shard' + final List<SmartShard> shards = new ArrayList<>(rowKeysWithShardEnd); + + // flip the order of our shards if ascending if(ascending){ Collections.reverse(rowKeysWithShardEnd); } + + if(lastTimestamp.isPresent()) { + + //always seek from 0 to find out where our cursor last should fall + 
Collections.reverse(shards); + + for ( SmartShard shard : shards){ + + if ( lastTimestamp.get().compareTo(shard.getShardIndex()) > 0) { + startShard = shard; + } + + } + + } + currentShardIterator = rowKeysWithShardEnd.iterator(); } @@ -190,6 +215,13 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> { currentShard = currentShardIterator.next(); + if (startShard != null){ + while(!currentShard.equals(startShard)){ + currentShard = currentShardIterator.next(); + } + } + + if(logger.isTraceEnabled()){ logger.trace("all shards when starting: {}", rowKeysWithShardEnd); logger.trace("initializing iterator with shard: {}", currentShard); http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java index b60cb59..39ddc35 100644 --- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java +++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java @@ -24,11 +24,13 @@ public class SmartShard<R, T> { final ScopedRowKey<R> rowKey; final T shardEnd; + final long shardIndex; - public SmartShard(final ScopedRowKey<R> rowKey, final T shardEnd){ + public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd){ this.rowKey = rowKey; + this.shardIndex = shardIndex; this.shardEnd = shardEnd; } @@ -41,6 +43,13 @@ public class SmartShard<R, T> { return shardEnd; } + public long getShardIndex(){ + + return shardIndex; + } + + + @Override public String toString(){ 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 573e911..7002125 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -114,7 +114,7 @@ public interface GraphFig extends GuicyFig { double getShardRepairChance(); - @Default( "50000" ) + @Default( "10000" ) @Key( SHARD_SIZE ) long getShardSize(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index 6d8ddd4..ed1c8e0 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -52,6 +52,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum protected final Optional<T> last; + protected final Optional<Long> lastTimestamp; protected final long maxTimestamp; protected final ApplicationScope scope; protected final Collection<Shard> shards; @@ -59,7 +60,9 @@ public 
abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum protected final Comparator<T> comparator; - protected EdgeSearcher( final ApplicationScope scope, final Collection<Shard> shards, final SearchByEdgeType.Order order, final Comparator<T> comparator, final long maxTimestamp, final Optional<T> last) { + protected EdgeSearcher( final ApplicationScope scope, final Collection<Shard> shards, + final SearchByEdgeType.Order order, final Comparator<T> comparator, + final long maxTimestamp, final Optional<T> last, final Optional<Long> lastTimestamp) { Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards"); @@ -68,6 +71,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum this.order = order; this.shards = shards; this.last = last; + this.lastTimestamp = lastTimestamp; this.comparator = comparator; } @@ -107,7 +111,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum shardEnd = null; } - rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd)); + rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd)); } return rowKeysWithShardEnd; @@ -194,6 +198,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum return order; } + public Optional<Long> getLastTimestamp() { + return lastTimestamp; + } + /** * Get the column's serializer http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 
c7028aa..9888eaa 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -29,6 +29,7 @@ import java.util.UUID; import javax.annotation.Nullable; import javax.inject.Inject; +import com.google.common.base.Optional; import org.apache.usergrid.persistence.core.astyanax.CassandraConfig; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; @@ -350,12 +351,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder()); + Optional<Long> lastTimestamp = Optional.absent(); + if(search.last().isPresent()){ + lastTimestamp = Optional.of(search.last().get().getTimestamp()); + } final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher = new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp, - search.last().transform( TRANSFORM ) ) { + search.last().transform( TRANSFORM ), lastTimestamp ) { @Override @@ -420,11 +425,15 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + Optional<Long> lastTimestamp = Optional.absent(); + if(search.last().isPresent()){ + lastTimestamp = Optional.of(search.last().get().getTimestamp()); + } final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher = new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp, - search.last().transform( TRANSFORM ) ) { + 
search.last().transform( TRANSFORM ), lastTimestamp ) { @Override @@ -484,10 +493,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + Optional<Long> lastTimestamp = Optional.absent(); + if(search.last().isPresent()){ + lastTimestamp = Optional.of(search.last().get().getTimestamp()); + } final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher = new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp, - search.last().transform( TRANSFORM ) ) { + search.last().transform( TRANSFORM ), lastTimestamp ) { @Override protected Serializer<DirectedEdge> getSerializer() { @@ -541,9 +554,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + Optional<Long> lastTimestamp = Optional.absent(); + if(search.last().isPresent()){ + lastTimestamp = Optional.of(search.last().get().getTimestamp()); + } + final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher = new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(),comparator, maxTimestamp, - search.last().transform( TRANSFORM ) ) { + search.last().transform( TRANSFORM ), lastTimestamp ) { @Override protected Serializer<DirectedEdge> getSerializer() { @@ -602,10 +620,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { final OrderedComparator<MarkedEdge> comparator = new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder()); + Optional<Long> lastTimestamp = Optional.absent(); + if(search.last().isPresent()){ + lastTimestamp = Optional.of(search.last().get().getTimestamp()); + } final EdgeSearcher<RowKeyType, 
DirectedEdge, MarkedEdge> searcher = new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp, - search.last().transform( TRANSFORM ) ) { + search.last().transform( TRANSFORM ), lastTimestamp ) { @Override protected Serializer<DirectedEdge> getSerializer() { return serializer; http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index 0c90103..690c392 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -124,7 +124,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING; currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, - searcher.getComparator(), pageSize, rowKeysWithShardEnd, ascending); + searcher.getComparator(), pageSize, rowKeysWithShardEnd, ascending, searcher.getLastTimestamp() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java ---------------------------------------------------------------------- diff --git 
a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java index e3ec07c..3c30571 100644 --- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java +++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/paging/PagingResourceIT.java @@ -20,7 +20,6 @@ package org.apache.usergrid.rest.applications.collection.paging; import java.io.IOException; import java.util.*; -import org.junit.Ignore; import org.junit.Test; @@ -29,6 +28,8 @@ import org.apache.usergrid.rest.test.resource.model.ApiResponse; import org.apache.usergrid.rest.test.resource.model.Collection; import org.apache.usergrid.rest.test.resource.model.Entity; import org.apache.usergrid.rest.test.resource.model.QueryParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.*; @@ -38,6 +39,9 @@ import static org.junit.Assert.*; public class PagingResourceIT extends AbstractRestIT { + private static final Logger logger = LoggerFactory.getLogger(PagingResourceIT.class); + + /** * Creates 40 objects and then creates a query to delete sets of 10 entities per call. Checks at the end * to make sure there are no entities remaining. @@ -155,7 +159,7 @@ public class PagingResourceIT extends AbstractRestIT { parameters.setKeyValue( "limit", "" ); //sends GET call using empty parameters - pageAndVerifyEntities( collectionName,parameters, numOfPages, numOfEntities ); + pageAndVerifyEntities( collectionName,parameters, numOfPages, numOfEntities, "asc"); } @@ -182,7 +186,7 @@ public class PagingResourceIT extends AbstractRestIT { //pages through entities and verifies that they are correct. 
QueryParameters queryParameters = new QueryParameters(); queryParameters.setQuery( "select * ORDER BY created" ); - pageAndVerifyEntities( collectionName,queryParameters,numOfPages, numOfEntities ); + pageAndVerifyEntities( collectionName,queryParameters,numOfPages, numOfEntities, "asc"); //Create new collection of only 5 entities String trinketCollectionName = "trinkets" ; @@ -194,7 +198,7 @@ public class PagingResourceIT extends AbstractRestIT { //Created a new query parameter because when generated it store the cursor token back into it. queryParameters = new QueryParameters(); queryParameters.setQuery( "select * ORDER BY created" ); - pageAndVerifyEntities( trinketCollectionName,queryParameters,numOfPages, numOfEntities ); + pageAndVerifyEntities( trinketCollectionName,queryParameters,numOfPages, numOfEntities, "asc"); } @@ -215,7 +219,31 @@ public class PagingResourceIT extends AbstractRestIT { //pages through entities and verifies that they are correct. QueryParameters queryParameters = new QueryParameters(); queryParameters.setQuery( "select * ORDER BY created" ); - pageAndVerifyEntities(collectionName, queryParameters, numOfPages, numOfEntities); + pageAndVerifyEntities(collectionName, queryParameters, numOfPages, numOfEntities, "asc"); + } + + @Test + public void pagingEntitiesAcrossShardsWithGraph() throws IOException { + + + int numOfEntities = 2000; + int limit = 5; + int numOfPages = numOfEntities/limit; + + String collectionName = "testPagingEntities" ; + + //creates entities + createEntities(collectionName, numOfEntities); + + //pages through entities and verifies that they are correct. 
+ QueryParameters queryParameters = new QueryParameters(); + queryParameters.setQuery( "select *" ); + queryParameters.setLimit(limit); + + // page the same stuff multiple times + logger.info("Paging {} entities with page size of {}", numOfEntities, limit); + pageAndVerifyEntities(collectionName, queryParameters, numOfPages, numOfEntities, "desc"); + } @@ -253,7 +281,7 @@ public class PagingResourceIT extends AbstractRestIT { qp.setQuery("select * order by created asc"); qp.setLimit(10); - pageAndVerifyEntities(collectionName, qp, numOfPages, numOfEntities); + pageAndVerifyEntities(collectionName, qp, numOfPages, numOfEntities, "asc"); } @@ -321,27 +349,40 @@ public class PagingResourceIT extends AbstractRestIT { * creation from the createEntities method. * @param collectionName * @param numOfPages + * @param order * @return */ - public Collection pageAndVerifyEntities(String collectionName,QueryParameters queryParameters, int numOfPages, int numOfEntities ){ + public Collection pageAndVerifyEntities(String collectionName, QueryParameters queryParameters, int numOfPages, int numOfEntities, String order){ //Get the entities that exist in the collection Collection testCollections = this.app().collection( collectionName ).get(queryParameters); //checks to make sure we can page through all entities in order. //Used as an index to see what value we're on and also used to keep track of the current index of the entity. + int entityIndex = 1; + if(order.equals("desc")){ + entityIndex = numOfEntities; + } int pageIndex = 0; //Counts all the entities in pages with cursors while(testCollections.getCursor()!=null){ + //logger.info("cursor: {}", testCollections.getCursor()); //page through returned entities. 
while ( testCollections.hasNext() ) { Entity returnedEntity = testCollections.next(); //verifies that the names are in order, named string values will always +1 of the current index assertEquals( String.valueOf( entityIndex ), returnedEntity.get( "name" ) ); - entityIndex++; + + if(order.equals("desc")){ + entityIndex--; + }else{ + entityIndex++; + } + + } testCollections = this.app().collection( collectionName ).getNextPage( testCollections, queryParameters, true ); @@ -366,7 +407,14 @@ public class PagingResourceIT extends AbstractRestIT { //added in a minus one to account for the adding the additional 1 above. - assertEquals( numOfEntities, entityIndex-1 ); + + if(order.equals("desc")){ + assertEquals( 0, entityIndex ); + }else{ + assertEquals( numOfEntities, entityIndex - 1 ); + } + + assertEquals( numOfPages, pageIndex ); return testCollections; } @@ -398,6 +446,10 @@ public class PagingResourceIT extends AbstractRestIT { entities.add( entity ); this.app().collection( collectionName ).post( entity ); + + if ( i % 100 == 0){ + logger.info("created {} entities", i); + } } this.refreshIndex(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/cd0b2e18/stack/rest/src/test/resources/usergrid-custom-test.properties ---------------------------------------------------------------------- diff --git a/stack/rest/src/test/resources/usergrid-custom-test.properties b/stack/rest/src/test/resources/usergrid-custom-test.properties index 2605c28..c485bd5 100644 --- a/stack/rest/src/test/resources/usergrid-custom-test.properties +++ b/stack/rest/src/test/resources/usergrid-custom-test.properties @@ -48,3 +48,7 @@ elasticsearch.managment_index=usergrid_rest_management # This property is required to be set and cannot be defaulted anywhere usergrid.cluster_name=usergrid + +# this is here for PagingResourceIT.pagingEntitiesAcrossShardsWithGraph +usergrid.graph.shard.size=100 +usergrid.graph.shard.repair.chance=1.0
