Author: jbellis
Date: Wed Dec  9 05:49:03 2009
New Revision: 888707

URL: http://svn.apache.org/viewvc?rev=888707&view=rev
Log:
support ConsistencyLevel.ALL on read.  patch by jbellis; reviewed by goffinet 
for CASSANDRA-584

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec  9 05:49:03 2009
@@ -19,6 +19,7 @@
  * increase failure conviction threshold, resulting in less nodes
    incorrectly (and temporarily) marked as down (CASSANDRA-610)
  * respect memtable thresholds during log replay (CASSANDRA-609)
+ * support ConsistencyLevel.ALL on read (CASSANDRA-584)
 
 
 0.5.0 beta

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=888707&r1=888706&r2=888707&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Dec  9 05:49:03 2009
@@ -407,6 +407,7 @@
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
         List<Row> rows = new ArrayList<Row>();
 
+        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getReplicationFactor(), consistency_level);
         int commandIndex = 0;
 
         for (ReadCommand command: commands)
@@ -419,28 +420,24 @@
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
-            List<InetAddress> endpointList = StorageService.instance().getNaturalEndpoints(command.key);
+            List<InetAddress> endpointList = StorageService.instance().getLiveNaturalEndpoints(command.key);
+            if (endpointList.size() < responseCount)
+                throw new UnavailableException();
 
             InetAddress[] endPoints = new InetAddress[endpointList.size()];
             Message messages[] = new Message[endpointList.size()];
-            /*
-             * data-request message is sent to dataPoint, the node that will actually get
-             * the data for us. The other replicas are only sent a digest query.
-            */
+            // data-request message is sent to dataPoint, the node that will actually get
+            // the data for us. The other replicas are only sent a digest query.
             int n = 0;
             for (InetAddress endpoint : endpointList)
             {
-                if (!FailureDetector.instance().isAlive(endpoint))
-                    continue;
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
                 endPoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            if (n < DatabaseDescriptor.getQuorum())
-                throw new UnavailableException();
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum()));
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver(command.table, responseCount));
             MessagingService.instance().sendRR(messages, endPoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndPoints.add(endPoints);


Reply via email to