Author: jbellis
Date: Sat Dec 5 00:24:19 2009
New Revision: 887465
URL: http://svn.apache.org/viewvc?rev=887465&view=rev
Log:
move "are we done yet" check in quorum read entirely into RRR.isDataPresent
instead of partly there and partly in QRH.response
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Sat Dec 5 00:24:19 2009
@@ -79,10 +79,9 @@
private void doReadRepair() throws IOException
{
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_);
- /* Add the local storage endpoint to the replicas_ list */
replicas_.add(FBUtilities.getLocalAddress());
- IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);
+ IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
+ IAsyncCallback responseHandler = new DataRepairHandler(replicas_.size(), readResponseResolver);
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Sat Dec 5 00:24:19 2009
@@ -36,17 +36,12 @@
{
protected static final Logger logger = Logger.getLogger( QuorumResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
- private final int responseCount;
protected final List<Message> responses;
private IResponseResolver<T> responseResolver;
private final long startTime;
public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
{
- assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
- : "invalid response count " + responseCount;
-
- this.responseCount = responseCount;
responses = new ArrayList<Message>(responseCount);
this.responseResolver = responseResolver;
startTime = System.currentTimeMillis();
@@ -94,7 +89,7 @@
return;
responses.add(message);
- if (responses.size() >= responseCount && responseResolver.isDataPresent(responses))
+ if (responseResolver.isDataPresent(responses))
{
condition.signal();
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Sat Dec 5 00:24:19 2009
@@ -33,23 +33,22 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.log4j.Logger;
-
-/**
- * This class is used by all read functions and is called by the Quorum
- * when at least a few of the servers (few is specified in Quorum)
- * have sent the response . The resolve function then schedules read repair
- * and resolution of read data from the various servers.
- */
public class ReadResponseResolver implements IResponseResolver<Row>
{
private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
private final String table;
+ private final int responseCount;
- public ReadResponseResolver(String table)
+ public ReadResponseResolver(String table, int responseCount)
{
+ assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor()
+ : "invalid response count " + responseCount;
+
+ this.responseCount = responseCount;
this.table = table;
}
@@ -152,6 +151,9 @@
public boolean isDataPresent(List<Message> responses)
{
+ if (responses.size() < responseCount)
+ return false;
+
boolean isDataPresent = false;
for (Message response : responses)
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Dec 5 00:24:19 2009
@@ -438,7 +438,7 @@
}
if (n < DatabaseDescriptor.getQuorum())
throw new UnavailableException();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table));
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum()));
MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);
@@ -466,7 +466,7 @@
{
if (DatabaseDescriptor.getConsistencyCheck())
{
- IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table);
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum());
QuorumResponseHandler<Row> quorumResponseHandlerRepair =
new QuorumResponseHandler<Row>(
DatabaseDescriptor.getQuorum(),
readResponseResolverRepair);