Author: jbellis
Date: Thu Aug 6 15:34:22 2009
New Revision: 801676
URL: http://svn.apache.org/viewvc?rev=801676&view=rev
Log:
fix range query buglet; add debug logging
patch by jbellis; tested by Mark Robson for CASSANDRA-348
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeReply.java
Thu Aug 6 15:34:22 2009
@@ -21,6 +21,8 @@
import java.util.*;
import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.Message;
@@ -50,6 +52,15 @@
return
originalMessage.getReply(StorageService.getLocalStorageEndPoint(), data);
}
+ @Override
+ public String toString()
+ {
+ return "RangeReply(" +
+ "keys=[" + StringUtils.join(keys, ", ") +
+ "], completed=" + rangeCompletedLocally +
+ ')';
+ }
+
public static RangeReply read(byte[] body) throws IOException
{
DataInputBuffer bufIn = new DataInputBuffer();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
Thu Aug 6 15:34:22 2009
@@ -550,7 +550,14 @@
throw new InvalidRequestException("maxResults must be positive");
}
- return StorageProxy.getKeyRange(new RangeCommand(tablename,
columnFamily, startWith, stopAt, maxResults));
+ try
+ {
+ return StorageProxy.getKeyRange(new RangeCommand(tablename,
columnFamily, startWith, stopAt, maxResults));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
// main method moved to CassandraDaemon
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeVerbHandler.java
Thu Aug 6 15:34:22 2009
@@ -18,6 +18,8 @@
*/
package org.apache.cassandra.service;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.db.RangeCommand;
import org.apache.cassandra.db.RangeReply;
import org.apache.cassandra.db.Table;
@@ -27,16 +29,19 @@
public class RangeVerbHandler implements IVerbHandler
{
+ private static final Logger logger =
Logger.getLogger(RangeVerbHandler.class);
+
public void doVerb(Message message)
{
- RangeReply rangeReply;
try
{
RangeCommand command = RangeCommand.read(message);
Table table = Table.open(command.table);
- rangeReply = table.getKeyRange(command.columnFamily,
command.startWith, command.stopAt, command.maxResults);
+ RangeReply rangeReply = table.getKeyRange(command.columnFamily,
command.startWith, command.stopAt, command.maxResults);
Message response = rangeReply.getReply(message);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending " + rangeReply + " to " +
message.getMessageId() + "@" + message.getFrom());
MessagingService.getMessagingInstance().sendOneWay(response,
message.getFrom());
}
catch (Exception e)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=801676&r1=801675&r2=801676&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Thu Aug 6 15:34:22 2009
@@ -359,14 +359,6 @@
/**
* This is a multiget version of the above method.
- * @param tablename
- * @param keys
- * @param columnFamily
- * @param start
- * @param count
- * @return
- * @throws IOException
- * @throws TimeoutException
*/
public static Map<String, Row> strongReadProtocol(String[] keys,
ReadCommand readCommand) throws IOException, TimeoutException
{
@@ -571,10 +563,6 @@
* This version is used when results for multiple keys needs to be
* retrieved.
*
- * @param tablename name of the table that needs to be queried
- * @param keys keys whose values we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param columns the columns we are interested in
* @return a mapping of key --> Row
* @throws Exception
*/
@@ -631,64 +619,61 @@
return row;
}
- static List<String> getKeyRange(RangeCommand command)
+ static List<String> getKeyRange(RangeCommand command) throws IOException
{
long startTime = System.currentTimeMillis();
int endpointOffset = 0;
List<String> allKeys = new ArrayList<String>();
int maxResults = command.maxResults;
- try
+ EndPoint endPoint =
StorageService.instance().findSuitableEndPoint(command.startWith,
endpointOffset);
+ String firstEndpoint = endPoint.toString();
+
+ do
{
- EndPoint endPoint =
StorageService.instance().findSuitableEndPoint(command.startWith,
endpointOffset);
- String firstEndpoint = endPoint.toString();
+ Message message = command.getMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
+ IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(message, endPoint);
- do
+ // read response
+ byte[] responseBody = new byte[0];
+ try
{
- IAsyncResult iar =
MessagingService.getMessagingInstance().sendRR(command.getMessage(), endPoint);
-
- // read response
- byte[] responseBody =
iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- RangeReply rangeReply = RangeReply.read(responseBody);
- List<String> rangeKeys = rangeReply.keys;
-
- // deal with key overlaps
- if (allKeys.size() > 0 && rangeKeys != null &&
rangeKeys.size() > 0 && allKeys.get(allKeys.size() -
1).equals(rangeKeys.get(0)))
- {
- allKeys.remove(allKeys.size() - 1);
- allKeys.addAll(rangeKeys);
- }
- else if (rangeKeys != null && rangeKeys.size() > 0)
- {
- allKeys.addAll(rangeKeys);
- }
+ responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException(e);
+ }
+ RangeReply rangeReply = RangeReply.read(responseBody);
+ List<String> rangeKeys = rangeReply.keys;
- if (allKeys.size() >= maxResults ||
rangeReply.rangeCompletedLocally)
- {
- break;
- }
+ // deal with key overlaps
+ if (allKeys.size() > 0 && rangeKeys != null && rangeKeys.size() >
0 && allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0)))
+ {
+ allKeys.remove(allKeys.size() - 1);
+ allKeys.addAll(rangeKeys);
+ }
+ else if (rangeKeys != null && rangeKeys.size() > 0)
+ {
+ allKeys.addAll(rangeKeys);
+ }
- String newStartAt = (allKeys.size() > 0) ?
allKeys.get(allKeys.size() - 1) : command.stopAt;
+ if (allKeys.size() >= maxResults ||
rangeReply.rangeCompletedLocally)
+ {
+ break;
+ }
- command = new RangeCommand(command.table, command.columnFamily,
- newStartAt, command.stopAt,
- command.maxResults -
rangeKeys.size());
-
- endPoint =
StorageService.instance().findSuitableEndPoint(command.startWith,
++endpointOffset);
- } while (!endPoint.toString().equals(firstEndpoint));
-
- return (allKeys.size() > maxResults)
- ? allKeys.subList(0, maxResults)
- : allKeys;
- }
- catch (Exception e)
- {
- throw new RuntimeException("error reading keyrange " + command, e);
- }
- finally
- {
- rangeStats.add(System.currentTimeMillis() - startTime);
- }
+ String newStartWith = (allKeys.size() > 0) ?
allKeys.get(allKeys.size() - 1) : command.startWith;
+ command = new RangeCommand(command.table, command.columnFamily,
newStartWith, command.stopAt, command.maxResults - allKeys.size());
+ endPoint =
StorageService.instance().findSuitableEndPoint(command.startWith,
++endpointOffset);
+ } while (!endPoint.toString().equals(firstEndpoint));
+
+ rangeStats.add(System.currentTimeMillis() - startTime);
+ return (allKeys.size() > maxResults)
+ ? allKeys.subList(0, maxResults)
+ : allKeys;
}
public double getReadLatency()