Author: jbellis
Date: Fri Dec 3 18:52:06 2010
New Revision: 1041951
URL: http://svn.apache.org/viewvc?rev=1041951&view=rev
Log:
reads at ConsistencyLevel > 1 throw UnavailableException immediately if
insufficient live nodes exist
patch by jbellis and tjake for CASSANDRA-1803
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec 3 18:52:06 2010
@@ -27,6 +27,8 @@ dev
(CASSANDRA-1804)
* cli support index type enum names (CASSANDRA-1810)
* improved validation of column_metadata (CASSANDRA-1813)
+ * reads at ConsistencyLevel > 1 throw UnavailableException
+ immediately if insufficient live nodes exist (CASSANDRA-1803)
0.7.0-rc1
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Fri Dec 3 18:52:06 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
rm.add(cf);
Message message = rm.makeRowMutationMessage();
IWriteResponseHandler responseHandler =
WriteResponseHandler.create(endpoint);
- MessagingService.instance.sendRR(message, new InetAddress[] {
endpoint }, responseHandler);
+ MessagingService.instance.sendRR(message, Arrays.asList(endpoint),
responseHandler);
try
{
responseHandler.get();
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Fri Dec 3 18:52:06 2010
@@ -29,9 +29,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -226,7 +224,7 @@ public class MessagingService implements
* @return an reference to an IAsyncResult which can be queried for the
* response
*/
- public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
+ public String sendRR(Message message, Collection<InetAddress> to,
IAsyncCallback cb)
{
String messageId = message.getMessageId();
addCallback(cb, messageId);
@@ -273,18 +271,16 @@ public class MessagingService implements
* suggest that a timeout occured to the invoker of the send().
* @return an reference to message id used to match with the result
*/
- public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback
cb)
+ public String sendRR(Message[] messages, List<InetAddress> to,
IAsyncCallback cb)
{
- if ( messages.length != to.length )
- {
+ if (messages.length != to.size())
throw new IllegalArgumentException("Number of messages and the
number of endpoints need to be same.");
- }
String groupId = GuidGenerator.guid();
addCallback(cb, groupId);
for ( int i = 0; i < messages.length; ++i )
{
messages[i].setMessageId(groupId);
- sendOneWay(messages[i], to[i]);
+ sendOneWay(messages[i], to.get(i));
}
return groupId;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Fri Dec 3 18:52:06 2010
@@ -156,7 +156,6 @@ class ConsistencyChecker implements Runn
static class DataRepairHandler implements IAsyncCallback
{
- private final Collection<Message> responses_ = new
LinkedBlockingQueue<Message>();
private final ReadResponseResolver readResponseResolver_;
private final int majority_;
@@ -167,7 +166,6 @@ class ConsistencyChecker implements Runn
// wrap localRow in a response Message so it doesn't need to be
special-cased in the resolver
ReadResponse readResponse = new ReadResponse(localRow);
Message fakeMessage = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
- responses_.add(fakeMessage);
readResponseResolver_.injectPreProcessed(fakeMessage,
readResponse);
}
@@ -176,15 +174,14 @@ class ConsistencyChecker implements Runn
{
if (logger_.isDebugEnabled())
logger_.debug("Received response in DataRepairHandler
: " + message.toString());
- responses_.add(message);
readResponseResolver_.preprocess(message);
- if (responses_.size() == majority_)
+ if (readResponseResolver_.getMessageCount() == majority_)
{
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException,
DigestMismatchException
{
- readResponseResolver_.resolve(responses_);
+ readResponseResolver_.resolve();
}
};
// give remaining replicas until timeout to reply and get
added to responses_
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Fri Dec 3 18:52:06 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
*/
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -29,6 +32,7 @@ import org.apache.cassandra.locator.IEnd
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -49,14 +53,14 @@ public class DatacenterQuorumResponseHan
@Override
public void response(Message message)
{
- responses.add(message); // we'll go ahead and resolve a reply from
anyone, even if it's not from this dc
+ resolver.preprocess(message);
int n;
n = localdc.equals(snitch.getDatacenter(message.getFrom()))
? localResponses.decrementAndGet()
: localResponses.get();
- if (n == 0 && responseResolver.isDataPresent(responses))
+ if (n == 0 && resolver.isDataPresent())
{
condition.signal();
}
@@ -68,4 +72,18 @@ public class DatacenterQuorumResponseHan
NetworkTopologyStrategy stategy = (NetworkTopologyStrategy)
Table.open(table).getReplicationStrategy();
return (stategy.getReplicationFactor(localdc) / 2) + 1;
}
+
+ @Override
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints)
throws UnavailableException
+ {
+ int localEndpoints = 0;
+ for (InetAddress endpoint : endpoints)
+ {
+ if (localdc.equals(snitch.getDatacenter(endpoint)))
+ localEndpoints++;
+ }
+
+ if(localEndpoints < blockfor)
+ throw new UnavailableException();
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
Fri Dec 3 18:52:06 2010
@@ -34,8 +34,10 @@ public interface IResponseResolver<T> {
* repairs . Hence you need to derive a response resolver based on your
* needs from this interface.
*/
- public T resolve(Collection<Message> responses) throws
DigestMismatchException, IOException;
- public boolean isDataPresent(Collection<Message> responses);
+ public T resolve() throws DigestMismatchException, IOException;
+ public boolean isDataPresent();
public void preprocess(Message message);
+ public Iterable<Message> getMessages();
+ public int getMessageCount();
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Fri Dec 3 18:52:06 2010
@@ -18,11 +18,11 @@
package org.apache.cassandra.service;
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.io.IOException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Table;
@@ -30,8 +30,8 @@ import org.apache.cassandra.net.IAsyncCa
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.SimpleCondition;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,19 +39,20 @@ public class QuorumResponseHandler<T> im
{
protected static final Logger logger = LoggerFactory.getLogger(
QuorumResponseHandler.class );
protected final SimpleCondition condition = new SimpleCondition();
- protected final Collection<Message> responses = new
LinkedBlockingQueue<Message>();;
- protected IResponseResolver<T> responseResolver;
+ protected final IResponseResolver<T> resolver;
private final long startTime;
- protected int blockfor;
+ protected final int blockfor;
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public QuorumResponseHandler(IResponseResolver<T> responseResolver,
ConsistencyLevel consistencyLevel, String table)
+ public QuorumResponseHandler(IResponseResolver<T> resolver,
ConsistencyLevel consistencyLevel, String table)
{
this.blockfor = determineBlockFor(consistencyLevel, table);
- this.responseResolver = responseResolver;
+ this.resolver = resolver;
this.startTime = System.currentTimeMillis();
+
+ logger.debug("QuorumResponseHandler blocking for {} responses",
blockfor);
}
public T get() throws TimeoutException, DigestMismatchException,
IOException
@@ -72,35 +73,31 @@ public class QuorumResponseHandler<T> im
if (!success)
{
StringBuilder sb = new StringBuilder("");
- for (Message message : responses)
+ for (Message message : resolver.getMessages())
{
sb.append(message.getFrom());
}
- throw new TimeoutException("Operation timed out - received
only " + responses.size() + " responses from " + sb.toString() + " .");
+ throw new TimeoutException("Operation timed out - received
only " + resolver.getMessageCount() + " responses from " + sb.toString() + "
.");
}
}
finally
{
- for (Message response : responses)
+ for (Message response : resolver.getMessages())
{
MessagingService.removeRegisteredCallback(response.getMessageId());
}
}
- return responseResolver.resolve(responses);
+ return resolver.resolve();
}
public void response(Message message)
{
- responses.add(message);
- responseResolver.preprocess(message);
- if (responses.size() < blockfor) {
+ resolver.preprocess(message);
+ if (resolver.getMessageCount() < blockfor)
return;
- }
- if (responseResolver.isDataPresent(responses))
- {
+ if (resolver.isDataPresent())
condition.signal();
- }
}
public int determineBlockFor(ConsistencyLevel consistencyLevel, String
table)
@@ -115,7 +112,13 @@ public class QuorumResponseHandler<T> im
case ALL:
return
Table.open(table).getReplicationStrategy().getReplicationFactor();
default:
- throw new UnsupportedOperationException("invalid consistency
level: " + table.toString());
+ throw new UnsupportedOperationException("invalid consistency
level: " + consistencyLevel);
}
}
+
+ public void assureSufficientLiveNodes(Collection<InetAddress> endpoints)
throws UnavailableException
+ {
+ if (endpoints.size() < blockfor)
+ throw new UnavailableException();
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Fri Dec 3 18:52:06 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class RangeSliceResponseResolver
private static final Logger logger_ =
LoggerFactory.getLogger(RangeSliceResponseResolver.class);
private final String table;
private final List<InetAddress> sources;
+ protected final Collection<Message> responses = new
LinkedBlockingQueue<Message>();;
public RangeSliceResponseResolver(String table, List<InetAddress> sources)
{
@@ -53,7 +55,7 @@ public class RangeSliceResponseResolver
this.table = table;
}
- public List<Row> resolve(Collection<Message> responses) throws
DigestMismatchException, IOException
+ public List<Row> resolve() throws DigestMismatchException, IOException
{
CollatingIterator collator = new CollatingIterator(new
Comparator<Pair<Row,InetAddress>>()
{
@@ -110,11 +112,12 @@ public class RangeSliceResponseResolver
public void preprocess(Message message)
{
+ responses.add(message);
}
- public boolean isDataPresent(Collection<Message> responses)
+ public boolean isDataPresent()
{
- return responses.size() >= sources.size();
+ return !responses.isEmpty();
}
private static class RowIterator extends
AbstractIterator<Pair<Row,InetAddress>>
@@ -134,4 +137,14 @@ public class RangeSliceResponseResolver
return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(),
source) : endOfData();
}
}
+
+ public Iterable<Message> getMessages()
+ {
+ return responses;
+ }
+
+ public int getMessageCount()
+ {
+ return responses.size();
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Fri Dec 3 18:52:06 2010
@@ -58,14 +58,14 @@ public class ReadResponseResolver implem
* repair request should be scheduled.
*
*/
- public Row resolve(Collection<Message> responses) throws
DigestMismatchException, IOException
+ public Row resolve() throws DigestMismatchException, IOException
{
if (logger_.isDebugEnabled())
- logger_.debug("resolving " + responses.size() + " responses");
+ logger_.debug("resolving " + results.size() + " responses");
long startTime = System.currentTimeMillis();
- List<ColumnFamily> versions = new
ArrayList<ColumnFamily>(responses.size());
- List<InetAddress> endpoints = new
ArrayList<InetAddress>(responses.size());
+ List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+ List<InetAddress> endpoints = new ArrayList<InetAddress>();
DecoratedKey key = null;
ByteBuffer digest = FBUtilities.EMPTY_BYTE_BUFFER;
boolean isDigestQuery = false;
@@ -76,11 +76,10 @@ public class ReadResponseResolver implem
* query exists then we need to compare the digest with
* the digest of the data that is received.
*/
- for (Message message : responses)
- {
- ReadResponse result = results.get(message);
- if (result == null)
- continue; // arrived after quorum already achieved
+ for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+ {
+ ReadResponse result = entry.getValue();
+ Message message = entry.getKey();
if (result.isDigestQuery())
{
digest = result.digest();
@@ -187,6 +186,8 @@ public class ReadResponseResolver implem
try
{
ReadResponse result = ReadResponse.serializer().deserialize(new
DataInputStream(bufIn));
+ if (logger_.isDebugEnabled())
+ logger_.debug("Preprocessed {} response",
result.isDigestQuery() ? "digest" : "data");
results.put(message, result);
}
catch (IOException e)
@@ -201,16 +202,23 @@ public class ReadResponseResolver implem
results.put(message, result);
}
- public boolean isDataPresent(Collection<Message> responses)
+ public boolean isDataPresent()
{
- for (Message message : responses)
+ for (ReadResponse result : results.values())
{
- ReadResponse result = results.get(message);
- if (result == null)
- continue; // arrived concurrently
if (!result.isDigestQuery())
return true;
}
return false;
}
+
+ public Iterable<Message> getMessages()
+ {
+ return results.keySet();
+ }
+
+ public int getMessageCount()
+ {
+ return results.size();
+ }
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Dec 3 18:52:06 2010
@@ -314,7 +314,7 @@ public class StorageProxy implements Sto
private static List<Row> strongRead(List<ReadCommand> commands,
ConsistencyLevel consistency_level) throws IOException, UnavailableException,
TimeoutException
{
List<QuorumResponseHandler<Row>> quorumResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
- List<InetAddress[]> commandEndpoints = new ArrayList<InetAddress[]>();
+ List<List<InetAddress>> commandEndpoints = new
ArrayList<List<InetAddress>>();
List<Row> rows = new ArrayList<Row>();
// send out read requests
@@ -327,25 +327,25 @@ public class StorageProxy implements Sto
Message messageDigestOnly =
readMessageDigestOnly.makeReadMessage();
InetAddress dataPoint =
StorageService.instance.findSuitableEndpoint(command.table, command.key);
- List<InetAddress> endpointList =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+ List<InetAddress> endpoints =
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
- InetAddress[] endpoints = new InetAddress[endpointList.size()];
- Message messages[] = new Message[endpointList.size()];
+ AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
+ QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
+ handler.assureSufficientLiveNodes(endpoints);
+
+ Message messages[] = new Message[endpoints.size()];
// data-request message is sent to dataPoint, the node that will
actually get
// the data for us. The other replicas are only sent a digest
query.
int n = 0;
- for (InetAddress endpoint : endpointList)
+ for (InetAddress endpoint : endpoints)
{
Message m = endpoint.equals(dataPoint) ? message :
messageDigestOnly;
- endpoints[n] = endpoint;
messages[n++] = m;
if (logger.isDebugEnabled())
logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
- AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
- QuorumResponseHandler<Row> quorumResponseHandler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
- MessagingService.instance.sendRR(messages, endpoints,
quorumResponseHandler);
- quorumResponseHandlers.add(quorumResponseHandler);
+ MessagingService.instance.sendRR(messages, endpoints, handler);
+ quorumResponseHandlers.add(handler);
commandEndpoints.add(endpoints);
}
@@ -369,14 +369,14 @@ public class StorageProxy implements Sto
catch (DigestMismatchException ex)
{
AbstractReplicationStrategy rs =
Table.open(command.table).getReplicationStrategy();
- QuorumResponseHandler<Row> qrhRepair =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
ConsistencyLevel.QUORUM);
+ QuorumResponseHandler<Row> handler =
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
ConsistencyLevel.QUORUM);
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", ex);
Message messageRepair = command.makeReadMessage();
- MessagingService.instance.sendRR(messageRepair,
commandEndpoints.get(i), qrhRepair);
+ MessagingService.instance.sendRR(messageRepair,
commandEndpoints.get(i), handler);
if (repairResponseHandlers == null)
repairResponseHandlers = new
ArrayList<QuorumResponseHandler<Row>>();
- repairResponseHandlers.add(qrhRepair);
+ repairResponseHandlers.add(handler);
}
}
@@ -498,7 +498,7 @@ public class StorageProxy implements Sto
final Message msg = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
// an empty message acts as a request to the SchemaCheckVerbHandler.
- MessagingService.instance.sendRR(msg, liveHosts.toArray(new
InetAddress[]{}), new IAsyncCallback()
+ MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
{
public void response(Message msg)
{
@@ -775,7 +775,7 @@ public class StorageProxy implements Sto
logger.debug("Starting to send truncate messages to hosts {}",
allEndpoints);
Truncation truncation = new Truncation(keyspace, cfname);
Message message = truncation.makeTruncationMessage();
- MessagingService.instance.sendRR(message, allEndpoints.toArray(new
InetAddress[]{}), responseHandler);
+ MessagingService.instance.sendRR(message, allEndpoints,
responseHandler);
// Wait for all
logger.debug("Sent all truncate messages, now waiting for {}
responses", blockFor);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
Fri Dec 3 18:52:06 2010
@@ -138,6 +138,7 @@ public class CassandraServer implements
}
catch (TimeoutException e)
{
+ logger.debug("... timed out");
throw new TimedOutException();
}
catch (IOException e)
@@ -442,11 +443,12 @@ public class CassandraServer implements
try
{
- StorageProxy.mutate(mutations, consistency_level);
+ StorageProxy.mutate(mutations, consistency_level);
}
catch (TimeoutException e)
{
- throw new TimedOutException();
+ logger.debug("... timed out");
+ throw new TimedOutException();
}
}
finally
@@ -512,6 +514,7 @@ public class CassandraServer implements
}
catch (TimeoutException e)
{
+ logger.debug("... timed out");
throw new TimedOutException();
}
catch (IOException e)
@@ -556,6 +559,7 @@ public class CassandraServer implements
}
catch (TimeoutException e)
{
+ logger.debug("... timed out");
throw new TimedOutException();
}
return thriftifyKeySlices(rows, column_parent, column_predicate);
Added:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1041951&view=auto
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
(added)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Fri Dec 3 18:52:06 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.HashMultimap;
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsistencyLevelTest extends CleanupHelper
+{
+ @Test
+ public void testReadWriteConsistencyChecks() throws Exception
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 3;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ IPartitioner partitioner = new RandomPartitioner();
+
+ ss.setPartitionerUnsafe(partitioner);
+
+ ArrayList<Token> endpointTokens = new ArrayList<Token>();
+ ArrayList<Token> keyTokens = new ArrayList<Token>();
+ List<InetAddress> hosts = new ArrayList<InetAddress>();
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, RING_SIZE);
+
+ HashMultimap<InetAddress, InetAddress> hintedNodes =
HashMultimap.create();
+
+
+ AbstractReplicationStrategy strategy;
+
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ {
+ strategy = getStrategy(table, tmd);
+ StorageService.calculatePendingRanges(strategy, table);
+ int replicationFactor = strategy.getReplicationFactor();
+ if (replicationFactor < 2)
+ continue;
+
+ for (ConsistencyLevel c : ConsistencyLevel.values())
+ {
+
+ if (c == ConsistencyLevel.EACH_QUORUM || c ==
ConsistencyLevel.LOCAL_QUORUM)
+ continue;
+
+ for (int i = 0; i < replicationFactor; i++)
+ {
+ hintedNodes.clear();
+
+ for (int j = 0; j < i; j++)
+ {
+ hintedNodes.put(hosts.get(j), hosts.get(j));
+ }
+
+ IWriteResponseHandler writeHandler =
strategy.getWriteResponseHandler(hosts, hintedNodes, c);
+
+ QuorumResponseHandler<Row> readHandler =
strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+
+ boolean isWriteUnavailable = false;
+ boolean isReadUnavailable = false;
+ try
+ {
+ writeHandler.assureSufficientLiveNodes();
+ }
+ catch (UnavailableException e)
+ {
+ isWriteUnavailable = true;
+ }
+
+ try
+ {
+
readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+ }
+ catch (UnavailableException e)
+ {
+ isReadUnavailable = true;
+ }
+
+ //these should always match (in this kind of test)
+ assertTrue(isWriteUnavailable == isReadUnavailable);
+
+ switch (c)
+ {
+ case ALL:
+ if (isWriteUnavailable)
+ assertTrue(hintedNodes.size() <
replicationFactor);
+ else
+ assertTrue(hintedNodes.size() >=
replicationFactor);
+
+ break;
+ case ONE:
+ case ANY:
+ if (isWriteUnavailable)
+ assertTrue(hintedNodes.size() == 0);
+ else
+ assertTrue(hintedNodes.size() > 0);
+ break;
+ case QUORUM:
+ if (isWriteUnavailable)
+ assertTrue(hintedNodes.size() <
(replicationFactor / 2 + 1));
+ else
+ assertTrue(hintedNodes.size() >=
(replicationFactor / 2 + 1));
+ break;
+ default:
+ fail("Unhandled CL: " + c);
+
+ }
+ }
+ }
+ return;
+ }
+
+ fail("Test requires at least one table with RF > 1");
+ }
+
+ private AbstractReplicationStrategy getStrategy(String table,
TokenMetadata tmd) throws ConfigurationException
+ {
+ return AbstractReplicationStrategy.createReplicationStrategy(table,
+
"org.apache.cassandra.locator.SimpleStrategy",
+ tmd,
+ new
SimpleSnitch(),
+ null);
+ }
+
+}