Author: jbellis
Date: Mon Nov 9 18:15:46 2009
New Revision: 834164
URL: http://svn.apache.org/viewvc?rev=834164&view=rev
Log:
check for liveness and throw UnavailableException if quorum is unsatisfiable.
patch by jbellis; reviewed by Ray Slakinski for CASSANDRA-441
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=834164&r1=834163&r2=834164&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Mon Nov 9 18:15:46 2009
@@ -365,35 +365,32 @@
assert !command.isDigestQuery();
ReadCommand readMessageDigestOnly = command.copy();
readMessageDigestOnly.setDigestQuery(true);
-
Message message = command.makeReadMessage();
Message messageDigestOnly =
readMessageDigestOnly.makeReadMessage();
- QuorumResponseHandler<Row> quorumResponseHandler =
StorageService.instance().getResponseHandler(new ReadResponseResolver(),
DatabaseDescriptor.getQuorum(), consistency_level);
InetAddress dataPoint =
StorageService.instance().findSuitableEndPoint(command.key);
List<InetAddress> endpointList =
StorageService.instance().getNaturalEndpoints(command.key);
- /* Remove the local storage endpoint from the list. */
- endpointList.remove(dataPoint);
- InetAddress[] endPoints = new InetAddress[endpointList.size() + 1];
- Message messages[] = new Message[endpointList.size() + 1];
+ InetAddress[] endPoints = new InetAddress[endpointList.size()];
+ Message messages[] = new Message[endpointList.size()];
/*
- * First message is sent to the node that will actually get
- * the data for us. The other two replicas are only sent a
- * digest query.
+ * data-request message is sent to dataPoint, the node that will
actually get
+ * the data for us. The other replicas are only sent a digest
query.
*/
- endPoints[0] = dataPoint;
- messages[0] = message;
- if (logger.isDebugEnabled())
- logger.debug("strongread reading data for " + command + " from
" + message.getMessageId() + "@" + dataPoint);
- for (int i = 1; i < endPoints.length; i++)
+ int n = 0;
+ for (InetAddress endpoint : endpointList)
{
- InetAddress digestPoint = endpointList.get(i - 1);
- endPoints[i] = digestPoint;
- messages[i] = messageDigestOnly;
+ if (!FailureDetector.instance().isAlive(endpoint))
+ continue;
+ Message m = endpoint.equals(dataPoint) ? message :
messageDigestOnly;
+ endPoints[n] = endpoint;
+ messages[n++] = m;
if (logger.isDebugEnabled())
- logger.debug("strongread reading digest for " + command +
" from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
+ logger.debug("strongread reading " + (m == message ?
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" +
endpoint);
}
+ if (n < DatabaseDescriptor.getQuorum())
+ throw new UnavailableException();
+ QuorumResponseHandler<Row> quorumResponseHandler = new
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new
ReadResponseResolver());
MessagingService.instance().sendRR(messages, endPoints,
quorumResponseHandler);
quorumResponseHandlers.add(quorumResponseHandler);
commandEndPoints.add(endPoints);