Updated Branches:
  refs/heads/trunk c3fdb6cc9 -> ef2333590

handle maxIsColumns for parallel range fetch
Patch by Vijay, reviewed by David Alves for CASSANDRA-1337


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef233359
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef233359
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef233359

Branch: refs/heads/trunk
Commit: ef2333590787e9b72ad08ff8a2abbfacf9382d9b
Parents: c3fdb6c
Author: Vijay Parthasarathy <[email protected]>
Authored: Sat Jul 28 22:10:47 2012 -0700
Committer: Vijay Parthasarathy <[email protected]>
Committed: Sat Jul 28 22:10:47 2012 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/service/StorageProxy.java |   30 ++++++++-------
 1 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef233359/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index e033962..8d0e0b3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -856,19 +856,20 @@ public class StorageProxy implements StorageProxyMBean
             
             // get the cardinality of this index based on row count
             // use this info to decide how many scans to do in parallel
-            long estimatedKeys = 
Table.open(command.keyspace).getColumnFamilyStore(command.column_family)
-                    .estimateKeys();
-            int concurrencyFactor = (int) command.maxResults / ((int) 
estimatedKeys + 1);
+            Table table = Table.open(command.keyspace);
+            long estimatedKeysPerRange = 
table.getColumnFamilyStore(command.column_family)
+                    .estimateKeys() / 
table.getReplicationStrategy().getReplicationFactor();
 
-            if (concurrencyFactor <= 0)
+            int concurrencyFactor = (int) (command.maxResults / 
(estimatedKeysPerRange + 1));
+            if (concurrencyFactor <= 0 || command.maxIsColumns)
                 concurrencyFactor = 1;
-
-            if (concurrencyFactor > ranges.size())
+            else if (concurrencyFactor > ranges.size())
                 concurrencyFactor = ranges.size();
-            
+
             // parallel scan handlers
             List<ReadCallback<RangeSliceReply, Iterable<Row>>> scanHandlers = 
new ArrayList<ReadCallback<RangeSliceReply, Iterable<Row>>>(concurrencyFactor);
-            
+
+            int parallelHandlers = concurrencyFactor;
             for (AbstractBounds<RowPosition> range : ranges)
             {
                 RangeSliceCommand nodeCmd = new 
RangeSliceCommand(command.keyspace,
@@ -903,6 +904,7 @@ public class StorageProxy implements StorageProxyMBean
                     {
                         throw new AssertionError(e);
                     }
+                    parallelHandlers--;
                 }
                 else
                 {
@@ -920,7 +922,8 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     scanHandlers.add(handler);
-                    if (scanHandlers.size() >= concurrencyFactor)
+
+                    if (scanHandlers.size() >= parallelHandlers)
                     {
                         for (ReadCallback<RangeSliceReply, Iterable<Row>> 
scanHandler : scanHandlers)
                         {
@@ -944,15 +947,14 @@ public class StorageProxy implements StorageProxyMBean
                             {
                                 throw new AssertionError(e); // no digests in 
range slices yet
                             }
-
-                            // if we're done, great, otherwise, move to the 
next range
-                            int count = nodeCmd.maxIsColumns ? columnsCount : 
rows.size();
-                            if (count >= nodeCmd.maxResults)
-                                break;
                         }
                         scanHandlers.clear(); //go back for more
                     }
                 }
+                // if we're done, great, otherwise, move to the next range
+                int count = nodeCmd.maxIsColumns ? columnsCount : rows.size();
+                if (count >= nodeCmd.maxResults)
+                    break;
             }
         }
         finally

Reply via email to