Author: jbellis
Date: Mon Aug 24 16:49:21 2009
New Revision: 807305

URL: http://svn.apache.org/viewvc?rev=807305&view=rev
Log:
allow blocking write to create hints if not enough of the "correct" nodes are 
live, but enough are to fulfil the requested consistency level.  patch by 
Sandeep Tata; reviewed by jbellis and Michael Greene for CASSANDRA-383

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=807305&r1=807304&r2=807305&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Mon Aug 24 16:49:21 2009
@@ -33,7 +33,6 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.TimedStatsDeque;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata;
 
 import org.apache.log4j.Logger;
@@ -162,35 +161,31 @@
         }
         try
         {
-            EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(rm.key());
-            if (endpoints.length < (DatabaseDescriptor.getReplicationFactor() 
/ 2) + 1)
+            Map<EndPoint, EndPoint> endpointMap = 
StorageService.instance().getNStorageEndPointMap(rm.key());
+            int blockFor = determineBlockFor(consistency_level);
+            List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+            if (primaryNodes.size() < blockFor) // guarantee blockFor = W live 
nodes.
             {
                 throw new UnavailableException();
             }
-            int blockFor;
-            if (consistency_level == ConsistencyLevel.ONE)
-            {
-                blockFor = 1;
-            }
-            else if (consistency_level == ConsistencyLevel.QUORUM)
-            {
-                blockFor = (DatabaseDescriptor.getReplicationFactor() >> 1) + 
1;
-            }
-            else if (consistency_level == ConsistencyLevel.ALL)
-            {
-                blockFor = DatabaseDescriptor.getReplicationFactor();
-            }
-            else
-            {
-                throw new UnsupportedOperationException("invalid consistency 
level " + consistency_level);
-            }
             QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
             if (logger.isDebugEnabled())
-                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
+                logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") 
+ "]");
 
-            MessagingService.getMessagingInstance().sendRR(message, endpoints, 
quorumResponseHandler);
+            // Get all the targets and stick them in an array
+            MessagingService.getMessagingInstance().sendRR(message, 
primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
             if (!quorumResponseHandler.get())
                 throw new UnavailableException();
+            if (primaryNodes.size() < endpointMap.size()) // Do we need to 
bother with Hinted Handoff?
+            {
+                for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+                {
+                    if (e.getKey() != e.getValue()) // Hinted Handoff to target
+                    {
+                        
MessagingService.getMessagingInstance().sendOneWay(message, e.getKey());
+                    }
+                }
+            }
         }
         catch (Exception e)
         {
@@ -203,6 +198,41 @@
         }
     }
 
+    private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> 
endpointMap)
+    {
+        List<EndPoint> liveEndPoints = new 
ArrayList<EndPoint>(endpointMap.size());
+        for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+        {
+            if (e.getKey() == e.getValue())
+            {
+                liveEndPoints.add(e.getKey());
+            }
+        }
+        return liveEndPoints;
+    }
+
+    private static int determineBlockFor(int consistency_level)
+    {
+        int blockFor;
+        if (consistency_level == ConsistencyLevel.ONE)
+        {
+            blockFor = 1;
+        }
+        else if (consistency_level == ConsistencyLevel.QUORUM)
+        {
+            blockFor = (DatabaseDescriptor.getReplicationFactor() / 2) + 1;
+        }
+        else if (consistency_level == ConsistencyLevel.ALL)
+        {
+            blockFor = DatabaseDescriptor.getReplicationFactor();
+        }
+        else
+        {
+            throw new UnsupportedOperationException("invalid consistency level 
" + consistency_level);
+        }
+        return blockFor;
+    }
+
     public static void insertBlocking(RowMutation rm) throws 
UnavailableException
     {
         insertBlocking(rm, ConsistencyLevel.QUORUM);


Reply via email to