Author: jbellis
Date: Mon Jan 24 17:25:50 2011
New Revision: 1062888
URL: http://svn.apache.org/viewvc?rev=1062888&view=rev
Log:
add optimization for local reads to StorageProxy
patch by jbellis and tjake for CASSANDRA-2038
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan 24 17:25:50 2011
@@ -11,7 +11,7 @@
* retry hadoop split requests on connection failure (CASSANDRA-1927)
* implement describeOwnership for BOP, COPP (CASSANDRA-1928)
* make read repair behave as expected for ConsistencyLevel > ONE
- (CASSANDRA-982)
+ (CASSANDRA-982, 2038)
* distributed test harness (CASSANDRA-1859, 1964)
* reduce flush lock contention (CASSANDRA-1930)
* optimize supercolumn deserialization (CASSANDRA-1891)
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Mon Jan 24 17:25:50 2011
@@ -67,17 +67,7 @@ public class ReadVerbHandler implements
ReadCommand command = ReadCommand.serializer().deserialize(new
DataInputStream(readCtx.bufIn_));
Table table = Table.open(command.table);
Row row = command.getRow(table);
- ReadResponse readResponse;
- if (command.isDigestQuery())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("digest is " +
ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
- readResponse = new ReadResponse(ColumnFamily.digest(row.cf));
- }
- else
- {
- readResponse = new ReadResponse(row);
- }
+ ReadResponse readResponse = getResponse(command, row);
/* serialize the ReadResponseMessage. */
readCtx.bufOut_.reset();
@@ -97,4 +87,18 @@ public class ReadVerbHandler implements
throw new RuntimeException(ex);
}
}
+
+ public static ReadResponse getResponse(ReadCommand command, Row row)
+ {
+ if (command.isDigestQuery())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("digest is " +
ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
+ return new ReadResponse(ColumnFamily.digest(row.cf));
+ }
+ else
+ {
+ return new ReadResponse(row);
+ }
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
Mon Jan 24 17:25:50 2011
@@ -45,12 +45,12 @@ public class Header
return serializer_;
}
- private InetAddress from_;
+ private final InetAddress from_;
// TODO STAGE can be determined from verb
- private StorageService.Verb verb_;
- private String messageId_;
+ private final StorageService.Verb verb_;
+ private final String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
-
+
Header(String id, InetAddress from, StorageService.Verb verb)
{
assert id != null;
@@ -88,12 +88,7 @@ public class Header
return messageId_;
}
- void setMessageId(String id)
- {
- messageId_ = id;
- }
-
- byte[] getDetail(Object key)
+ byte[] getDetail(String key)
{
return details_.get(key);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
Mon Jan 24 17:25:50 2011
@@ -59,7 +59,7 @@ public class Message
this(new Header(from, verb), body);
}
- public byte[] getHeader(Object key)
+ public byte[] getHeader(String key)
{
return header_.getDetail(key);
}
@@ -94,11 +94,6 @@ public class Message
return header_.getMessageId();
}
- void setMessageId(String id)
- {
- header_.setMessageId(id);
- }
-
// TODO should take byte[] + length so we don't have to copy to a byte[]
of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Mon Jan 24 17:25:50 2011
@@ -312,34 +312,6 @@ public final class MessagingService impl
}
/**
- * Send a message to a given endpoint. The ith element in the
<code>messages</code>
- * array is sent to the ith element in the <code>to</code> array.This
method assumes
- * there is a one-one mapping between the <code>messages</code> array and
- * the <code>to</code> array. Otherwise an IllegalArgumentException will
be thrown.
- * This method also informs the MessagingService to wait for at least
- * <code>howManyResults</code> responses to determine success of failure.
- * @param messages messages to be sent.
- * @param to endpoints to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses or
- * suggest that a timeout occured to the invoker of the send().
- * @return an reference to message id used to match with the result
- */
- public String sendRR(Message[] messages, List<InetAddress> to,
IAsyncCallback cb)
- {
- if (messages.length != to.size())
- throw new IllegalArgumentException("Number of messages and the
number of endpoints need to be same.");
- String groupId = GuidGenerator.guid();
- addCallback(cb, groupId);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- putTarget(groupId, to.get(i));
- sendOneWay(messages[i], to.get(i));
- }
- return groupId;
- }
-
- /**
* Send a message to a given endpoint. This method adheres to the fire and
forget
* style messaging.
* @param message messages to be sent.
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
Mon Jan 24 17:25:50 2011
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
@@ -89,6 +90,15 @@ public class ReadCallback<T> implements
if (resolver.isDataPresent())
condition.signal();
}
+
+ public void response(ReadResponse result)
+ {
+ ((ReadResponseResolver) resolver).injectPreProcessed(result);
+ if (resolver.getMessageCount() < blockfor)
+ return;
+ if (resolver.isDataPresent())
+ condition.signal();
+ }
public int determineBlockFor(ConsistencyLevel consistencyLevel, String
table)
{
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Mon Jan 24 17:25:50 2011
@@ -29,12 +29,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
@@ -48,6 +50,7 @@ public class ReadResponseResolver implem
private final ConcurrentMap<Message, ReadResponse> results = new
NonBlockingHashMap<Message, ReadResponse>();
private DecoratedKey key;
private ByteBuffer digest;
+ private static final Message FAKE_MESSAGE = new
Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE,
ArrayUtils.EMPTY_BYTE_ARRAY);;
public ReadResponseResolver(String table, ByteBuffer key)
{
@@ -225,10 +228,11 @@ public class ReadResponseResolver implem
}
}
- /** hack so ConsistencyChecker doesn't have to serialize/deserialize an
extra real Message */
- public void injectPreProcessed(Message message, ReadResponse result)
+ /** hack so local reads don't force de/serialization of an extra real
Message */
+ public void injectPreProcessed(ReadResponse result)
{
- results.put(message, result);
+ assert results.get(FAKE_MESSAGE) == null; // should only be one local
reply
+ results.put(FAKE_MESSAGE, result);
}
public boolean isDataPresent()
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Mon Jan 24 17:25:50 2011
@@ -343,14 +343,9 @@ public class StorageProxy implements Sto
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
- ReadCommand readMessageDigestOnly = command.copy();
- readMessageDigestOnly.setDigestQuery(true);
- Message message = command.makeReadMessage();
- Message messageDigestOnly =
readMessageDigestOnly.makeReadMessage();
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
- InetAddress dataPoint = endpoints.get(0);
ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
ReadCallback<Row> handler = getReadCallback(resolver,
command.table, consistency_level);
@@ -366,19 +361,52 @@ public class StorageProxy implements Sto
{
endpoints = endpoints.subList(0, handler.blockfor);
}
- Message[] messages = new Message[endpoints.size()];
-
- // data-request message is sent to dataPoint, the node that will
actually get
+
+ // The data-request message is sent to dataPoint, the node that
will actually get
// the data for us. The other replicas are only sent a digest
query.
- for (int i = 0; i < messages.length; i++)
+ ReadCommand digestCommand = null;
+ if (endpoints.size() > 1)
+ {
+ digestCommand = command.copy();
+ digestCommand.setDigestQuery(true);
+ }
+
+ InetAddress dataPoint = endpoints.get(0);
+ if (dataPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("reading data for " + command + " locally");
+ StageManager.getStage(Stage.READ).submit(new
WeakReadLocalRunnable(command, handler));
+ }
+ else
{
- InetAddress endpoint = endpoints.get(i);
- Message m = endpoint.equals(dataPoint) ? message :
messageDigestOnly;
- messages[i] = m;
+ Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("reading " + (m == message ? "data" :
"digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
+ logger.debug("reading data for " + command + " from " +
message.getMessageId() + "@" + dataPoint);
+ MessagingService.instance().sendRR(message, dataPoint,
handler);
}
- MessagingService.instance().sendRR(messages, endpoints, handler);
+
+ // We lazy-construct the digest Message object since it may not be
necessary if we
+ // are doing a local digest read, or no digest reads at all.
+ Message digestMessage = null;
+ for (InetAddress digestPoint : endpoints.subList(1,
endpoints.size()))
+ {
+ if (digestPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("reading digest for " + command + "
locally");
+ StageManager.getStage(Stage.READ).submit(new
WeakReadLocalRunnable(digestCommand, handler));
+ }
+ else
+ {
+ if (digestMessage == null)
+ digestMessage = digestCommand.makeReadMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("reading digest for " + command + " from
" + digestMessage.getMessageId() + "@" + digestPoint);
+ MessagingService.instance().sendRR(digestMessage,
digestPoint, handler);
+ }
+ }
+
readCallbacks.add(handler);
commandEndpoints.add(endpoints);
}
@@ -436,6 +464,28 @@ public class StorageProxy implements Sto
return rows;
}
+ static class WeakReadLocalRunnable extends WrappedRunnable
+ {
+ private final ReadCommand command;
+ private final ReadCallback<Row> handler;
+
+ WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+ {
+ this.command = command;
+ this.handler = handler;
+ }
+
+ protected void runMayThrow() throws IOException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("weakreadlocal reading " + command);
+
+ Table table = Table.open(command.table);
+ ReadResponse result = ReadVerbHandler.getResponse(command,
command.getRow(table));
+ handler.response(result);
+ }
+ }
+
static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver,
String table, ConsistencyLevel consistencyLevel)
{
if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) ||
consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))