Repository: usergrid Updated Branches: refs/heads/release-2.1.1 cd0b2e183 -> 26b325d44
Fix issue with cursor paging when using the smart shard iterator. Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/26b325d4 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/26b325d4 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/26b325d4 Branch: refs/heads/release-2.1.1 Commit: 26b325d44f400ceb89e268fb29ae9721503df9a7 Parents: cd0b2e1 Author: Michael Russo <[email protected]> Authored: Sat Mar 26 15:43:18 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Sat Mar 26 15:43:18 2016 -0700 ---------------------------------------------------------------------- .../read/traverse/AbstractReadGraphFilter.java | 18 ++++++------ .../usergrid/persistence/graph/GraphFig.java | 10 ++++++- .../impl/shard/impl/EdgeSearcher.java | 21 +++++++++++-- .../impl/ShardedEdgeSerializationImpl.java | 10 +++---- .../impl/shard/impl/ShardsColumnIterator.java | 31 +++++++++++++++----- 5 files changed, 66 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/26b325d4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index d7d6294..f2aed89 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -257,9 +257,9 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, } /** - * Return a key that Rx can use for determining a distinct edge. Build a string containing the hash code - * of the source, target, and type to ensure uniqueness rather than the int sum of the hash codes. Edge - * timestamp is specifically left out as edges with the same source,target,type but different timestamps + * Return a key that Rx can use for determining a distinct edge. Build a string containing the UUID + * of the source and target nodes, with the type to ensure uniqueness rather than the int sum of the hash codes. + * Edge timestamp is specifically left out as edges with the same source,target,type but different timestamps * are considered duplicates. */ private class EdgeDistinctKey implements Func1<Edge,String> { @@ -267,22 +267,22 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, @Override public String call(Edge edge) { - return buildDistinctKey(edge.getSourceNode().hashCode(), edge.getTargetNode().hashCode(), - edge.getType().hashCode()); + return buildDistinctKey(edge.getSourceNode().getUuid().toString(), edge.getTargetNode().getUuid().toString(), + edge.getType().toLowerCase()); } } - protected static String buildDistinctKey(final int sourceHash, final int targetHash, final int typeHash){ + protected static String buildDistinctKey(final String sourceNode, final String targetNode, final String type){ final String DISTINCT_KEY_SEPARATOR = ":"; StringBuilder stringBuilder = new StringBuilder(); stringBuilder - .append(sourceHash) + .append(sourceNode) .append(DISTINCT_KEY_SEPARATOR) - .append(targetHash) + .append(targetNode) .append(DISTINCT_KEY_SEPARATOR) - .append(typeHash); + .append(type); return stringBuilder.toString(); http://git-wip-us.apache.org/repos/asf/usergrid/blob/26b325d4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java index 7002125..efd94ed 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java @@ -96,6 +96,8 @@ public interface GraphFig extends GuicyFig { String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size"; + String SMART_SHARD_SEEK_ENABLED = "usergrid.graph.smartshard.seek.enabled"; + @@ -114,7 +116,7 @@ public interface GraphFig extends GuicyFig { double getShardRepairChance(); - @Default( "10000" ) + @Default( "50000" ) @Key( SHARD_SIZE ) long getShardSize(); @@ -163,5 +165,11 @@ public interface GraphFig extends GuicyFig { @Default("1000") @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE) int getCounterFlushQueueSize(); + + @Default("true") + @Key(SMART_SHARD_SEEK_ENABLED) + boolean getSmartShardSeekEnabled(); + + } http://git-wip-us.apache.org/repos/asf/usergrid/blob/26b325d4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java index ed1c8e0..917e943 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java @@ -140,13 +140,30 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum @Override public void buildRange(final RangeBuilder rangeBuilder, final T start, T end) { + final boolean ascending = order == SearchByEdgeType.Order.ASCENDING; + + if ( start != null){ - C startEdge = createColumn( start ); - rangeBuilder.setStart( startEdge, getSerializer() ); + C sourceEdge = createColumn( start ); + + if(ascending && last.isPresent() && comparator.compare(last.get(), start) < 0){ + + sourceEdge = createColumn( last.get() ); + + }else if (!ascending && last.isPresent() && comparator.compare(last.get(), start) > 0){ + + sourceEdge = createColumn( last.get() ); + } + + rangeBuilder.setStart( sourceEdge, getSerializer() ); + + }else{ setTimeScan( rangeBuilder ); + + } if( end != null){ http://git-wip-us.apache.org/repos/asf/usergrid/blob/26b325d4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java index 9888eaa..65a6f40 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java @@ -399,7 +399,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { }; return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), - graphFig.getScanPageSize() ); + graphFig.getScanPageSize(), graphFig.getSmartShardSeekEnabled() ); } @@ -470,7 +470,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), - graphFig.getScanPageSize() ); + graphFig.getScanPageSize(), graphFig.getSmartShardSeekEnabled() ); } @@ -535,7 +535,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { }; return new ShardsColumnIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), - graphFig.getScanPageSize() ); + graphFig.getScanPageSize(), graphFig.getSmartShardSeekEnabled() ); } @@ -597,7 +597,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), - graphFig.getScanPageSize() ); + graphFig.getScanPageSize(), graphFig.getSmartShardSeekEnabled() ); } @@ -660,7 +660,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization { }; return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(), - graphFig.getScanPageSize() ); + graphFig.getScanPageSize(), graphFig.getSmartShardSeekEnabled() ); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/26b325d4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java index 690c392..c0ab835 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java @@ -21,13 +21,13 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl; import java.util.*; +import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator; import org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator; import org.apache.usergrid.persistence.core.shard.SmartShard; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator; import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamily; import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey; @@ -60,15 +60,19 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { private final ConsistencyLevel consistencyLevel; + private final boolean smartShardSeekEnabled; + public ShardsColumnIterator(final EdgeSearcher<R, C, T> searcher, final MultiTenantColumnFamily<ScopedRowKey<R>, C> cf, final Keyspace keyspace, - final ConsistencyLevel consistencyLevel, final int pageSize ) { + final ConsistencyLevel consistencyLevel, final int pageSize, + final boolean smartShardSeekEnabled) { this.searcher = searcher; this.cf = cf; this.keyspace = keyspace; this.pageSize = pageSize; this.consistencyLevel = consistencyLevel; + this.smartShardSeekEnabled = smartShardSeekEnabled; } @@ -118,13 +122,26 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> { searcher.buildRange( rangeBuilder ); - // get the rows keys and their corresponding 'shardEnd' that we will seek from - final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd(); + if(smartShardSeekEnabled){ + + // get the rows keys and their corresponding 'shardEnd' that we will seek from + final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd(); + + final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING; + + currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, + searcher.getComparator(), pageSize, rowKeysWithShardEnd, ascending, searcher.getLastTimestamp() ); - final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING; + }else{ - currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, - searcher.getComparator(), pageSize, rowKeysWithShardEnd, ascending, searcher.getLastTimestamp() ); + + final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys(); + + currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, + searcher.getComparator(), rowKeys, pageSize ); + + + } }
