Author: jbellis
Date: Fri Oct 30 18:27:44 2009
New Revision: 831414
URL: http://svn.apache.org/viewvc?rev=831414&view=rev
Log:
fix hinted handoff map computation
patch by jbellis; tested by Edmond Lau for CASSANDRA-524
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Fri Oct 30 18:27:44 2009
@@ -117,12 +117,19 @@
return endpoints;
}
- private Map<InetAddress, InetAddress>
getHintedMapForEndpoints(Iterable<InetAddress> topN)
+ /**
+ * returns map of {ultimate target: destination}, where if destination is not the same
+ * as the ultimate target, it is a "hinted" node, a node that will deliver the data to
+ * the ultimate target when it becomes alive again.
+ *
+ * A destination node may be the destination for multiple targets.
+ */
+ private Map<InetAddress, InetAddress>
getHintedMapForEndpoints(Collection<InetAddress> targets)
{
Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
Map<InetAddress, InetAddress> map = new HashMap<InetAddress,
InetAddress>();
- for (InetAddress ep : topN)
+ for (InetAddress ep : targets)
{
if (FailureDetector.instance().isAlive(ep))
{
@@ -149,7 +156,7 @@
for (int i = startIndex, count = 1; count < totalNodes;
++count, i = (i + 1) % totalNodes)
{
InetAddress tmpEndPoint =
tokenToEndPointMap.get(tokens.get(i));
- if (FailureDetector.instance().isAlive(tmpEndPoint) &&
!Arrays.asList(topN).contains(tmpEndPoint) &&
!usedEndpoints.contains(tmpEndPoint))
+ if (FailureDetector.instance().isAlive(tmpEndPoint) &&
!targets.contains(tmpEndPoint) && !usedEndpoints.contains(tmpEndPoint))
{
hintLocation = tmpEndPoint;
break;
@@ -159,10 +166,12 @@
if (hintLocation == null)
hintLocation = FBUtilities.getLocalAddress();
- map.put(hintLocation, ep);
+ map.put(ep, hintLocation);
usedEndpoints.add(hintLocation);
}
}
+
+ assert map.size() == targets.size();
return map;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Fri Oct 30 18:27:44 2009
@@ -46,10 +46,9 @@
public QuorumResponseHandler(int responseCount, IResponseResolver<T>
responseResolver) throws InvalidRequestException
{
- if (responseCount > DatabaseDescriptor.getReplicationFactor())
- throw new InvalidRequestException("Cannot block for more than the
replication factor of " + DatabaseDescriptor.getReplicationFactor());
- if (responseCount < 1)
- throw new InvalidRequestException("Cannot block for less than one
replica");
+ assert 1 <= responseCount && responseCount <=
DatabaseDescriptor.getReplicationFactor()
+ : "invalid response count " + responseCount;
+
responseCount_ = responseCount;
responseResolver_ = responseResolver;
startTime_ = System.currentTimeMillis();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Oct 30 18:27:44 2009
@@ -78,18 +78,18 @@
for (Map.Entry<InetAddress, InetAddress> entry :
endpointMap.entrySet())
{
InetAddress target = entry.getKey();
- InetAddress hint = entry.getValue();
- if ( !target.equals(hint) )
+ InetAddress hintedTarget = entry.getValue();
+ if (target.equals(hintedTarget))
{
- Message hintedMessage = rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
- if (logger.isDebugEnabled())
- logger.debug("Sending the hint of " + hint + " to " +
target);
- messageMap.put(target, hintedMessage);
+ messageMap.put(target, message);
}
else
{
- messageMap.put(target, message);
+ Message hintedMessage = rm.makeRowMutationMessage();
+ hintedMessage.addHeader(RowMutation.HINT,
hintedTarget.getAddress());
+ if (logger.isDebugEnabled())
+ logger.debug("Sending the hint of " + hintedTarget + " to
" + target);
+ messageMap.put(hintedTarget, hintedMessage);
}
}
return messageMap;
@@ -161,7 +161,7 @@
}
QuorumResponseHandler<Boolean> quorumResponseHandler =
StorageService.instance().getResponseHandler(new WriteResponseResolver(),
blockFor, consistency_level);
if (logger.isDebugEnabled())
- logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ")
+ "]");
+ logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.values(), ", ")
+ "]");
// Get all the targets and stick them in an array
MessagingService.instance().sendRR(message,
primaryNodes.toArray(new InetAddress[primaryNodes.size()]),
quorumResponseHandler);
@@ -173,7 +173,7 @@
{
if (e.getKey() != e.getValue()) // Hinted Handoff to target
{
- MessagingService.instance().sendOneWay(message,
e.getKey());
+ MessagingService.instance().sendOneWay(message,
e.getValue());
}
}
}
@@ -194,7 +194,7 @@
List<InetAddress> liveEndPoints = new
ArrayList<InetAddress>(endpointMap.size());
for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
{
- if (e.getKey() == e.getValue())
+ if (e.getKey().equals(e.getValue()))
{
liveEndPoints.add(e.getKey());
}
@@ -204,6 +204,9 @@
private static int determineBlockFor(int naturalTargets, int
hintedTargets, int consistency_level)
{
+ assert naturalTargets >= 1;
+ assert hintedTargets >= naturalTargets;
+
int bootstrapTargets = hintedTargets - naturalTargets;
int blockFor;
if (consistency_level == ConsistencyLevel.ONE)