Author: jbellis
Date: Tue Dec 21 20:41:47 2010
New Revision: 1051640
URL: http://svn.apache.org/viewvc?rev=1051640&view=rev
Log:
manage read repair in coordinator instead of data source, to provide latency
information to dynamic snitch
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1873
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Dec 21 20:41:47 2010
@@ -26,6 +26,8 @@
* return InvalidRequest when remove of subcolumn without supercolumn
is requested (CASSANDRA-1866)
* Re-cache hot keys post-compaction without hitting disk (CASSANDRA-1878)
+ * manage read repair in coordinator instead of data source, to
+ provide latency information to dynamic snitch (CASSANDRA-1873)
0.6.8
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
Tue Dec 21 20:41:47 2010
@@ -36,7 +36,6 @@ import org.apache.cassandra.concurrent.S
public abstract class ReadCommand
{
- public static final String DO_REPAIR = "READ-REPAIR";
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
Tue Dec 21 20:41:47 2010
@@ -63,6 +63,7 @@ private static ICompactSerializer<ReadRe
public ReadResponse(Row row)
{
+ assert row != null;
row_ = row;
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Tue Dec 21 20:41:47 2010
@@ -92,14 +92,6 @@ public class ReadVerbHandler implements
if (logger_.isDebugEnabled())
logger_.debug("Read key " + command.key + "; sending response to
" + message.getMessageId() + "@" + message.getFrom());
MessagingService.instance.sendOneWay(response, message.getFrom());
-
- /* Do read repair if header of the message says so */
- if (message.getHeader(ReadCommand.DO_REPAIR) != null)
- {
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- StorageService.instance.doConsistencyCheck(row, endpoints,
command);
- }
}
catch (IOException ex)
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
Tue Dec 21 20:41:47 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +36,7 @@ class AsyncResult implements IAsyncResul
private Lock lock_ = new ReentrantLock();
private Condition condition_;
private long startTime_;
+ private InetAddress from;
public AsyncResult()
{
@@ -101,14 +103,15 @@ class AsyncResult implements IAsyncResul
}
return result_;
}
-
+
public void result(Message response)
{
try
{
lock_.lock();
if ( !done_.get() )
- {
+ {
+ from = response.getFrom();
result_ = response.getMessageBody();
done_.set(true);
condition_.signal();
@@ -118,5 +121,10 @@ class AsyncResult implements IAsyncResul
{
lock_.unlock();
}
- }
+ }
+
+ public InetAddress getFrom()
+ {
+ return from;
+ }
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
Tue Dec 21 20:41:47 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,4 +52,6 @@ public interface IAsyncResult
* @param result the response message
*/
public void result(Message result);
+
+ public InetAddress getFrom();
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Tue Dec 21 20:41:47 2010
@@ -38,7 +38,6 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -52,7 +51,7 @@ import org.apache.cassandra.utils.FBUtil
* (1) sends DIGEST read requests to each other replica of the given row.
*
* [DigestResponseHandler]
- * (2) If any of the digests to not match the local one, it sends a second
round of requests
+ * (2) If any of the digests do not match the data read, it sends a second
round of requests
* to each replica, this time for the full data
*
* [DataRepairHandler]
@@ -64,18 +63,17 @@ class ConsistencyChecker implements Runn
private static Logger logger_ =
Logger.getLogger(ConsistencyChecker.class);
private static ExpiringMap<String, String> readRepairTable_ = new
ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
- private final String table_;
private final Row row_;
protected final List<InetAddress> replicas_;
private final ReadCommand readCommand_;
+ private final InetAddress dataSource;
- public ConsistencyChecker(String table, Row row, List<InetAddress>
endpoints, ReadCommand readCommand)
+ public ConsistencyChecker(ReadCommand command, Row row, List<InetAddress>
endpoints, InetAddress dataSource)
{
- table_ = table;
row_ = row;
replicas_ = endpoints;
- readCommand_ = readCommand;
- assert replicas_.contains(FBUtilities.getLocalAddress());
+ readCommand_ = command;
+ this.dataSource = dataSource;
}
public void run()
@@ -90,7 +88,7 @@ class ConsistencyChecker implements Runn
MessagingService.instance.addCallback(new DigestResponseHandler(),
message.getMessageId());
for (InetAddress endpoint : replicas_)
{
- if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message, endpoint);
}
}
@@ -110,7 +108,7 @@ class ConsistencyChecker implements Runn
class DigestResponseHandler implements IAsyncCallback
{
private boolean repairInvoked;
- private final byte[] localDigest = ColumnFamily.digest(row_.cf);
+ private final byte[] dataDigest = ColumnFamily.digest(row_.cf);
public synchronized void response(Message response)
{
@@ -124,19 +122,16 @@ class ConsistencyChecker implements Runn
ReadResponse result =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
byte[] digest = result.digest();
- if (!Arrays.equals(localDigest, digest))
+ if (!Arrays.equals(dataDigest, digest))
{
- ReadResponseResolver readResponseResolver = new
ReadResponseResolver(table_, replicas_.size());
- IAsyncCallback responseHandler = new
DataRepairHandler(row_, replicas_.size(), readResponseResolver);
-
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.addCallback(responseHandler,
message.getMessageId());
+ MessagingService.instance.addCallback(new
DataRepairHandler(), message.getMessageId());
for (InetAddress endpoint : replicas_)
{
- if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message,
endpoint);
}
@@ -150,19 +145,19 @@ class ConsistencyChecker implements Runn
}
}
- static class DataRepairHandler implements IAsyncCallback,
ICacheExpungeHook<String, String>
+ class DataRepairHandler implements IAsyncCallback,
ICacheExpungeHook<String, String>
{
private final Collection<Message> responses_ = new
LinkedBlockingQueue<Message>();
private final ReadResponseResolver readResponseResolver_;
private final int majority_;
- public DataRepairHandler(Row localRow, int responseCount,
ReadResponseResolver readResponseResolver) throws IOException
+ public DataRepairHandler() throws IOException
{
- readResponseResolver_ = readResponseResolver;
- majority_ = (responseCount / 2) + 1;
- // wrap localRow in a response Message so it doesn't need to be
special-cased in the resolver
- ReadResponse readResponse = new ReadResponse(localRow);
- Message fakeMessage = new Message(FBUtilities.getLocalAddress(),
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE,
ArrayUtils.EMPTY_BYTE_ARRAY);
+ readResponseResolver_ = new
ReadResponseResolver(readCommand_.table, replicas_.size());
+ majority_ = (replicas_.size() / 2) + 1;
+ // wrap original data Row in a response Message so it doesn't need
to be special-cased in the resolver
+ ReadResponse readResponse = new ReadResponse(row_);
+ Message fakeMessage = new Message(dataSource,
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE,
ArrayUtils.EMPTY_BYTE_ARRAY);
responses_.add(fakeMessage);
readResponseResolver_.injectPreProcessed(fakeMessage,
readResponse);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Dec 21 20:41:47 2010
@@ -363,7 +363,7 @@ public class StorageProxy implements Sto
// send off all the commands asynchronously
List<Future<Object>> localFutures = null;
- List<IAsyncResult> remoteResults = null;
+ HashMap<ReadCommand, IAsyncResult> remoteResults = null;
for (ReadCommand command: commands)
{
InetAddress endPoint =
StorageService.instance.findSuitableEndPoint(command.table, command.key);
@@ -380,13 +380,11 @@ public class StorageProxy implements Sto
else
{
if (remoteResults == null)
- remoteResults = new ArrayList<IAsyncResult>();
+ remoteResults = new HashMap<ReadCommand, IAsyncResult>();
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
logger.debug("weakread reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- if (DatabaseDescriptor.getConsistencyCheck())
- message.setHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
- remoteResults.add(MessagingService.instance.sendRR(message,
endPoint));
+ remoteResults.put(command,
MessagingService.instance.sendRR(message, endPoint));
}
}
@@ -409,14 +407,16 @@ public class StorageProxy implements Sto
}
if (remoteResults != null)
{
- for (IAsyncResult iar: remoteResults)
+ for (Map.Entry<ReadCommand, IAsyncResult> entry :
remoteResults.entrySet())
{
+ IAsyncResult iar = entry.getValue();
byte[] body;
body = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
ReadResponse response =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (response.row() != null)
- rows.add(response.row());
+ assert response.row() != null;
+ rows.add(response.row());
+ StorageService.instance.doConsistencyCheck(response.row(),
entry.getKey(), iar.getFrom());
}
}
@@ -711,14 +711,7 @@ public class StorageProxy implements Sto
Table table = Table.open(command.table);
Row row = command.getRow(table);
-
- // Do the consistency checks in the background
- if (DatabaseDescriptor.getConsistencyCheck())
- {
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- StorageService.instance.doConsistencyCheck(row, endpoints,
command);
- }
+ StorageService.instance.doConsistencyCheck(row, command,
FBUtilities.getLocalAddress());
return row;
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Tue Dec 21 20:41:47 2010
@@ -418,15 +418,20 @@ public class StorageService implements I
{
return tokenMetadata_;
}
-
+
/**
* This method performs the requisite operations to make
* sure that the N replicas are in sync. We do this in the
* background when we do not care much about consistency.
*/
- public void doConsistencyCheck(Row row, List<InetAddress> endpoints,
ReadCommand command)
+ public void doConsistencyCheck(Row row, ReadCommand command, InetAddress
dataSource)
{
- consistencyManager_.submit(new ConsistencyChecker(command.table, row,
endpoints, command));
+ if (DatabaseDescriptor.getConsistencyCheck())
+ {
+ List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+ if (endpoints.size() > 1)
+ consistencyManager_.submit(new ConsistencyChecker(command,
row, endpoints, dataSource));
+ }
}
/**