Author: jbellis
Date: Mon May 10 18:52:38 2010
New Revision: 942840

URL: http://svn.apache.org/viewvc?rev=942840&view=rev
Log:
vijay

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Modified:
    cassandra/trunk/conf/datacenters.properties
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java

Modified: cassandra/trunk/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/datacenters.properties?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/conf/datacenters.properties (original)
+++ cassandra/trunk/conf/datacenters.properties Mon May 10 18:52:38 2010
@@ -14,9 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# datacenter=replication factor
 # The sum of all the datacenter replication factor values should equal
 # the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
-dc1=3
-dc2=5
-dc3=1
+
+# keyspace\:datacenter=replication factor
+Keyspace1\:dc1=3
+Keyspace1\:dc2=5
+Keyspace1\:dc3=1

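For context, the new keys are ordinary java.util.Properties entries in which the backslash escapes the ':' so it stays part of the key (the loaded key is "Keyspace1:dc1"). Below is a minimal, illustrative sketch of reading these entries back into per-datacenter replication factors; it is not the committed DatacenterShardStrategy code, and the class name is a placeholder:

    import java.io.FileReader;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class DatacentersPropertiesExample
    {
        public static void main(String[] args) throws Exception
        {
            Properties props = new Properties();
            props.load(new FileReader("conf/datacenters.properties"));

            // key is "keyspace:datacenter", value is that datacenter's replication factor
            Map<String, Integer> rf = new HashMap<String, Integer>();
            for (String key : props.stringPropertyNames())
                rf.put(key, Integer.parseInt(props.getProperty(key).trim()));

            // Keyspace1 is replicated 3 times in dc1; the per-DC factors sum to the keyspace RF (3 + 5 + 1 = 9)
            System.out.println(rf.get("Keyspace1:dc1"));
        }
    }
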
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon May 10 18:52:38 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 
 import java.util.Collection;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.io.IOException;
@@ -37,15 +38,20 @@ import java.net.InetAddress;
 
 import org.apache.commons.lang.ArrayUtils;
 
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.FBUtilities;
 import static org.apache.cassandra.utils.FBUtilities.UTF8;
 import org.apache.cassandra.utils.WrappedRunnable;
 
+import com.google.common.collect.Multimap;
+
 
 /**
  * For each table (keyspace), there is a row in the system hints CF.
@@ -104,7 +110,7 @@ public class HintedHandOffManager
         }, "Hint delivery").start();
     }
 
-    private static boolean sendMessage(InetAddress endpoint, String tableName, 
byte[] key) throws IOException
+    private static boolean sendMessage(InetAddress endpoint, String tableName, 
byte[] key) throws IOException, UnavailableException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
         {
@@ -126,8 +132,12 @@ public class HintedHandOffManager
                 rm.add(cf);
         }
         Message message = rm.makeRowMutationMessage();
-        WriteResponseHandler responseHandler = new WriteResponseHandler(1, 
tableName);
-        MessagingService.instance.sendRR(message, new InetAddress[] { endpoint 
}, responseHandler);
+        InetAddress [] endpoints = new InetAddress[] { endpoint };
+        AbstractReplicationStrategy rs = 
StorageService.instance.getReplicationStrategy(tableName);
+        List<InetAddress> endpointlist = Arrays.asList(endpoints);
+        Multimap<InetAddress, InetAddress> hintedEndpoints = 
rs.getHintedEndpoints(endpointlist);
+        WriteResponseHandler responseHandler = new 
WriteResponseHandler(endpointlist, hintedEndpoints, ConsistencyLevel.ALL, 
tableName);
+        MessagingService.instance.sendRR(message, endpoints, responseHandler);
 
         try
         {
@@ -154,8 +164,9 @@ public class HintedHandOffManager
         rm.apply();
     }
 
-    /** hintStore must be the hints columnfamily from the system table */
-    private static void deliverAllHints() throws DigestMismatchException, 
IOException, InvalidRequestException, TimeoutException
+    /** hintStore must be the hints columnfamily from the system table 
+     * @throws UnavailableException */
+    private static void deliverAllHints() throws DigestMismatchException, 
IOException, InvalidRequestException, TimeoutException, UnavailableException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started deliverAllHints");
@@ -223,7 +234,7 @@ public class HintedHandOffManager
                || (hintColumnFamily.getSortedColumns().size() == 1 && 
hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    private static void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, TimeoutException
+    private static void deliverHintsToEndpoint(InetAddress endpoint) throws 
IOException, DigestMismatchException, InvalidRequestException, 
TimeoutException, UnavailableException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started hinted handoff for endpoint " + endpoint);

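Note on the sendMessage() change above: the hint is still delivered to a single endpoint, and with ConsistencyLevel.ALL the new WriteResponseHandler.determineBlockFor() (later in this commit) returns writeEndpoints.size() == 1, so the handler still blocks for exactly one ack. That preserves the behaviour of the old new WriteResponseHandler(1, tableName) call while routing hint delivery through the same assureSufficientLiveNodes() check as regular writes.
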
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon May 10 18:52:38 2010
@@ -27,11 +27,15 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -41,7 +45,7 @@ public abstract class AbstractReplicatio
 {
     protected static final Logger logger_ = 
LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    private TokenMetadata tokenMetadata_;
+    protected TokenMetadata tokenMetadata_;
     protected final IEndpointSnitch snitch_;
 
     AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch 
snitch)
@@ -190,4 +194,8 @@ public abstract class AbstractReplicatio
         return getAddressRanges(temp, table).get(pendingAddress);
     }
 
+    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver 
responseResolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        return new QuorumResponseHandler(responseResolver, consistencyLevel, 
table);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon May 10 18:52:38 2010
@@ -150,37 +150,36 @@ public class DatacenterShardStrategy ext
         }
     }
 
-    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token 
searchToken, TokenMetadata metadata) throws IOException
+    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token 
searchToken, TokenMetadata metadata, String table) throws UnknownHostException
     {
         ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
 
         if (metadata.sortedTokens().size() == 0)
             return endpoints;
 
-        if (null == tokens || tokens.size() != metadata.sortedTokens().size())
+        if (tokensize != metadata.sortedTokens().size())
         {
-            loadEndpoints(metadata);
+            loadEndPoints(metadata);
         }
 
-        for (String dc : dcMap.keySet())
+        for (String dc : dcTokens.keySet())
         {
-            int replicas_ = dcReplicationFactor.get(dc);
-            ArrayList<InetAddress> forloopReturn = new 
ArrayList<InetAddress>(replicas_);
-            List<Token> tokens = dcMap.get(dc);
+            int replicas_ = getReplicationFactor(dc, table);
+            List<Token> tokens = dcTokens.get(dc);
             boolean bOtherRack = false;
             boolean doneDataCenterItr;
             // Add the node at the index by default
             Iterator<Token> iter = TokenMetadata.ringIterator(tokens, 
searchToken);
             InetAddress primaryHost = metadata.getEndpoint(iter.next());
-            forloopReturn.add(primaryHost);
+            endpoints.add(primaryHost);
 
-            while (forloopReturn.size() < replicas_ && iter.hasNext())
+            while (endpoints.size() < replicas_ && iter.hasNext())
             {
                 Token t = iter.next();
-                InetAddress endpointOfInterest = metadata.getEndpoint(t);
-                if (forloopReturn.size() < replicas_ - 1)
+                InetAddress endPointOfInterest = metadata.getEndpoint(t);
+                if (endpoints.size() < replicas_ - 1)
                 {
-                    forloopReturn.add(endpointOfInterest);
+                       endpoints.add(endPointOfInterest);
                     continue;
                 }
                 else
@@ -191,10 +190,9 @@ public class DatacenterShardStrategy ext
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
-                    AbstractRackAwareSnitch snitch = 
(AbstractRackAwareSnitch)snitch_;
-                    if 
(!snitch.getRack(primaryHost).equals(snitch.getRack(endpointOfInterest)))
+                    if 
(!snitch.getRack(primaryHost).equals(snitch.getRack(endPointOfInterest)))
                     {
-                        forloopReturn.add(metadata.getEndpoint(t));
+                       endpoints.add(metadata.getEndpoint(t));
                         bOtherRack = true;
                     }
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java Mon May 10 18:52:38 2010
@@ -19,8 +19,6 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
 
 /**
  * A simple endpoint snitch implementation that assumes datacenter and rack 
information is encoded
@@ -28,12 +26,12 @@ import java.util.*;
  */
 public class RackInferringSnitch extends AbstractRackAwareSnitch
 {
-    public String getRack(InetAddress endpoint) throws UnknownHostException
+    public String getRack(InetAddress endpoint)
     {
         return Byte.toString(endpoint.getAddress()[2]);
     }
 
-    public String getDatacenter(InetAddress endpoint) throws 
UnknownHostException
+    public String getDatacenter(InetAddress endpoint)
     {
         return Byte.toString(endpoint.getAddress()[1]);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Mon May 10 18:52:38 2010
@@ -112,7 +112,7 @@ class ConsistencyChecker implements Runn
 
                 if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_, replicas_.size());
+                    IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver(table_);
                     IAsyncCallback responseHandler;
                     if (replicas_.contains(FBUtilities.getLocalAddress()))
                         responseHandler = new DataRepairHandler(row_, 
replicas_.size(), readResponseResolver);

Added: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=942840&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Mon May 10 18:52:38 2010
@@ -0,0 +1,63 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.locator.RackInferringSnitch;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Datacenter quorum response handler: blocks until a quorum of responses has
+ * been received from the local datacenter, as reported by the snitch.
+ *
+ * @author Vijay Parthasarathy
+ */
+public class DatacenterQuorumResponseHandler<T> extends 
QuorumResponseHandler<T>
+{
+    private static final RackInferringSnitch snitch = (RackInferringSnitch) 
DatabaseDescriptor.getEndpointSnitch();
+       private static final String localdc = 
snitch.getDatacenter(FBUtilities.getLocalAddress());
+    private AtomicInteger localResponses;
+    
+    public DatacenterQuorumResponseHandler(IResponseResolver<T> 
responseResolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        super(responseResolver, consistencyLevel, table);
+        localResponses = new AtomicInteger(blockfor);
+    }
+    
+    @Override
+    public void response(Message message)
+    {
+        try
+        {
+            int b = -1;
+            responses.add(message);
+            // If DCQuorum/DCQuorumSync, check if the response is from the local DC.
+            if (localdc.equals(snitch.getDatacenter(message.getFrom())))
+            {
+                b = localResponses.decrementAndGet();
+            } else {
+               b = localResponses.get();
+            }
+            if (b == 0 && responseResolver.isDataPresent(responses))
+            {
+                condition.signal();
+            }
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    @Override
+    public int determineBlockFor(ConsistencyLevel consistency_level, String 
table)
+       {
+               DatacenterShardStrategy stategy = (DatacenterShardStrategy) 
StorageService.instance.getReplicationStrategy(table);
+               return (stategy.getReplicationFactor(localdc, table)/2) + 1;
+       }
+}
\ No newline at end of file

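Worked example for determineBlockFor() above, using the Keyspace1 entries from datacenters.properties: a DCQUORUM read coordinated from dc1 (local replication factor 3) blocks for (3 / 2) + 1 = 2 responses, and only replies whose sender is in the local datacenter (per the RackInferringSnitch) decrement localResponses; replies from dc2 and dc3 are still added to responses for resolution but do not count toward the quorum.
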
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Mon May 10 18:52:38 2010
@@ -24,13 +24,19 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import com.google.common.collect.Multimap;
 
 /**
  * This class will block for the replication factor which is
@@ -39,58 +45,85 @@ import org.apache.cassandra.net.Message;
  */
 public class DatacenterSyncWriteResponseHandler extends WriteResponseHandler
 {
-    private final Map<String, Integer> dcResponses = new HashMap<String, 
Integer>();
-    private final Map<String, Integer> responseCounts;
-    private final AbstractRackAwareSnitch endpointSnitch;
+       private final DatacenterShardStrategy stategy = 
(DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
+    private HashMap<String, AtomicInteger> dcResponses;
 
-    public DatacenterSyncWriteResponseHandler(Map<String, Integer> 
responseCounts, String table)
+    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> 
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, 
ConsistencyLevel consistencyLevel, String table)
+    throws UnavailableException
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(1, table);
-        this.responseCounts = responseCounts;
-        endpointSnitch = (AbstractRackAwareSnitch) 
DatabaseDescriptor.getEndpointSnitch();
+        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
     }
 
-    @Override
     // synchronized for the benefit of dcResponses and responseCounts.  "responses" itself
     // is inherited from WRH and is concurrent.
-    // TODO can we use concurrent structures instead?
-    public synchronized void response(Message message)
+    @Override
+    public void response(Message message)
     {
+        responses.add(message);
         try
         {
-            String dataCenter = 
endpointSnitch.getDatacenter(message.getFrom());
-            Object blockFor = responseCounts.get(dataCenter);
+            String dataCenter = 
endpointsnitch.getDatacenter(message.getFrom());
             // If this DC needs to be blocked then do the below.
-            if (blockFor != null)
-            {
-                Integer quorumCount = dcResponses.get(dataCenter);
-                if (quorumCount == null)
-                {
-                    // Intialize and recognize the first response
-                    dcResponses.put(dataCenter, 1);
-                }
-                else if ((Integer) blockFor > quorumCount)
-                {
-                    // recognize the consequtive responses.
-                    dcResponses.put(dataCenter, quorumCount + 1);
-                }
-                else
-                {
-                    // No need to wait on it anymore so remove it.
-                    responseCounts.remove(dataCenter);
-                }
-            }
+            dcResponses.get(dataCenter).getAndDecrement();
         }
         catch (UnknownHostException e)
         {
             throw new RuntimeException(e);
         }
-        responses.add(message);
-        // If done then the response count will be empty
-        if (responseCounts.isEmpty())
-        {
-            condition.signal();
+        maybeSignal();
+    }
+    
+    private void maybeSignal()
+    {
+       for(AtomicInteger i : dcResponses.values()) {
+               if (0 < i.get()) {
+                       return;
+               }
+       }
+       // If all the per-DC quorum conditions are met, signal the waiting thread.
+       condition.signal();
+    }
+    
+    @Override
+    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+    {        
+        this.dcResponses = new HashMap<String, AtomicInteger>();
+        for (String dc: stategy.getDatacenters(table)) {
+               int rf = stategy.getReplicationFactor(dc, table);
+               dcResponses.put(dc, new AtomicInteger((rf/2) + 1));
+        }
+       // Do nothing, there is no 'one' integer to block for
+        return 0;
+    }
+    
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> 
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints) throws 
UnavailableException
+    {   
+               Map<String, AtomicInteger> dcEndpoints = new HashMap<String, 
AtomicInteger>();
+               try
+               {
+                       for (String dc: stategy.getDatacenters(table))
+                               dcEndpoints.put(dc, new AtomicInteger());
+                       for (InetAddress destination : hintedEndpoints.keySet())
+                       {
+                               // Skip destinations that are not part of the write set
+                               if (!writeEndpoints.contains(destination))
+                                       continue;
+                               // figure out the destination dc
+                               String destinationDC = 
endpointsnitch.getDatacenter(destination);
+                               
dcEndpoints.get(destinationDC).incrementAndGet();
+                       }
+               }
+               catch (UnknownHostException e)
+               {
+                       throw new UnavailableException();
+               }
+        // Throw an exception if any DC does not have enough live nodes to accept the write.
+        for (String dc: stategy.getDatacenters(table)) {
+               if (dcEndpoints.get(dc).get() != dcResponses.get(dc).get()) {
+                throw new UnavailableException();
+               }
         }
     }
 }

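Worked example for the per-datacenter quorums above, again assuming the Keyspace1 entries from datacenters.properties: determineBlockFor() seeds dcResponses with (3 / 2) + 1 = 2 for dc1, (5 / 2) + 1 = 3 for dc2 and (1 / 2) + 1 = 1 for dc3, and maybeSignal() wakes the waiting write only once every one of those counters has been decremented to zero.
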
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Mon May 10 18:52:38 2010
@@ -26,13 +26,19 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.locator.RackInferringSnitch;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.collect.Multimap;
+
 /**
  * This class will basically will block for the replication factor which is
  * provided in the input map. it will block till we recive response from (DC, n)
@@ -40,27 +46,26 @@ import org.apache.cassandra.utils.FBUtil
  */
 public class DatacenterWriteResponseHandler extends WriteResponseHandler
 {
-    private final AtomicInteger blockFor;
-    private final AbstractRackAwareSnitch endpointsnitch;
-    private final InetAddress localEndpoint;
+    private static final RackInferringSnitch snitch = (RackInferringSnitch) 
DatabaseDescriptor.getEndpointSnitch();
+       private static final String localdc = 
snitch.getDatacenter(FBUtilities.getLocalAddress());
+       private final AtomicInteger blockFor;
 
-    public DatacenterWriteResponseHandler(int blockFor, String table)
+    public DatacenterWriteResponseHandler(Collection<InetAddress> 
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, 
ConsistencyLevel consistencyLevel, String table) throws UnavailableException
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
-        super(blockFor, table);
-        this.blockFor = new AtomicInteger(blockFor);
-        endpointsnitch = (AbstractRackAwareSnitch) 
DatabaseDescriptor.getEndpointSnitch();
-        localEndpoint = FBUtilities.getLocalAddress();
+        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        blockFor = new AtomicInteger(responseCount);
     }
 
     @Override
     public void response(Message message)
     {
+        responses.add(message);
         //Is optimal to check if same datacenter than comparing Arrays.
         int b = -1;
         try
         {
-            if 
(endpointsnitch.getDatacenter(localEndpoint).equals(endpointsnitch.getDatacenter(message.getFrom())))
+            if 
(localdc.equals(endpointsnitch.getDatacenter(message.getFrom())))
             {
                 b = blockFor.decrementAndGet();
             }
@@ -69,7 +74,6 @@ public class DatacenterWriteResponseHand
         {
             throw new RuntimeException(e);
         }
-        responses.add(message);
         if (b == 0)
         {
             //Singnal when Quorum is recived.
@@ -78,4 +82,35 @@ public class DatacenterWriteResponseHand
         if (logger.isDebugEnabled())
             logger.debug("Processed Message: " + message.toString());
     }
-}
+    
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> 
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
+    throws UnavailableException
+    {
+        int liveNodes = 0;
+        try {
+            // count destinations that are part of the desired target set
+            for (InetAddress destination : hintedEndpoints.keySet())
+            {
+                if (writeEndpoints.contains(destination) && 
localdc.equals(endpointsnitch.getDatacenter(destination)))
+                    liveNodes++;
+            }
+        } catch (Exception ex) {
+            throw new UnavailableException();
+        }
+        if (liveNodes < responseCount)
+        {
+            throw new UnavailableException();
+        }
+    }
+    
+    @Override
+    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+    {
+       DatacenterShardStrategy stategy = (DatacenterShardStrategy) 
StorageService.instance.getReplicationStrategy(table);
+        if (consistencyLevel.equals(ConsistencyLevel.DCQUORUM)) {
+            return (stategy.getReplicationFactor(localdc, table)/2) + 1;
+        }
+        return super.determineBlockFor(writeEndpoints);
+    }
+}
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon May 10 18:52:38 2010
@@ -19,9 +19,6 @@
 package org.apache.cassandra.service;
 
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -31,6 +28,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
@@ -40,15 +38,19 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( 
QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses;
-    private IResponseResolver<T> responseResolver;
+    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();
+    protected IResponseResolver<T> responseResolver;
     private final long startTime;
-
-    public QuorumResponseHandler(int responseCount, IResponseResolver<T> 
responseResolver)
+    protected int blockfor;
+    
+    /**
+     * Constructor when response count has to be calculated and blocked for.
+     */
+    public QuorumResponseHandler(IResponseResolver<T> responseResolver, 
ConsistencyLevel consistencyLevel, String table)
     {
-        responses = new LinkedBlockingQueue<Message>();
+        this.blockfor = determineBlockFor(consistencyLevel, table);
         this.responseResolver = responseResolver;
-        startTime = System.currentTimeMillis();
+        this.startTime = System.currentTimeMillis();
     }
     
     public T get() throws TimeoutException, DigestMismatchException, 
IOException
@@ -90,9 +92,28 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
+        if (responses.size() < blockfor) {
+            return;
+        }
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
         }
     }
+    
+    public int determineBlockFor(ConsistencyLevel consistencyLevel, String 
table)
+    {
+        switch (consistencyLevel)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                return (DatabaseDescriptor.getQuorum(table)/ 2) + 1;
+            case ALL:
+                return DatabaseDescriptor.getReplicationFactor(table);
+            default:
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon May 10 18:52:38 2010
@@ -30,7 +30,6 @@ import org.apache.cassandra.db.*;
 import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,17 +42,12 @@ public class ReadResponseResolver implem
 {
        private static Logger logger_ = 
LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
-    private final int responseCount;
 
-    public ReadResponseResolver(String table, int responseCount)
+    public ReadResponseResolver(String table)
     {
-        assert 1 <= responseCount && responseCount <= 
DatabaseDescriptor.getReplicationFactor(table)
-            : "invalid response count " + responseCount;
-
-        this.responseCount = responseCount;
         this.table = table;
     }
-
+    
     /*
       * This method for resolving read data should look at the timestamps of 
each
       * of the columns that are read and should pick up columns with the latest
@@ -159,9 +153,6 @@ public class ReadResponseResolver implem
 
        public boolean isDataPresent(Collection<Message> responses)
        {
-        if (responses.size() < responseCount)
-            return false;
-
         boolean isDataPresent = false;
         for (Message response : responses)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May 10 18:52:38 2010
@@ -213,13 +213,9 @@ public class StorageProxy implements Sto
                 List<InetAddress> naturalEndpoints = 
ss.getNaturalEndpoints(table, rm.key());
                 Collection<InetAddress> writeEndpoints = 
rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, 
naturalEndpoints);
                 Multimap<InetAddress, InetAddress> hintedEndpoints = 
rs.getHintedEndpoints(writeEndpoints);
-                int blockFor = determineBlockFor(writeEndpoints.size(), 
consistency_level);
-
-                // avoid starting a write we know can't achieve the required 
consistency
-                assureSufficientLiveNodes(blockFor, writeEndpoints, 
hintedEndpoints, consistency_level);
                 
                 // send out the writes, as in mutate() above, but this time 
with a callback that tracks responses
-                final WriteResponseHandler responseHandler = 
ss.getWriteResponseHandler(blockFor, consistency_level, table);
+                final WriteResponseHandler responseHandler = 
rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, 
table);
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : 
hintedEndpoints.asMap().entrySet())
@@ -287,29 +283,6 @@ public class StorageProxy implements Sto
 
     }
 
-    private static void assureSufficientLiveNodes(int blockFor, 
Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> 
hintedEndpoints, ConsistencyLevel consistencyLevel)
-            throws UnavailableException
-    {
-        if (consistencyLevel == ConsistencyLevel.ANY)
-        {
-            // ensure there are blockFor distinct living nodes (hints are ok).
-            if (hintedEndpoints.keySet().size() < blockFor)
-                throw new UnavailableException();
-        }
-
-        // count destinations that are part of the desired target set
-        int liveNodes = 0;
-        for (InetAddress destination : hintedEndpoints.keySet())
-        {
-            if (writeEndpoints.contains(destination))
-                liveNodes++;
-        }
-        if (liveNodes < blockFor)
-        {
-            throw new UnavailableException();
-        }
-    }
-
     private static void insertLocalMessage(final RowMutation rm, final 
WriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
@@ -325,26 +298,6 @@ public class StorageProxy implements Sto
         StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
     }
 
-    private static int determineBlockFor(int expandedTargets, ConsistencyLevel 
consistency_level)
-    {
-        switch (consistency_level)
-        {
-            case ONE:
-            case ANY:
-                return 1;
-            case QUORUM:
-                return (expandedTargets / 2) + 1;
-            case DCQUORUM:
-            case DCQUORUMSYNC:
-                // TODO this is broken
-                return expandedTargets;
-            case ALL:
-                return expandedTargets;
-            default:
-                throw new UnsupportedOperationException("invalid consistency 
level " + consistency_level);
-        }
-    }
-
     /**
      * Read the data from one replica.  When we get
      * the data we perform consistency checks and figure out if any repairs 
need to be done to the replicas.
@@ -461,10 +414,6 @@ public class StorageProxy implements Sto
 
             InetAddress dataPoint = 
StorageService.instance.findSuitableEndpoint(command.table, command.key);
             List<InetAddress> endpointList = 
StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-            final String table = command.table;
-            int responseCount = 
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), 
consistency_level);
-            if (endpointList.size() < responseCount)
-                throw new UnavailableException();
 
             InetAddress[] endpoints = new InetAddress[endpointList.size()];
             Message messages[] = new Message[endpointList.size()];
@@ -479,7 +428,9 @@ public class StorageProxy implements Sto
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? 
"data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + 
endpoint);
             }
-            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new 
ReadResponseResolver(command.table, responseCount));
+            AbstractReplicationStrategy rs = 
StorageService.instance.getReplicationStrategy(command.table);
+            QuorumResponseHandler<Row> quorumResponseHandler = 
rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
+                                                                               
                consistency_level, command.table);
             MessagingService.instance.sendRR(messages, endpoints, 
quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndpoints.add(endpoints);
@@ -503,10 +454,10 @@ public class StorageProxy implements Sto
             {
                 if (randomlyReadRepair(command))
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver(command.table, 
DatabaseDescriptor.getQuorum(command.table));
-                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
new QuorumResponseHandler<Row>(
-                            DatabaseDescriptor.getQuorum(command.table),
-                            readResponseResolverRepair);
+                    IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver(command.table);
+                    AbstractReplicationStrategy rs = 
StorageService.instance.getReplicationStrategy(command.table);
+                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
rs.getQuorumResponseHandler(readResponseResolverRepair, 
ConsistencyLevel.QUORUM, 
+                                                                               
                          command.table);
                     logger.info("DigestMismatchException: " + ex.getMessage());
                     Message messageRepair = command.makeReadMessage();
                     MessagingService.instance.sendRR(messageRepair, 
commandEndpoints.get(commandIndex), quorumResponseHandlerRepair);
@@ -566,9 +517,7 @@ public class StorageProxy implements Sto
         long startTime = System.nanoTime();
 
         final String table = command.keyspace;
-        int responseCount = 
determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), 
consistency_level);
-
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = 
getRestrictedRanges(command.range, command.keyspace, responseCount);
+        List<Pair<AbstractBounds, List<InetAddress>>> ranges = 
getRestrictedRanges(command.range, command.keyspace);
 
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
@@ -581,8 +530,8 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new 
RangeSliceResponseResolver(command.keyspace, endpoints);
-            QuorumResponseHandler<List<Row>> handler = new 
QuorumResponseHandler<List<Row>>(responseCount, resolver);
-
+            AbstractReplicationStrategy rs = 
StorageService.instance.getReplicationStrategy(table);
+            QuorumResponseHandler<List<Row>> handler = 
rs.getQuorumResponseHandler(resolver, consistency_level, table);
             for (InetAddress endpoint : endpoints)
             {
                 MessagingService.instance.sendRR(message, endpoint, handler);
@@ -678,7 +627,7 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, 
T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<Pair<AbstractBounds, List<InetAddress>>> 
getRestrictedRanges(AbstractBounds queryRange, String keyspace, int 
responseCount)
+    private static List<Pair<AbstractBounds, List<InetAddress>>> 
getRestrictedRanges(AbstractBounds queryRange, String keyspace)
     throws UnavailableException
     {
         TokenMetadata tokenMetadata = 
StorageService.instance.getTokenMetadata();
@@ -689,11 +638,8 @@ public class StorageProxy implements Sto
             Token nodeToken = iter.next();
             Range nodeRange = new 
Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
             List<InetAddress> endpoints = 
StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
-            if (endpoints.size() < responseCount)
-                throw new UnavailableException();
 
             
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
 endpoints);
-            List<InetAddress> endpointsForCL = endpoints.subList(0, 
responseCount);
             Set<AbstractBounds> restrictedRanges = 
queryRange.restrictTo(nodeRange);
             for (AbstractBounds range : restrictedRanges)
             {
@@ -701,7 +647,7 @@ public class StorageProxy implements Sto
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Adding to restricted ranges " + 
unwrapped + " for " + nodeRange);
-                    ranges.add(new Pair<AbstractBounds, 
List<InetAddress>>(unwrapped, endpointsForCL));
+                    ranges.add(new Pair<AbstractBounds, 
List<InetAddress>>(unwrapped, endpoints));
                 }
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May 10 18:52:38 2010
@@ -50,7 +50,6 @@ import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -1521,11 +1520,6 @@ public class StorageService implements I
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new 
ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + 
token.toString()));
     }
 
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, 
ConsistencyLevel consistency_level, String table)
-    {
-        return getReplicationStrategy(table).getWriteResponseHandler(blockFor, 
consistency_level, table);
-    }
-
     public boolean isClientMode()
     {
         return isClientMode;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Mon May 10 18:52:38 2010
@@ -19,41 +19,45 @@
 package org.apache.cassandra.service;
 
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.io.IOException;
+import java.net.InetAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Multimap;
+
 public class WriteResponseHandler implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( 
WriteResponseHandler.class );
+    protected static final AbstractRackAwareSnitch endpointsnitch = 
(AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
     protected final SimpleCondition condition = new SimpleCondition();
-    private final int responseCount;
+    protected final int responseCount;
     protected final Collection<Message> responses;
     protected AtomicInteger localResponses = new AtomicInteger(0);
-    private final long startTime;
+    protected final long startTime;
+       protected final ConsistencyLevel consistencyLevel;
+       protected final String table;
 
-    public WriteResponseHandler(int responseCount, String table)
+    public WriteResponseHandler(Collection<InetAddress> writeEndpoints, 
Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel 
consistencyLevel, String table) 
+    throws UnavailableException
     {
-        // at most one node per range can bootstrap at a time, and these will 
be added to the write until
-        // bootstrap finishes (at which point we no longer need to write to 
the old ones).
-        assert 1 <= responseCount && responseCount <= 2 * 
DatabaseDescriptor.getReplicationFactor(table)
-            : "invalid response count " + responseCount;
-
-        this.responseCount = responseCount;
+       this.table = table;
+       this.consistencyLevel = consistencyLevel;
+        this.responseCount = determineBlockFor(writeEndpoints);
+        assureSufficientLiveNodes(writeEndpoints, hintedEndpoints);
         responses = new LinkedBlockingQueue<Message>();
         startTime = System.currentTimeMillis();
     }
@@ -106,4 +110,54 @@ public class WriteResponseHandler implem
             condition.signal();
         }
     }
-}
+    
+    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
+    {
+        int blockFor = 0;
+        switch (consistencyLevel)
+        {
+            case ONE:
+                blockFor = 1;
+                break;
+            case ANY:
+                blockFor = 1;
+                break;
+            case QUORUM:
+                blockFor = (writeEndpoints.size() / 2) + 1;
+                break;
+            case ALL:
+                blockFor = writeEndpoints.size();
+                break;
+            default:
+                throw new UnsupportedOperationException("invalid consistency 
level: " + consistencyLevel.toString());
+        }
+        // at most one node per range can bootstrap at a time, and these will 
be added to the write until
+        // bootstrap finishes (at which point we no longer need to write to 
the old ones).
+        assert 1 <= blockFor && blockFor <= 2 * 
DatabaseDescriptor.getReplicationFactor(table)
+            : "invalid response count " + blockFor;
+        return blockFor;
+    }
+    
+    public void assureSufficientLiveNodes(Collection<InetAddress> 
writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
+    throws UnavailableException
+    {
+        if (consistencyLevel == ConsistencyLevel.ANY)
+        {
+            // ensure there are blockFor distinct living nodes (hints are ok).
+            if (hintedEndpoints.keySet().size() < responseCount)
+                throw new UnavailableException();
+        }
+        
+        // count destinations that are part of the desired target set
+        int liveNodes = 0;
+        for (InetAddress destination : hintedEndpoints.keySet())
+        {
+            if (writeEndpoints.contains(destination))
+                liveNodes++;
+        }
+        if (liveNodes < responseCount)
+        {
+            throw new UnavailableException();
+        }
+    }
+}
\ No newline at end of file

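Worked example for the consolidated determineBlockFor() above: with three write endpoints, ONE and ANY block for 1 ack, QUORUM for (3 / 2) + 1 = 2, and ALL for 3; assureSufficientLiveNodes() then rejects the write up front with UnavailableException when fewer than that many of the hinted destinations are actual members of the write set (for ANY it also requires that many distinct live targets, hints included).
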
Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java?rev=942840&r1=942839&r2=942840&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java Mon May 10 18:52:38 2010
@@ -1,18 +1,31 @@
 package org.apache.cassandra.locator;
 
-import org.junit.Test;
+import java.io.IOException;
+import java.net.InetAddress;
+
+import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.cassandra.config.ConfigurationException;
+import org.junit.Test;
+import org.xml.sax.SAXException;
 
 public class DatacenterStrategyTest
 {
-    @Test
-    public void testProperties() throws ConfigurationException
+    private String table = "Keyspace1";
+
+       @Test
+    public void testProperties() throws IOException, 
ParserConfigurationException, SAXException
     {
-        DatacenterShardStrategy strategy = new DatacenterShardStrategy(new 
TokenMetadata(), new RackInferringSnitch());
-        assert strategy.getReplicationFactor("dc1") == 3;
-        assert strategy.getReplicationFactor("dc2") == 5;
-        assert strategy.getReplicationFactor("dc3") == 1;
+       XMLFileSnitch snitch = new XMLFileSnitch();
+       TokenMetadata metadata = new TokenMetadata();
+       InetAddress localhost = InetAddress.getLocalHost();
+       // Set the localhost in the TokenMetadata. Embedded Cassandra way?
+       // metadata.addBootstrapToken();
+        DatacenterShardStrategy strategy = new DatacenterShardStrategy(new 
TokenMetadata(), snitch);
+        assert strategy.getReplicationFactor("dc1", table) == 3;
+        assert strategy.getReplicationFactor("dc2", table) == 5;
+        assert strategy.getReplicationFactor("dc3", table) == 1;
+        // Query for the natural hosts
+        // strategy.getNaturalEndpoints(token, table)
     }
 
 }

