Author: jbellis
Date: Sat Dec  5 00:24:19 2009
New Revision: 887465

URL: http://svn.apache.org/viewvc?rev=887465&view=rev
Log:
Move the "are we done yet" check in quorum read entirely into
ReadResponseResolver.isDataPresent (RRR), instead of having it partly there
and partly in QuorumResponseHandler.response (QRH).
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 Sat Dec  5 00:24:19 2009
@@ -79,10 +79,9 @@
                
                private void doReadRepair() throws IOException
                {
-                       IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_);
-            /* Add the local storage endpoint to the replicas_ list */
             replicas_.add(FBUtilities.getLocalAddress());
-                       IAsyncCallback responseHandler = new 
DataRepairHandler(ConsistencyManager.this.replicas_.size(), 
readResponseResolver); 
+            IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+            IAsyncCallback responseHandler = new 
DataRepairHandler(replicas_.size(), readResponseResolver);
             ReadCommand readCommand = constructReadMessage(false);
             Message message = readCommand.makeReadMessage();
             if (logger_.isDebugEnabled())

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 Sat Dec  5 00:24:19 2009
@@ -36,17 +36,12 @@
 {
     protected static final Logger logger = Logger.getLogger( 
QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    private final int responseCount;
     protected final List<Message> responses;
     private IResponseResolver<T> responseResolver;
     private final long startTime;
 
     public QuorumResponseHandler(int responseCount, IResponseResolver<T> 
responseResolver)
     {
-        assert 1 <= responseCount && responseCount <= 
DatabaseDescriptor.getReplicationFactor()
-            : "invalid response count " + responseCount;
-
-        this.responseCount = responseCount;
         responses = new ArrayList<Message>(responseCount);
         this.responseResolver =  responseResolver;
         startTime = System.currentTimeMillis();
@@ -94,7 +89,7 @@
             return;
 
         responses.add(message);
-        if (responses.size() >= responseCount && 
responseResolver.isDataPresent(responses))
+        if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
         }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 Sat Dec  5 00:24:19 2009
@@ -33,23 +33,22 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.apache.log4j.Logger;
 
-
-/**
- * This class is used by all read functions and is called by the Quorum 
- * when at least a few of the servers (few is specified in Quorum)
- * have sent the response . The resolve function then schedules read repair 
- * and resolution of read data from the various servers.
- */
 public class ReadResponseResolver implements IResponseResolver<Row>
 {
        private static Logger logger_ = 
Logger.getLogger(ReadResponseResolver.class);
     private final String table;
+    private final int responseCount;
 
-    public ReadResponseResolver(String table)
+    public ReadResponseResolver(String table, int responseCount)
     {
+        assert 1 <= responseCount && responseCount <= 
DatabaseDescriptor.getReplicationFactor()
+            : "invalid response count " + responseCount;
+
+        this.responseCount = responseCount;
         this.table = table;
     }
 
@@ -152,6 +151,9 @@
 
        public boolean isDataPresent(List<Message> responses)
        {
+        if (responses.size() < responseCount)
+            return false;
+
         boolean isDataPresent = false;
         for (Message response : responses)
         {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887465&r1=887464&r2=887465&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Sat Dec  5 00:24:19 2009
@@ -438,7 +438,7 @@
             }
             if (n < DatabaseDescriptor.getQuorum())
                 throw new UnavailableException();
-            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new 
ReadResponseResolver(command.table));
+            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new 
ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum()));
             MessagingService.instance().sendRR(messages, endPoints, 
quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);
@@ -466,7 +466,7 @@
             {
                 if (DatabaseDescriptor.getConsistencyCheck())
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver(command.table);
+                    IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum());
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
new QuorumResponseHandler<Row>(
                             DatabaseDescriptor.getQuorum(),
                             readResponseResolverRepair);


Reply via email to