Author: jbellis
Date: Tue Dec 21 21:45:56 2010
New Revision: 1051667
URL: http://svn.apache.org/viewvc?rev=1051667&view=rev
Log:
merge from 0.7
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/py_stress/stress.py
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1050987
-/cassandra/branches/cassandra-0.7:1026517-1050989
+/cassandra/branches/cassandra-0.6:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Dec 21 21:45:56 2010
@@ -23,6 +23,8 @@
* flush before repair (CASSANDRA-1748)
* SSTableExport validates key order (CASSANDRA-1884)
* Re-cache hot keys post-compaction without hitting disk (CASSANDRA-1878)
+ * manage read repair in coordinator instead of data source, to
+ provide latency information to dynamic snitch (CASSANDRA-1873)
0.7.0-rc2
Modified: cassandra/trunk/contrib/py_stress/stress.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Tue Dec 21 21:45:56 2010
@@ -24,6 +24,7 @@ have_multiproc = False
try:
from multiprocessing import Array as array, Process as Thread
from uuid import uuid1 as get_ident
+ array('i', 1) # catch "This platform lacks a functioning sem_open
implementation"
Thread.isAlive = Thread.is_alive
have_multiproc = True
except ImportError:
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1051666
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Tue Dec
21 21:45:56 2010
@@ -36,7 +36,6 @@ import org.apache.cassandra.utils.FBUtil
public abstract class ReadCommand
{
- public static final String DO_REPAIR = "READ-REPAIR";
public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
public static final byte CMD_TYPE_GET_SLICE = 2;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Tue Dec
21 21:45:56 2010
@@ -59,6 +59,7 @@ private static ICompactSerializer<ReadRe
public ReadResponse(Row row)
{
+ assert row != null;
row_ = row;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue
Dec 21 21:45:56 2010
@@ -96,14 +96,6 @@ public class ReadVerbHandler implements
logger_.debug(String.format("Read key %s; sending response to
%s@%s",
FBUtilities.bytesToHex(command.key),
message.getMessageId(), message.getFrom()));
MessagingService.instance.sendOneWay(response, message.getFrom());
-
- /* Do read repair if header of the message says so */
- if (message.getHeader(ReadCommand.DO_REPAIR) != null)
- {
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- StorageService.instance.doConsistencyCheck(row, endpoints,
command);
- }
}
catch (IOException ex)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Tue Dec
21 21:45:56 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +37,7 @@ class AsyncResult implements IAsyncResul
private Lock lock = new ReentrantLock();
private Condition condition;
private long startTime;
+ private InetAddress from;
public AsyncResult()
{
@@ -74,14 +76,15 @@ class AsyncResult implements IAsyncResul
}
return result;
}
-
+
public void result(Message response)
{
try
{
lock.lock();
if (!done.get())
- {
+ {
+ from = response.getFrom();
result = response.getMessageBody();
done.set(true);
condition.signal();
@@ -91,5 +94,10 @@ class AsyncResult implements IAsyncResul
{
lock.unlock();
}
- }
+ }
+
+ public InetAddress getFrom()
+ {
+ return from;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Tue Dec
21 21:45:56 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.net;
+import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,4 +39,6 @@ public interface IAsyncResult
* @param result the response message
*/
public void result(Message result);
+
+ public InetAddress getFrom();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Tue Dec 21 21:45:56 2010
@@ -41,7 +41,6 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -55,7 +54,7 @@ import org.apache.cassandra.utils.Wrappe
* (1) sends DIGEST read requests to each other replica of the given row.
*
* [DigestResponseHandler]
- * (2) If any of the digests to not match the local one, it sends a second
round of requests
+ * (2) If any of the digests do not match the data read, it sends a second
round of requests
* to each replica, this time for the full data
*
* [DataRepairHandler]
@@ -68,18 +67,17 @@ class ConsistencyChecker implements Runn
private static ScheduledExecutorService executor_ = new
ScheduledThreadPoolExecutor(1); // TODO add JMX
- private final String table_;
private final Row row_;
protected final List<InetAddress> replicas_;
private final ReadCommand readCommand_;
+ private final InetAddress dataSource;
- public ConsistencyChecker(String table, Row row, List<InetAddress>
endpoints, ReadCommand readCommand)
+ public ConsistencyChecker(ReadCommand command, Row row, List<InetAddress>
endpoints, InetAddress dataSource)
{
- table_ = table;
row_ = row;
replicas_ = endpoints;
- readCommand_ = readCommand;
- assert replicas_.contains(FBUtilities.getLocalAddress());
+ readCommand_ = command;
+ this.dataSource = dataSource;
}
public void run()
@@ -94,7 +92,7 @@ class ConsistencyChecker implements Runn
MessagingService.instance.addCallback(new DigestResponseHandler(),
message.getMessageId());
for (InetAddress endpoint : replicas_)
{
- if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message, endpoint);
}
}
@@ -130,17 +128,14 @@ class ConsistencyChecker implements Runn
if (!localDigest.equals(digest))
{
- ReadResponseResolver readResponseResolver = new
ReadResponseResolver(table_);
- IAsyncCallback responseHandler = new
DataRepairHandler(row_, replicas_.size(), readResponseResolver);
-
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " +
readCommand_.key + " from " + message.getMessageId() + "@[" +
StringUtils.join(replicas_, ", ") + "]");
- MessagingService.instance.addCallback(responseHandler,
message.getMessageId());
+ MessagingService.instance.addCallback(new
DataRepairHandler(), message.getMessageId());
for (InetAddress endpoint : replicas_)
{
- if (!endpoint.equals(FBUtilities.getLocalAddress()))
+ if (!endpoint.equals(dataSource))
MessagingService.instance.sendOneWay(message,
endpoint);
}
@@ -154,18 +149,18 @@ class ConsistencyChecker implements Runn
}
}
- static class DataRepairHandler implements IAsyncCallback
+ class DataRepairHandler implements IAsyncCallback
{
private final ReadResponseResolver readResponseResolver_;
private final int majority_;
- public DataRepairHandler(Row localRow, int responseCount,
ReadResponseResolver readResponseResolver) throws IOException
+ public DataRepairHandler() throws IOException
{
- readResponseResolver_ = readResponseResolver;
- majority_ = (responseCount / 2) + 1;
- // wrap localRow in a response Message so it doesn't need to be
special-cased in the resolver
- ReadResponse readResponse = new ReadResponse(localRow);
- Message fakeMessage = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+ readResponseResolver_ = new
ReadResponseResolver(readCommand_.table, readCommand_.key);
+ majority_ = (replicas_.size() / 2) + 1;
+ // wrap original data Row in a response Message so it doesn't need
to be special-cased in the resolver
+ ReadResponse readResponse = new ReadResponse(row_);
+ Message fakeMessage = new Message(dataSource,
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
readResponseResolver_.injectPreProcessed(fakeMessage,
readResponse);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue
Dec 21 21:45:56 2010
@@ -42,12 +42,12 @@ import org.apache.cassandra.net.Messagin
public class GCInspector
{
- public static final GCInspector instance = new GCInspector();
-
private static final Logger logger =
LoggerFactory.getLogger(GCInspector.class);
final static long INTERVAL_IN_MS = 1000;
final static long MIN_DURATION = 200;
final static long MIN_DURATION_TPSTATS = 1000;
+
+ public static final GCInspector instance = new GCInspector();
private HashMap<String, Long> gctimes = new HashMap<String, Long>();
private final MBeanServer server =
ManagementFactory.getPlatformMBeanServer();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Dec 21 21:45:56 2010
@@ -45,10 +45,12 @@ public class ReadResponseResolver implem
private static Logger logger_ =
LoggerFactory.getLogger(ReadResponseResolver.class);
private final String table;
private final Map<Message, ReadResponse> results = new
NonBlockingHashMap<Message, ReadResponse>();
+ private DecoratedKey key;
- public ReadResponseResolver(String table)
+ public ReadResponseResolver(String table, ByteBuffer key)
{
this.table = table;
+ this.key = StorageService.getPartitioner().decorateKey(key);
}
/*
@@ -66,9 +68,8 @@ public class ReadResponseResolver implem
long startTime = System.currentTimeMillis();
List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
List<InetAddress> endpoints = new ArrayList<InetAddress>();
- DecoratedKey key = null;
ByteBuffer digest = null;
-
+
/*
* Populate the list of rows from each of the messages
* Check to see if there is a digest query. If a digest
@@ -96,7 +97,6 @@ public class ReadResponseResolver implem
{
versions.add(result.row().cf);
endpoints.add(message.getFrom());
- key = result.row().key;
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue
Dec 21 21:45:56 2010
@@ -240,7 +240,7 @@ public class StorageProxy implements Sto
// send off all the commands asynchronously
List<Future<Object>> localFutures = null;
- List<IAsyncResult> remoteResults = null;
+ HashMap<ReadCommand, IAsyncResult> remoteResults = null;
for (ReadCommand command: commands)
{
InetAddress endPoint =
StorageService.instance.findSuitableEndpoint(command.table, command.key);
@@ -257,13 +257,11 @@ public class StorageProxy implements Sto
else
{
if (remoteResults == null)
- remoteResults = new ArrayList<IAsyncResult>();
+ remoteResults = new HashMap<ReadCommand, IAsyncResult>();
Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
logger.debug("weakread reading " + command + " from " +
message.getMessageId() + "@" + endPoint);
- if (randomlyReadRepair(command))
- message.setHeader(ReadCommand.DO_REPAIR,
ReadCommand.DO_REPAIR.getBytes());
- remoteResults.add(MessagingService.instance.sendRR(message,
endPoint));
+ remoteResults.put(command,
MessagingService.instance.sendRR(message, endPoint));
}
}
@@ -286,14 +284,18 @@ public class StorageProxy implements Sto
}
if (remoteResults != null)
{
- for (IAsyncResult iar: remoteResults)
+ for (Map.Entry<ReadCommand, IAsyncResult> entry :
remoteResults.entrySet())
{
+ ReadCommand command = entry.getKey();
+ IAsyncResult iar = entry.getValue();
byte[] body;
body = iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
ReadResponse response =
ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (response.row() != null)
- rows.add(response.row());
+ assert response.row() != null;
+ rows.add(response.row());
+ if (randomlyReadRepair(command))
+ StorageService.instance.doConsistencyCheck(response.row(),
command, iar.getFrom());
}
}
@@ -331,7 +333,8 @@ public class StorageProxy implements Sto
List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
- QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
+ ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
+ QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
handler.assureSufficientLiveNodes(endpoints);
Message messages[] = new Message[endpoints.size()];
@@ -370,7 +373,8 @@ public class StorageProxy implements Sto
catch (DigestMismatchException ex)
{
AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
- QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
+ ReadResponseResolver resolver = new
ReadResponseResolver(command.table, command.key);
+ QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(resolver, consistency_level);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
Message messageRepair = command.makeReadMessage();
@@ -735,11 +739,7 @@ public class StorageProxy implements Sto
// Do the consistency checks in the background
if (randomlyReadRepair(command))
- {
- List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- if (endpoints.size() > 1)
- StorageService.instance.doConsistencyCheck(row, endpoints,
command);
- }
+ StorageService.instance.doConsistencyCheck(row, command,
FBUtilities.getLocalAddress());
return row;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Dec 21 21:45:56 2010
@@ -485,15 +485,17 @@ public class StorageService implements I
{
return tokenMetadata_;
}
-
+
/**
* This method performs the requisite operations to make
* sure that the N replicas are in sync. We do this in the
* background when we do not care much about consistency.
*/
- public void doConsistencyCheck(Row row, List<InetAddress> endpoints,
ReadCommand command)
+ public void doConsistencyCheck(Row row, ReadCommand command, InetAddress
dataSource)
{
- consistencyManager_.submit(new ConsistencyChecker(command.table, row,
endpoints, command));
+ List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+ if (endpoints.size() > 1)
+ consistencyManager_.submit(new ConsistencyChecker(command, row,
endpoints, dataSource));
}
/**
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Tue Dec 21 21:45:56 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.locator.Simp
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -95,7 +96,7 @@ public class ConsistencyLevelTest extend
IWriteResponseHandler writeHandler =
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
- QuorumResponseHandler<Row> readHandler =
strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+ QuorumResponseHandler<Row> readHandler =
strategy.getQuorumResponseHandler(new ReadResponseResolver(table,
ByteBufferUtil.bytes("foo")), c);
boolean isWriteUnavailable = false;
boolean isReadUnavailable = false;