Author: jbellis
Date: Thu Aug  6 15:34:22 2009
New Revision: 801676

URL: http://svn.apache.org/viewvc?rev=801676&view=rev
Log:
fix range query buglet; add debug logging
patch by jbellis; tested by Mark Robson for CASSANDRA-348

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java 
Thu Aug  6 15:34:22 2009
@@ -21,6 +21,8 @@
 import java.util.*;
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
@@ -50,6 +52,15 @@
         return 
originalMessage.getReply(StorageService.getLocalStorageEndPoint(), data);
     }
 
+    @Override
+    public String toString()
+    {
+        return "RangeReply(" +
+               "keys=[" + StringUtils.join(keys, ", ") +
+               "], completed=" + rangeCompletedLocally +
+               ')';
+    }
+
     public static RangeReply read(byte[] body) throws IOException
     {
         DataInputBuffer bufIn = new DataInputBuffer();

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 Thu Aug  6 15:34:22 2009
@@ -550,7 +550,14 @@
             throw new InvalidRequestException("maxResults must be positive");
         }
 
-        return StorageProxy.getKeyRange(new RangeCommand(tablename, 
columnFamily, startWith, stopAt, maxResults));
+        try
+        {
+            return StorageProxy.getKeyRange(new RangeCommand(tablename, 
columnFamily, startWith, stopAt, maxResults));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     // main method moved to CassandraDaemon

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
 Thu Aug  6 15:34:22 2009
@@ -18,6 +18,8 @@
 */
 package org.apache.cassandra.service;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.db.RangeCommand;
 import org.apache.cassandra.db.RangeReply;
 import org.apache.cassandra.db.Table;
@@ -27,16 +29,19 @@
 
 public class RangeVerbHandler implements IVerbHandler
 {
+    private static final Logger logger = 
Logger.getLogger(RangeVerbHandler.class);
+
     public void doVerb(Message message)
     {
-        RangeReply rangeReply;
         try
         {
             RangeCommand command = RangeCommand.read(message);
             Table table = Table.open(command.table);
 
-            rangeReply = table.getKeyRange(command.columnFamily, 
command.startWith, command.stopAt, command.maxResults);
+            RangeReply rangeReply = table.getKeyRange(command.columnFamily, 
command.startWith, command.stopAt, command.maxResults);
             Message response = rangeReply.getReply(message);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending " + rangeReply + " to " + 
message.getMessageId() + "@" + message.getFrom());
             MessagingService.getMessagingInstance().sendOneWay(response, 
message.getFrom());
         }
         catch (Exception e)

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu Aug  6 15:34:22 2009
@@ -359,14 +359,6 @@
 
     /**
      * This is a multiget version of the above method.
-     * @param tablename
-     * @param keys
-     * @param columnFamily
-     * @param start
-     * @param count
-     * @return
-     * @throws IOException
-     * @throws TimeoutException
      */
     public static Map<String, Row> strongReadProtocol(String[] keys, 
ReadCommand readCommand) throws IOException, TimeoutException
     {       
@@ -571,10 +563,6 @@
      * This version is used when results for multiple keys needs to be
      * retrieved.
      * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param columns the columns we are interested in
      * @return a mapping of key --> Row
      * @throws Exception
      */
@@ -631,64 +619,61 @@
         return row;
     }
 
-    static List<String> getKeyRange(RangeCommand command)
+    static List<String> getKeyRange(RangeCommand command) throws IOException
     {
         long startTime = System.currentTimeMillis();
         int endpointOffset = 0;
         List<String> allKeys = new ArrayList<String>();
         int maxResults = command.maxResults;
 
-        try
+        EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith, 
endpointOffset);
+        String firstEndpoint = endPoint.toString();
+
+        do
         {
-            EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith, 
endpointOffset);
-            String firstEndpoint = endPoint.toString();
+            Message message = command.getMessage();
+            if (logger.isDebugEnabled())
+                logger.debug("reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
+            IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, endPoint);
 
-            do
+            // read response
+            byte[] responseBody = new byte[0];
+            try
             {
-                IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
-
-                // read response
-                byte[] responseBody = 
iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-                RangeReply rangeReply = RangeReply.read(responseBody);
-                List<String> rangeKeys = rangeReply.keys;
-
-                // deal with key overlaps
-                if (allKeys.size() > 0 && rangeKeys != null && 
rangeKeys.size() > 0 && allKeys.get(allKeys.size() - 
1).equals(rangeKeys.get(0)))
-                {
-                    allKeys.remove(allKeys.size() - 1);
-                    allKeys.addAll(rangeKeys);
-                }
-                else if (rangeKeys != null && rangeKeys.size() > 0)
-                {
-                    allKeys.addAll(rangeKeys);
-                }
+                responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+            }
+            catch (TimeoutException e)
+            {
+                throw new RuntimeException(e);
+            }
+            RangeReply rangeReply = RangeReply.read(responseBody);
+            List<String> rangeKeys = rangeReply.keys;
 
-                if (allKeys.size() >= maxResults || 
rangeReply.rangeCompletedLocally)
-                {
-                    break;
-                }
+            // deal with key overlaps
+            if (allKeys.size() > 0 && rangeKeys != null && rangeKeys.size() > 
0 && allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0)))
+            {
+                allKeys.remove(allKeys.size() - 1);
+                allKeys.addAll(rangeKeys);
+            }
+            else if (rangeKeys != null && rangeKeys.size() > 0)
+            {
+                allKeys.addAll(rangeKeys);
+            }
 
-                String newStartAt = (allKeys.size() > 0) ? 
allKeys.get(allKeys.size() - 1) : command.stopAt;
+            if (allKeys.size() >= maxResults || 
rangeReply.rangeCompletedLocally)
+            {
+                break;
+            }
 
-                command = new RangeCommand(command.table, command.columnFamily,
-                                           newStartAt, command.stopAt,
-                                           command.maxResults - 
rangeKeys.size());
-
-                endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith, 
++endpointOffset);
-            } while (!endPoint.toString().equals(firstEndpoint));
-
-            return (allKeys.size() > maxResults)
-                   ? allKeys.subList(0, maxResults)
-                   : allKeys;
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException("error reading keyrange " + command, e);
-        }
-        finally
-        {
-            rangeStats.add(System.currentTimeMillis() - startTime);
-        }
+            String newStartWith = (allKeys.size() > 0) ? 
allKeys.get(allKeys.size() - 1) : command.startWith;
+            command = new RangeCommand(command.table, command.columnFamily, 
newStartWith, command.stopAt, command.maxResults - allKeys.size());
+            endPoint = 
StorageService.instance().findSuitableEndPoint(command.startWith, 
++endpointOffset);
+        } while (!endPoint.toString().equals(firstEndpoint));
+
+        rangeStats.add(System.currentTimeMillis() - startTime);
+        return (allKeys.size() > maxResults)
+               ? allKeys.subList(0, maxResults)
+               : allKeys;
     }
 
     public double getReadLatency()


Reply via email to