Author: jbellis
Date: Fri Oct 30 18:27:44 2009
New Revision: 831414

URL: http://svn.apache.org/viewvc?rev=831414&view=rev
Log:
fix hinted handoff map computation
patch by jbellis; tested by Edmond Lau for CASSANDRA-524

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Fri Oct 30 18:27:44 2009
@@ -117,12 +117,19 @@
         return endpoints;
     }
 
-    private Map<InetAddress, InetAddress> 
getHintedMapForEndpoints(Iterable<InetAddress> topN)
+    /**
+     * returns map of {ultimate target: destination}, where if destination is 
not the same
+     * as the ultimate target, it is a "hinted" node, a node that will deliver 
the data to
+     * the ultimate target when it becomes alive again.
+     *
+     * A destination node may be the destination for multiple targets.
+     */
+    private Map<InetAddress, InetAddress> 
getHintedMapForEndpoints(Collection<InetAddress> targets)
     {
         Set<InetAddress> usedEndpoints = new HashSet<InetAddress>();
         Map<InetAddress, InetAddress> map = new HashMap<InetAddress, 
InetAddress>();
 
-        for (InetAddress ep : topN)
+        for (InetAddress ep : targets)
         {
             if (FailureDetector.instance().isAlive(ep))
             {
@@ -149,7 +156,7 @@
                 for (int i = startIndex, count = 1; count < totalNodes; 
++count, i = (i + 1) % totalNodes)
                 {
                     InetAddress tmpEndPoint = 
tokenToEndPointMap.get(tokens.get(i));
-                    if (FailureDetector.instance().isAlive(tmpEndPoint) && 
!Arrays.asList(topN).contains(tmpEndPoint) && 
!usedEndpoints.contains(tmpEndPoint))
+                    if (FailureDetector.instance().isAlive(tmpEndPoint) && 
!targets.contains(tmpEndPoint) && !usedEndpoints.contains(tmpEndPoint))
                     {
                         hintLocation = tmpEndPoint;
                         break;
@@ -159,10 +166,12 @@
                 if (hintLocation == null)
                     hintLocation = FBUtilities.getLocalAddress();
 
-                map.put(hintLocation, ep);
+                map.put(ep, hintLocation);
                 usedEndpoints.add(hintLocation);
             }
         }
+
+        assert map.size() == targets.size();
         return map;
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Fri Oct 30 18:27:44 2009
@@ -46,10 +46,9 @@
 
     public QuorumResponseHandler(int responseCount, IResponseResolver<T> 
responseResolver) throws InvalidRequestException
     {
-        if (responseCount > DatabaseDescriptor.getReplicationFactor())
-            throw new InvalidRequestException("Cannot block for more than the 
replication factor of " + DatabaseDescriptor.getReplicationFactor());
-        if (responseCount < 1)
-            throw new InvalidRequestException("Cannot block for less than one 
replica");
+        assert 1 <= responseCount && responseCount <= 
DatabaseDescriptor.getReplicationFactor()
+            : "invalid response count " + responseCount;
+
         responseCount_ = responseCount;
         responseResolver_ =  responseResolver;
         startTime_ = System.currentTimeMillis();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=831414&r1=831413&r2=831414&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Oct 30 18:27:44 2009
@@ -78,18 +78,18 @@
         for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
         {
             InetAddress target = entry.getKey();
-            InetAddress hint = entry.getValue();
-            if ( !target.equals(hint) )
+            InetAddress hintedTarget = entry.getValue();
+            if (target.equals(hintedTarget))
             {
-                Message hintedMessage = rm.makeRowMutationMessage();
-                hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
-                if (logger.isDebugEnabled())
-                    logger.debug("Sending the hint of " + hint + " to " + 
target);
-                messageMap.put(target, hintedMessage);
+                messageMap.put(target, message);
             }
             else
             {
-                messageMap.put(target, message);
+                Message hintedMessage = rm.makeRowMutationMessage();
+                hintedMessage.addHeader(RowMutation.HINT, 
hintedTarget.getAddress());
+                if (logger.isDebugEnabled())
+                    logger.debug("Sending the hint of " + hintedTarget + " to 
" + target);
+                messageMap.put(hintedTarget, hintedMessage);
             }
         }
         return messageMap;
@@ -161,7 +161,7 @@
             }
             QuorumResponseHandler<Boolean> quorumResponseHandler = 
StorageService.instance().getResponseHandler(new WriteResponseResolver(), 
blockFor, consistency_level);
             if (logger.isDebugEnabled())
-                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") 
+ "]");
+                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.values(), ", ") 
+ "]");
 
             // Get all the targets and stick them in an array
             MessagingService.instance().sendRR(message, 
primaryNodes.toArray(new InetAddress[primaryNodes.size()]), 
quorumResponseHandler);
@@ -173,7 +173,7 @@
                 {
                     if (e.getKey() != e.getValue()) // Hinted Handoff to target
                     {
-                        MessagingService.instance().sendOneWay(message, 
e.getKey());
+                        MessagingService.instance().sendOneWay(message, 
e.getValue());
                     }
                 }
             }
@@ -194,7 +194,7 @@
         List<InetAddress> liveEndPoints = new 
ArrayList<InetAddress>(endpointMap.size());
         for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
         {
-            if (e.getKey() == e.getValue())
+            if (e.getKey().equals(e.getValue()))
             {
                 liveEndPoints.add(e.getKey());
             }
@@ -204,6 +204,9 @@
 
     private static int determineBlockFor(int naturalTargets, int 
hintedTargets, int consistency_level)
     {
+        assert naturalTargets >= 1;
+        assert hintedTargets >= naturalTargets;
+
         int bootstrapTargets = hintedTargets - naturalTargets;
         int blockFor;
         if (consistency_level == ConsistencyLevel.ONE)


Reply via email to