Author: jbellis
Date: Fri Dec  3 18:52:06 2010
New Revision: 1041951

URL: http://svn.apache.org/viewvc?rev=1041951&view=rev
Log:
reads at ConsistencyLevel > 1 throw UnavailableException immediately if 
insufficient live nodes exist
patch by jbellis and tjake for CASSANDRA-1803

Added:
    
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec  3 18:52:06 2010
@@ -27,6 +27,8 @@ dev
    (CASSANDRA-1804)
  * cli support index type enum names (CASSANDRA-1810)
  * improved validation of column_metadata (CASSANDRA-1813)
+ * reads at ConsistencyLevel > 1 throw UnavailableException
+   immediately if insufficient live nodes exist (CASSANDRA-1803)
 
 
 0.7.0-rc1

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Fri Dec  3 18:52:06 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  
WriteResponseHandler.create(endpoint);
-            MessagingService.instance.sendRR(message, new InetAddress[] { 
endpoint }, responseHandler);
+            MessagingService.instance.sendRR(message, Arrays.asList(endpoint), 
responseHandler);
             try
             {
                 responseHandler.get();

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
 Fri Dec  3 18:52:06 2010
@@ -29,9 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -226,7 +224,7 @@ public class MessagingService implements
      * @return an reference to an IAsyncResult which can be queried for the
      * response
      */
-    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message message, Collection<InetAddress> to, 
IAsyncCallback cb)
     {
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
@@ -273,18 +271,16 @@ public class MessagingService implements
      *           suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback 
cb)
+    public String sendRR(Message[] messages, List<InetAddress> to, 
IAsyncCallback cb)
     {
-        if ( messages.length != to.length )
-        {
+        if (messages.length != to.size())
             throw new IllegalArgumentException("Number of messages and the 
number of endpoints need to be same.");
-        }
         String groupId = GuidGenerator.guid();
         addCallback(cb, groupId);
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            sendOneWay(messages[i], to[i]);
+            sendOneWay(messages[i], to.get(i));
         }
         return groupId;
     } 

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
 Fri Dec  3 18:52:06 2010
@@ -156,7 +156,6 @@ class ConsistencyChecker implements Runn
 
        static class DataRepairHandler implements IAsyncCallback
        {
-               private final Collection<Message> responses_ = new 
LinkedBlockingQueue<Message>();
                private final ReadResponseResolver readResponseResolver_;
                private final int majority_;
                
@@ -167,7 +166,6 @@ class ConsistencyChecker implements Runn
             // wrap localRow in a response Message so it doesn't need to be 
special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
             Message fakeMessage = new Message(FBUtilities.getLocalAddress(), 
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
-            responses_.add(fakeMessage);
             readResponseResolver_.injectPreProcessed(fakeMessage, 
readResponse);
         }
 
@@ -176,15 +174,14 @@ class ConsistencyChecker implements Runn
                {
                        if (logger_.isDebugEnabled())
                          logger_.debug("Received response in DataRepairHandler 
: " + message.toString());
-                       responses_.add(message);
             readResponseResolver_.preprocess(message);
-            if (responses_.size() == majority_)
+            if (readResponseResolver_.getMessageCount() == majority_)
             {
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException, 
DigestMismatchException
                     {
-                        readResponseResolver_.resolve(responses_);
+                        readResponseResolver_.resolve();
                     }
                 };
                 // give remaining replicas until timeout to reply and get 
added to responses_

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
 Fri Dec  3 18:52:06 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -29,6 +32,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,14 +53,14 @@ public class DatacenterQuorumResponseHan
     @Override
     public void response(Message message)
     {
-        responses.add(message); // we'll go ahead and resolve a reply from 
anyone, even if it's not from this dc
+        resolver.preprocess(message);
 
         int n;
         n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
                 ? localResponses.decrementAndGet()
                 : localResponses.get();
 
-        if (n == 0 && responseResolver.isDataPresent(responses))
+        if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
@@ -68,4 +72,18 @@ public class DatacenterQuorumResponseHan
         NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) 
Table.open(table).getReplicationStrategy();
                return (stategy.getReplicationFactor(localdc) / 2) + 1;
        }
+
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+        
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
 Fri Dec  3 18:52:06 2010
@@ -34,8 +34,10 @@ public interface IResponseResolver<T> {
         * repairs . Hence you need to derive a response resolver based on your
         * needs from this interface.
         */
-       public T resolve(Collection<Message> responses) throws 
DigestMismatchException, IOException;
-       public boolean isDataPresent(Collection<Message> responses);
+       public T resolve() throws DigestMismatchException, IOException;
+       public boolean isDataPresent();
 
     public void preprocess(Message message);
+    public Iterable<Message> getMessages();
+    public int getMessageCount();
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 Fri Dec  3 18:52:06 2010
@@ -18,11 +18,11 @@
 
 package org.apache.cassandra.service;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
@@ -30,8 +30,8 @@ import org.apache.cassandra.net.IAsyncCa
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,19 +39,20 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( 
QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses = new 
LinkedBlockingQueue<Message>();;
-    protected IResponseResolver<T> responseResolver;
+    protected final IResponseResolver<T> resolver;
     private final long startTime;
-    protected int blockfor;
+    protected final int blockfor;
     
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public QuorumResponseHandler(IResponseResolver<T> responseResolver, 
ConsistencyLevel consistencyLevel, String table)
+    public QuorumResponseHandler(IResponseResolver<T> resolver, 
ConsistencyLevel consistencyLevel, String table)
     {
         this.blockfor = determineBlockFor(consistencyLevel, table);
-        this.responseResolver = responseResolver;
+        this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
+
+        logger.debug("QuorumResponseHandler blocking for {} responses", 
blockfor);
     }
     
     public T get() throws TimeoutException, DigestMismatchException, 
IOException
@@ -72,35 +73,31 @@ public class QuorumResponseHandler<T> im
             if (!success)
             {
                 StringBuilder sb = new StringBuilder("");
-                for (Message message : responses)
+                for (Message message : resolver.getMessages())
                 {
                     sb.append(message.getFrom());
                 }
-                throw new TimeoutException("Operation timed out - received 
only " + responses.size() + " responses from " + sb.toString() + " .");
+                throw new TimeoutException("Operation timed out - received 
only " + resolver.getMessageCount() + " responses from " + sb.toString() + " 
.");
             }
         }
         finally
         {
-            for (Message response : responses)
+            for (Message response : resolver.getMessages())
             {
                 
MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-        return responseResolver.resolve(responses);
+        return resolver.resolve();
     }
     
     public void response(Message message)
     {
-        responses.add(message);
-        responseResolver.preprocess(message);
-        if (responses.size() < blockfor) {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
             return;
-        }
-        if (responseResolver.isDataPresent(responses))
-        {
+        if (resolver.isDataPresent())
             condition.signal();
-        }
     }
     
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String 
table)
@@ -115,7 +112,13 @@ public class QuorumResponseHandler<T> im
             case ALL:
                 return 
Table.open(table).getReplicationStrategy().getReplicationFactor();
             default:
-                throw new UnsupportedOperationException("invalid consistency 
level: " + table.toString());
+                throw new UnsupportedOperationException("invalid consistency 
level: " + consistencyLevel);
         }
     }
+
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) 
throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
 Fri Dec  3 18:52:06 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class RangeSliceResponseResolver 
     private static final Logger logger_ = 
LoggerFactory.getLogger(RangeSliceResponseResolver.class);
     private final String table;
     private final List<InetAddress> sources;
+    protected final Collection<Message> responses = new 
LinkedBlockingQueue<Message>();;
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
@@ -53,7 +55,7 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
-    public List<Row> resolve(Collection<Message> responses) throws 
DigestMismatchException, IOException
+    public List<Row> resolve() throws DigestMismatchException, IOException
     {
         CollatingIterator collator = new CollatingIterator(new 
Comparator<Pair<Row,InetAddress>>()
         {
@@ -110,11 +112,12 @@ public class RangeSliceResponseResolver 
 
     public void preprocess(Message message)
     {
+        responses.add(message);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
     {
-        return responses.size() >= sources.size();
+        return !responses.isEmpty();
     }
 
     private static class RowIterator extends 
AbstractIterator<Pair<Row,InetAddress>>
@@ -134,4 +137,14 @@ public class RangeSliceResponseResolver 
             return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), 
source) : endOfData();
         }
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return responses;
+    }
+
+    public int getMessageCount()
+    {
+        return responses.size();
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 Fri Dec  3 18:52:06 2010
@@ -58,14 +58,14 @@ public class ReadResponseResolver implem
       * repair request should be scheduled.
       *
       */
-       public Row resolve(Collection<Message> responses) throws 
DigestMismatchException, IOException
+       public Row resolve() throws DigestMismatchException, IOException
     {
         if (logger_.isDebugEnabled())
-            logger_.debug("resolving " + responses.size() + " responses");
+            logger_.debug("resolving " + results.size() + " responses");
 
         long startTime = System.currentTimeMillis();
-               List<ColumnFamily> versions = new 
ArrayList<ColumnFamily>(responses.size());
-               List<InetAddress> endpoints = new 
ArrayList<InetAddress>(responses.size());
+               List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+               List<InetAddress> endpoints = new ArrayList<InetAddress>();
                DecoratedKey key = null;
                ByteBuffer digest = FBUtilities.EMPTY_BYTE_BUFFER;
                boolean isDigestQuery = false;
@@ -76,11 +76,10 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-               for (Message message : responses)
-               {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived after quorum already achieved
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            Message message = entry.getKey();
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -187,6 +186,8 @@ public class ReadResponseResolver implem
         try
         {
             ReadResponse result = ReadResponse.serializer().deserialize(new 
DataInputStream(bufIn));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Preprocessed {} response", 
result.isDigestQuery() ? "digest" : "data");
             results.put(message, result);
         }
         catch (IOException e)
@@ -201,16 +202,23 @@ public class ReadResponseResolver implem
         results.put(message, result);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
        {
-        for (Message message : responses)
+        for (ReadResponse result : results.values())
         {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived concurrently
             if (!result.isDigestQuery())
                 return true;
         }
         return false;
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return results.keySet();
+    }
+
+    public int getMessageCount()
+    {
+        return results.size();
+    }
 }

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
 Fri Dec  3 18:52:06 2010
@@ -314,7 +314,7 @@ public class StorageProxy implements Sto
     private static List<Row> strongRead(List<ReadCommand> commands, 
ConsistencyLevel consistency_level) throws IOException, UnavailableException, 
TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
-        List<InetAddress[]> commandEndpoints = new ArrayList<InetAddress[]>();
+        List<List<InetAddress>> commandEndpoints = new 
ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
 
         // send out read requests
@@ -327,25 +327,25 @@ public class StorageProxy implements Sto
             Message messageDigestOnly = 
readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = 
StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            List<InetAddress> endpointList = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
 
-            InetAddress[] endpoints = new InetAddress[endpointList.size()];
-            Message messages[] = new Message[endpointList.size()];
+            AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
+            QuorumResponseHandler<Row> handler = 
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
consistency_level);
+            handler.assureSufficientLiveNodes(endpoints);
+
+            Message messages[] = new Message[endpoints.size()];
             // data-request message is sent to dataPoint, the node that will 
actually get
             // the data for us. The other replicas are only sent a digest 
query.
             int n = 0;
-            for (InetAddress endpoint : endpointList)
+            for (InetAddress endpoint : endpoints)
             {
                 Message m = endpoint.equals(dataPoint) ? message : 
messageDigestOnly;
-                endpoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? 
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + 
endpoint);
             }
-            AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
-            QuorumResponseHandler<Row> quorumResponseHandler = 
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
consistency_level);
-            MessagingService.instance.sendRR(messages, endpoints, 
quorumResponseHandler);
-            quorumResponseHandlers.add(quorumResponseHandler);
+            MessagingService.instance.sendRR(messages, endpoints, handler);
+            quorumResponseHandlers.add(handler);
             commandEndpoints.add(endpoints);
         }
 
@@ -369,14 +369,14 @@ public class StorageProxy implements Sto
             catch (DigestMismatchException ex)
             {
                 AbstractReplicationStrategy rs = 
Table.open(command.table).getReplicationStrategy();
-                QuorumResponseHandler<Row> qrhRepair = 
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
ConsistencyLevel.QUORUM);
+                QuorumResponseHandler<Row> handler = 
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
ConsistencyLevel.QUORUM);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
                 Message messageRepair = command.makeReadMessage();
-                MessagingService.instance.sendRR(messageRepair, 
commandEndpoints.get(i), qrhRepair);
+                MessagingService.instance.sendRR(messageRepair, 
commandEndpoints.get(i), handler);
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
-                repairResponseHandlers.add(qrhRepair);
+                repairResponseHandlers.add(handler);
             }
         }
 
@@ -498,7 +498,7 @@ public class StorageProxy implements Sto
         final Message msg = new Message(FBUtilities.getLocalAddress(), 
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
         // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance.sendRR(msg, liveHosts.toArray(new 
InetAddress[]{}), new IAsyncCallback() 
+        MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
         {
             public void response(Message msg)
             {
@@ -775,7 +775,7 @@ public class StorageProxy implements Sto
         logger.debug("Starting to send truncate messages to hosts {}", 
allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
         Message message = truncation.makeTruncationMessage();
-        MessagingService.instance.sendRR(message, allEndpoints.toArray(new 
InetAddress[]{}), responseHandler);
+        MessagingService.instance.sendRR(message, allEndpoints, 
responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} 
responses", blockFor);

Modified: 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
 Fri Dec  3 18:52:06 2010
@@ -138,6 +138,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e) 
         {
+            logger.debug("... timed out");
                throw new TimedOutException();
         }
         catch (IOException e)
@@ -442,11 +443,12 @@ public class CassandraServer implements 
 
             try
             {
-              StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
             catch (TimeoutException e)
             {
-              throw new TimedOutException();
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
         }
         finally
@@ -512,6 +514,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
                throw new TimedOutException();
         }
         catch (IOException e)
@@ -556,6 +559,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         return thriftifyKeySlices(rows, column_parent, column_predicate);

Added: 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1041951&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
 Fri Dec  3 18:52:06 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.HashMultimap;
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsistencyLevelTest extends CleanupHelper
+{
+    @Test
+    public void testReadWriteConsistencyChecks() throws Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 3;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        IPartitioner partitioner = new RandomPartitioner();
+
+        ss.setPartitionerUnsafe(partitioner);
+
+        ArrayList<Token> endpointTokens = new ArrayList<Token>();
+        ArrayList<Token> keyTokens = new ArrayList<Token>();
+        List<InetAddress> hosts = new ArrayList<InetAddress>();
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
+
+        HashMultimap<InetAddress, InetAddress> hintedNodes = HashMultimap.create();
+
+
+        AbstractReplicationStrategy strategy;
+
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            strategy = getStrategy(table, tmd);
+            StorageService.calculatePendingRanges(strategy, table);
+            int replicationFactor = strategy.getReplicationFactor();
+            if (replicationFactor < 2)
+                continue;
+
+            for (ConsistencyLevel c : ConsistencyLevel.values())
+            {
+
+                if (c == ConsistencyLevel.EACH_QUORUM || c == ConsistencyLevel.LOCAL_QUORUM)
+                    continue;
+
+                for (int i = 0; i < replicationFactor; i++)
+                {
+                    hintedNodes.clear();
+
+                    for (int j = 0; j < i; j++)
+                    {
+                        hintedNodes.put(hosts.get(j), hosts.get(j));
+                    }
+
+                    IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
+
+                    QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+
+                    boolean isWriteUnavailable = false;
+                    boolean isReadUnavailable = false;
+                    try
+                    {
+                        writeHandler.assureSufficientLiveNodes();
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isWriteUnavailable = true;
+                    }
+
+                    try
+                    {
+                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isReadUnavailable = true;
+                    }
+
+                    //these should always match (in this kind of test)
+                    assertTrue(isWriteUnavailable == isReadUnavailable);
+
+                    switch (c)
+                    {
+                        case ALL:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < replicationFactor);
+                            else
+                                assertTrue(hintedNodes.size() >= replicationFactor);
+
+                            break;
+                        case ONE:
+                        case ANY:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() == 0);
+                            else
+                                assertTrue(hintedNodes.size() > 0);
+                            break;
+                        case QUORUM:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < (replicationFactor / 2 + 1));
+                            else
+                                assertTrue(hintedNodes.size() >= (replicationFactor / 2 + 1));
+                            break;
+                        default:
+                            fail("Unhandled CL: " + c);
+
+                    }
+                }
+            }
+            return;
+        }
+
+        fail("Test requires at least one table with RF > 1");
+    }
+
+    private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
+    {
+        return AbstractReplicationStrategy.createReplicationStrategy(table,
+                                                                     "org.apache.cassandra.locator.SimpleStrategy",
+                                                                     tmd,
+                                                                     new SimpleSnitch(),
+                                                                     null);
+    }
+
+}


Reply via email to