Author: jbellis
Date: Mon Aug 24 16:49:21 2009
New Revision: 807305
URL: http://svn.apache.org/viewvc?rev=807305&view=rev
Log:
Allow a blocking write to create hints if not enough of the "correct" nodes are
live, but enough are to fulfil the requested consistency level. Patch by
Sandeep Tata; reviewed by jbellis and Michael Greene for CASSANDRA-383.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=807305&r1=807304&r2=807305&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 24 16:49:21 2009
@@ -33,7 +33,6 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.TimedStatsDeque;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.log4j.Logger;
@@ -162,35 +161,31 @@
}
try
{
- EndPoint[] endpoints =
StorageService.instance().getNStorageEndPoint(rm.key());
- if (endpoints.length < (DatabaseDescriptor.getReplicationFactor()
/ 2) + 1)
+ Map<EndPoint, EndPoint> endpointMap =
StorageService.instance().getNStorageEndPointMap(rm.key());
+ int blockFor = determineBlockFor(consistency_level);
+ List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+ if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
{
throw new UnavailableException();
}
- int blockFor;
- if (consistency_level == ConsistencyLevel.ONE)
- {
- blockFor = 1;
- }
- else if (consistency_level == ConsistencyLevel.QUORUM)
- {
- blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) +
1;
- }
- else if (consistency_level == ConsistencyLevel.ALL)
- {
- blockFor = DatabaseDescriptor.getReplicationFactor();
- }
- else
- {
- throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
- }
QuorumResponseHandler<Boolean> quorumResponseHandler = new
QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
if (logger.isDebugEnabled())
- logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
+ logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ")
+ "]");
- MessagingService.getMessagingInstance().sendRR(message, endpoints,
quorumResponseHandler);
+ // Get all the targets and stick them in an array
+ MessagingService.getMessagingInstance().sendRR(message,
primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
if (!quorumResponseHandler.get())
throw new UnavailableException();
+ if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with Hinted Handoff?
+ {
+ for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ {
+ if (e.getKey() != e.getValue()) // Hinted Handoff to target
+ {
+ MessagingService.getMessagingInstance().sendOneWay(message, e.getKey());
+ }
+ }
+ }
}
catch (Exception e)
{
@@ -203,6 +198,41 @@
}
}
+ private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint>
endpointMap)
+ {
+ List<EndPoint> liveEndPoints = new
ArrayList<EndPoint>(endpointMap.size());
+ for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+ {
+ if (e.getKey() == e.getValue())
+ {
+ liveEndPoints.add(e.getKey());
+ }
+ }
+ return liveEndPoints;
+ }
+
+ private static int determineBlockFor(int consistency_level)
+ {
+ int blockFor;
+ if (consistency_level == ConsistencyLevel.ONE)
+ {
+ blockFor = 1;
+ }
+ else if (consistency_level == ConsistencyLevel.QUORUM)
+ {
+ blockFor = (DatabaseDescriptor.getReplicationFactor() / 2) + 1;
+ }
+ else if (consistency_level == ConsistencyLevel.ALL)
+ {
+ blockFor = DatabaseDescriptor.getReplicationFactor();
+ }
+ else
+ {
+ throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
+ }
+ return blockFor;
+ }
+
public static void insertBlocking(RowMutation rm) throws
UnavailableException
{
insertBlocking(rm, ConsistencyLevel.QUORUM);