Author: jbellis
Date: Tue Jul 27 03:32:43 2010
New Revision: 979511
URL: http://svn.apache.org/viewvc?rev=979511&view=rev
Log:
process digest mismatch re-reads in parallel, patch by jbellis; reviewed by
brandonwilliams for CASSANDRA-1323
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Jul 27 03:32:43 2010
@@ -24,7 +24,7 @@
when determining whether to do local read for CL.ONE (CASSANDRA-1317)
* fix read repair to use requested consistency level on digest mismatch,
rather than assuming QUORUM (CASSANDRA-1316)
-
+ * process digest mismatch re-reads in parallel (CASSANDRA-1323)
0.6.3
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Jul 27 03:32:43 2010
@@ -69,8 +69,8 @@ public class ReadResponseResolver implem
public Row resolve(Collection<Message> responses) throws
DigestMismatchException, IOException
{
long startTime = System.currentTimeMillis();
- List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
- List<InetAddress> endPoints = new ArrayList<InetAddress>();
+ List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(responses.size());
+ List<InetAddress> endPoints = new
ArrayList<InetAddress>(responses.size());
String key = null;
byte[] digest = new byte[0];
boolean isDigestQuery = false;
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979511&r1=979510&r2=979511&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Jul 27 03:32:43 2010
@@ -445,8 +445,7 @@ public class StorageProxy implements Sto
List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
List<Row> rows = new ArrayList<Row>();
- int commandIndex = 0;
-
+ // send out read requests
for (ReadCommand command: commands)
{
assert !command.isDigestQuery();
@@ -481,10 +480,13 @@ public class StorageProxy implements Sto
commandEndPoints.add(endPoints);
}
- for (QuorumResponseHandler<Row> quorumResponseHandler:
quorumResponseHandlers)
+ // read results and make a second pass for any digest mismatches
+ List<QuorumResponseHandler<Row>> repairResponseHandlers = null;
+ for (int i = 0; i < commands.size(); i++)
{
+ QuorumResponseHandler<Row> quorumResponseHandler =
quorumResponseHandlers.get(i);
Row row;
- ReadCommand command = commands.get(commandIndex);
+ ReadCommand command = commands.get(i);
try
{
long startTime2 = System.currentTimeMillis();
@@ -502,23 +504,32 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
int responseCount =
determineBlockFor(DatabaseDescriptor.getReplicationFactor(command.table),
consistency_level);
- IResponseResolver<Row> readResponseResolverRepair = new
ReadResponseResolver(command.table, responseCount);
- QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(responseCount, readResponseResolverRepair);
+ QuorumResponseHandler<Row> qrhRepair = new
QuorumResponseHandler<Row>(responseCount, new
ReadResponseResolver(command.table, responseCount));
Message messageRepair = command.makeReadMessage();
- MessagingService.instance.sendRR(messageRepair,
commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
- try
- {
- row = quorumResponseHandlerRepair.get();
- if (row != null)
- rows.add(row);
- }
- catch (DigestMismatchException e)
- {
- throw new AssertionError(e); // full data requested
from each node here, no digests should be sent
- }
+ MessagingService.instance.sendRR(messageRepair,
commandEndPoints.get(i), qrhRepair);
+ if (repairResponseHandlers == null)
+ repairResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
+ repairResponseHandlers.add(qrhRepair);
+ }
+ }
+ }
+
+ // read the results for the digest mismatch retries
+ if (repairResponseHandlers != null)
+ {
+ for (QuorumResponseHandler<Row> handler : repairResponseHandlers)
+ {
+ try
+ {
+ Row row = handler.get();
+ if (row != null)
+ rows.add(row);
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // full data requested from
each node here, no digests should be sent
}
}
- commandIndex++;
}
return rows;