Updated Branches: refs/heads/trunk c3fdb6cc9 -> ef2333590
handle maxIsColumns for parallel range fetch Patch by Vijay, reviewed by David Alves for CASSANDRA-1337 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef233359 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef233359 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef233359 Branch: refs/heads/trunk Commit: ef2333590787e9b72ad08ff8a2abbfacf9382d9b Parents: c3fdb6c Author: Vijay Parthasarathy <[email protected]> Authored: Sat Jul 28 22:10:47 2012 -0700 Committer: Vijay Parthasarathy <[email protected]> Committed: Sat Jul 28 22:10:47 2012 -0700 ---------------------------------------------------------------------- .../org/apache/cassandra/service/StorageProxy.java | 30 ++++++++------- 1 files changed, 16 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef233359/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index e033962..8d0e0b3 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -856,19 +856,20 @@ public class StorageProxy implements StorageProxyMBean // get the cardinality of this index based on row count // use this info to decide how many scans to do in parallel - long estimatedKeys = Table.open(command.keyspace).getColumnFamilyStore(command.column_family) - .estimateKeys(); - int concurrencyFactor = (int) command.maxResults / ((int) estimatedKeys + 1); + Table table = Table.open(command.keyspace); + long estimatedKeysPerRange = table.getColumnFamilyStore(command.column_family) + .estimateKeys() / table.getReplicationStrategy().getReplicationFactor(); - if (concurrencyFactor <= 0) + int concurrencyFactor = (int) (command.maxResults / (estimatedKeysPerRange + 1)); + if (concurrencyFactor <= 0 || command.maxIsColumns) concurrencyFactor = 1; - - if (concurrencyFactor > ranges.size()) + else if (concurrencyFactor > ranges.size()) concurrencyFactor = ranges.size(); - + // parallel scan handlers List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor); - + + int parallelHandlers = concurrencyFactor; for (AbstractBounds<RowPosition> range : ranges) { RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace, @@ -903,6 +904,7 @@ public class StorageProxy implements StorageProxyMBean { throw new AssertionError(e); } + parallelHandlers--; } else { @@ -920,7 +922,8 @@ public class StorageProxy implements StorageProxyMBean } scanHandlers.add(handler); - if (scanHandlers.size() >= concurrencyFactor) + + if (scanHandlers.size() >= parallelHandlers) { for (ReadCallback<RangeSliceReply, Iterable<Row>> scanHandler : scanHandlers) { @@ -944,15 +947,14 @@ public class StorageProxy implements StorageProxyMBean { throw new AssertionError(e); // no digests in range slices yet } - - // if we're done, great, otherwise, move to the next range - int count = nodeCmd.maxIsColumns ? columnsCount : rows.size(); - if (count >= nodeCmd.maxResults) - break; } scanHandlers.clear(); //go back for more } } + // if we're done, great, otherwise, move to the next range + int count = nodeCmd.maxIsColumns ? columnsCount : rows.size(); + if (count >= nodeCmd.maxResults) + break; } } finally
