Author: jbellis
Date: Mon Oct 26 15:00:20 2009
New Revision: 829819

URL: http://svn.apache.org/viewvc?rev=829819&view=rev
Log:
add dcquorum/dcquorumsync consistency levels, representing "a quorum of the 
replicas in the current dc" and "a quorum of the replicas in each dc," 
respectively.
patch by Vijay Parthasarathy; reviewed by jbellis for CASSANDRA-492

Added:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Modified:
    incubator/cassandra/trunk/interface/cassandra.thrift
    
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Mon Oct 26 15:00:20 
2009
@@ -92,7 +92,9 @@
     ZERO = 0,
     ONE = 1,
     QUORUM = 2,
-    ALL = 3,
+    DCQUORUM = 3,
+    DCQUORUMSYNC = 4,
+    ALL = 5,
 }
 
 struct ColumnParent {

Modified: 
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
 (original)
+++ 
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
 Mon Oct 26 15:00:20 2009
@@ -38,18 +38,24 @@
   public static final int ZERO = 0;
   public static final int ONE = 1;
   public static final int QUORUM = 2;
-  public static final int ALL = 3;
+  public static final int DCQUORUM = 3;
+  public static final int DCQUORUMSYNC = 4;
+  public static final int ALL = 5;
 
   public static final IntRangeSet VALID_VALUES = new IntRangeSet(
     ZERO, 
     ONE, 
     QUORUM, 
+    DCQUORUM, 
+    DCQUORUMSYNC, 
     ALL );
 
   public static final Map<Integer, String> VALUES_TO_NAMES = new 
HashMap<Integer, String>() {{
     put(ZERO, "ZERO");
     put(ONE, "ONE");
     put(QUORUM, "QUORUM");
+    put(DCQUORUM, "DCQUORUM");
+    put(DCQUORUMSYNC, "DCQUORUMSYNC");
     put(ALL, "ALL");
   }};
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
 Mon Oct 26 15:00:20 2009
@@ -18,17 +18,20 @@
 */
 package org.apache.cassandra.locator;
 
+import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
-import java.net.InetAddress;
-
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 /**
  * This class contains a helper method that will be used by
@@ -53,6 +56,11 @@
     }
 
     public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, 
Map<Token, InetAddress> tokenToEndPointMap);
+    
+    public <T> QuorumResponseHandler<T> 
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int 
consistency_level) throws InvalidRequestException
+    {
+        return new QuorumResponseHandler<T>(blockFor, responseResolver);
+    }
 
     public ArrayList<InetAddress> getNaturalEndpoints(Token token)
     {

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java?rev=829819&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
 Mon Oct 26 15:00:20 2009
@@ -0,0 +1,189 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.utils.XMLUtils;
+import org.xml.sax.SAXException;
+
+/**
+ * DataCenterEndPointSnitch
+ * <p/>
+ * This class basically reads the configuration and sets the IP Ranges to a
+ * hashMap which can be read later. this class also provides a way to compare 2
+ * EndPoints and also get details from the same.
+ */
+
+public class DatacenterEndPointSnitch implements IEndPointSnitch
+{
+    /**
+     * This Map will contain the information of the EndPoints and its Location
+     * (Datacenter and RAC)
+     */
+    private Map<Byte, Map<Byte, String>> ipDC = new HashMap<Byte, Map<Byte, 
String>>();
+    private Map<Byte, Map<Byte, String>> ipRAC = new HashMap<Byte, Map<Byte, 
String>>();
+    private Map<String, Integer> dcRepFactor = new HashMap<String, Integer>();
+
+    private XMLUtils xmlUtils;
+    private Map<String, Integer> quorumDCMap = new HashMap<String, Integer>();
+    /**
+     * The default rack property file to be read.
+     */
+    private static String DEFAULT_RACK_CONFIG_FILE = 
"/etc/cassandra/DC-Config.xml";
+
+    /**
+     * Reference to the logger.
+     */
+    private static Logger logger_ = Logger
+            .getLogger(DatacenterEndPointSnitch.class);
+
+    /**
+     * Constructor, intialize XML config and read the config in...
+     */
+    public DatacenterEndPointSnitch() throws IOException,
+                                             ParserConfigurationException, 
SAXException
+    {
+        super();
+        xmlUtils = new XMLUtils(DEFAULT_RACK_CONFIG_FILE);
+        reloadConfiguration();
+    }
+
+    /**
+     * Return the rack for which an endpoint resides in
+     */
+    public String getRackForEndPoint(InetAddress endPoint)
+            throws UnknownHostException
+    {
+        byte[] ipQuads = getIPAddress(endPoint.getHostAddress());
+        return ipRAC.get(ipQuads[1]).get(ipQuads[2]);
+    }
+
+    /**
+     * This method will load the configuration from the xml file. Mandatory
+     * fields are at least 1 DC and 1 rack configuration: name of the DC/rack
+     * and the IP quadrants for rack and DC.
+     * <p/>
+     * This method will not be called every time.
+     */
+    public void reloadConfiguration() throws IOException
+    {
+        try
+        {
+            String[] dcNames = 
xmlUtils.getNodeValues("/EndPoints/DataCenter/name");
+            for (String dcName : dcNames)
+            {
+                // Parse the Datacenter Quaderant.
+                String dcXPath = "/EndPoints/DataCenter[name='" + dcName + 
"']";
+                String dcIPQuad = xmlUtils.getNodeValue(dcXPath + 
"/ip2ndQuad");
+                String replicationFactor = xmlUtils.getNodeValue(dcXPath + 
"/replicationFactor");
+                byte dcByte = intToByte(Integer.parseInt(dcIPQuad));
+                // Parse the replication factor for a DC
+                int dcReF = Integer.parseInt(replicationFactor);
+                dcRepFactor.put(dcName, dcReF);
+                quorumDCMap.put(dcName, (dcReF / 2 + 1));
+                String[] racNames = xmlUtils.getNodeValues(dcXPath + 
"/rack/name");
+                Map<Byte, String> dcRackMap = ipDC.get(dcByte);
+                if (null == dcRackMap)
+                {
+                    dcRackMap = new HashMap<Byte, String>();
+                }
+                Map<Byte, String> rackDcMap = ipRAC.get(dcByte);
+                if (null == rackDcMap)
+                {
+                    rackDcMap = new HashMap<Byte, String>();
+                }
+                for (String racName : racNames)
+                {
+                    // Parse the RAC ip Quaderant.
+                    String racIPQuad = xmlUtils.getNodeValue(dcXPath + 
"/rack[name = '" + racName + "']/ip3rdQuad");
+                    byte racByte = intToByte(Integer.parseInt(racIPQuad));
+                    dcRackMap.put(racByte, dcName);
+                    rackDcMap.put(racByte, racName);
+                }
+                ipDC.put(dcByte, dcRackMap);
+                ipRAC.put(dcByte, rackDcMap);
+            }
+        }
+        catch (Exception ioe)
+        {
+            throw new IOException("Could not process " + 
DEFAULT_RACK_CONFIG_FILE, ioe);
+        }
+    }
+
+    /**
+     * This method will return true if the hosts are in the same rack, else
+     * false.
+     */
+    public boolean isOnSameRack(InetAddress host, InetAddress host2)
+            throws UnknownHostException
+    {
+        /*
+        * Look at the IP Address of the two hosts. Compare the 2nd and 3rd
+        * octet. If they are the same then the hosts are in the same rack else
+        * different racks.
+        */
+        byte[] ip = getIPAddress(host.getHostAddress());
+        byte[] ip2 = getIPAddress(host2.getHostAddress());
+
+        return ipRAC.get(ip[1]).get(ip[2])
+                .equals(ipRAC.get(ip2[1]).get(ip2[2]));
+    }
+
+    /**
+     * This method will return true if the hosts are in the same DC, else 
false.
+     */
+    public boolean isInSameDataCenter(InetAddress host, InetAddress host2)
+            throws UnknownHostException
+    {
+        /*
+        * Look at the IP Address of the two hosts. Compare the 2nd and 3rd
+        * octet and get the DC Name. If they are the same then the hosts are in
+        * the same datacenter else different datacenter.
+        */
+        byte[] ip = getIPAddress(host.getHostAddress());
+        byte[] ip2 = getIPAddress(host2.getHostAddress());
+
+        return ipDC.get(ip[1]).get(ip[2]).equals(ipDC.get(ip2[1]).get(ip2[2]));
+    }
+
+    /**
+     * Returns a DC replication map, the key will be the dc name and the value
+     * will be the replication factor of that Datacenter.
+     */
+    public HashMap<String, Integer> getMapReplicationFactor()
+    {
+        return new HashMap<String, Integer>(dcRepFactor);
+    }
+
+    /**
+     * Returns a DC quorum map, the key will be the dc name and the value
+     * will be the quorum size (replicationFactor / 2 + 1) of that Datacenter.
+     */
+    public HashMap<String, Integer> getMapQuorumFactor()
+    {
+        return new HashMap<String, Integer>(quorumDCMap);
+    }
+
+    private byte[] getIPAddress(String host) throws UnknownHostException
+    {
+        InetAddress ia = InetAddress.getByName(host);
+        return ia.getAddress();
+    }
+
+    public static byte intToByte(int n)
+    {
+        return (byte) (n & 0x000000ff);
+    }
+
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        byte[] ipQuads = getIPAddress(endpoint.getHostAddress());
+        return ipDC.get(ipQuads[1]).get(ipQuads[2]);
+    }
+}

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=829819&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
 Mon Oct 26 15:00:20 2009
@@ -0,0 +1,216 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.io.IOError;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.*;
+
+/**
+ * This strategy is a little different from the rack-aware strategy. If the
+ * replication factor is N, we will make sure that (N-1)%2 of the nodes are in
+ * another datacenter. For example, if we have 5 nodes this strategy will make
+ * sure to place 2 copies out of 5 in another datacenter.
+ * <p/>
+ * This class also caches the EndPoints and invalidates the cache if there is a
+ * change in the number of tokens.
+ */
+public class DatacenterShardStategy extends AbstractReplicationStrategy
+{
+    private static Map<String, List<Token>> dcMap = new HashMap<String, 
List<Token>>();
+    private static Map<String, Integer> dcReplicationFactor = new 
HashMap<String, Integer>();
+    private static Map<String, Integer> quorumRepFactor = new HashMap<String, 
Integer>();
+    private static int locQFactor = 0;
+    private static DatacenterEndPointSnitch endPointSnitch;
+    ArrayList<Token> tokens;
+
+    private List<InetAddress> localEndPoints = new ArrayList<InetAddress>();
+
+    private List<InetAddress> getLocalEndPoints()
+    {
+        return new ArrayList<InetAddress>(localEndPoints);
+    }
+
+    private Map<String, Integer> getQuorumRepFactor()
+    {
+        return new HashMap<String, Integer>(quorumRepFactor);
+    }
+
+    /**
+     * This method will get the required information of the EndPoint from the
+     * DataCenterEndPointSnitch and populate this singleton class.
+     *
+     * @param tokenToEndPointMap - Provided the endpoint map which will be 
mapped with the DC's
+     */
+    private void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, 
Collection<Token> tokens) throws IOException
+    {
+        endPointSnitch = (DatacenterEndPointSnitch) 
StorageService.instance().getEndPointSnitch();
+        this.tokens = new ArrayList<Token>(tokens);
+        String localDC = 
endPointSnitch.getLocation(InetAddress.getLocalHost());
+        for (Token token : this.tokens)
+        {
+            InetAddress endPoint = tokenToEndPointMap.get(token);
+            String dataCenter = endPointSnitch.getLocation(endPoint);
+            if (dataCenter.equals(localDC))
+            {
+                localEndPoints.add(endPoint);
+            }
+            List<Token> lst = dcMap.get(dataCenter);
+            if (lst == null)
+            {
+                lst = new ArrayList<Token>();
+            }
+            lst.add(token);
+            dcMap.put(dataCenter, lst);
+        }
+        for (Entry<String, List<Token>> entry : dcMap.entrySet())
+        {
+            List<Token> valueList = entry.getValue();
+            Collections.sort(valueList);
+            dcMap.put(entry.getKey(), valueList);
+        }
+        dcReplicationFactor = endPointSnitch.getMapReplicationFactor();
+        for (Entry<String, Integer> entry : dcReplicationFactor.entrySet())
+        {
+            String datacenter = entry.getKey();
+            int qFactor = (entry.getValue() / 2 + 1);
+            quorumRepFactor.put(datacenter, qFactor);
+            if (datacenter.equals(localDC))
+            {
+                locQFactor = qFactor;
+            }
+        }
+    }
+
+    public DatacenterShardStategy(TokenMetadata tokenMetadata, 
IPartitioner<Token> partitioner, int replicas, int storagePort)
+    throws UnknownHostException
+    {
+        super(tokenMetadata, partitioner, replicas, storagePort);
+        assert (DatabaseDescriptor.getEndPointSnitch() instanceof 
DatacenterEndPointSnitch);
+    }
+
+    @Override
+    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, 
Map<Token, InetAddress> tokenToEndPointMap)
+    {
+        try
+        {
+            return getNaturalEndpointsInternal(searchToken, 
tokenToEndPointMap);
+        }
+        catch (IOException e)
+        {
+             throw new IOError(e);
+        }
+    }
+
+    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token 
searchToken, Map<Token, InetAddress> tokenToEndPointMap) throws IOException
+    {
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+        if (null == tokens || this.tokens.size() != 
tokenToEndPointMap.keySet().size())
+        {
+            loadEndPoints(tokenToEndPointMap, tokenToEndPointMap.keySet());
+        }
+
+        for (String dc : dcMap.keySet())
+        {
+            int foundCount = 0;
+            ArrayList<InetAddress> forloopReturn = new 
ArrayList<InetAddress>();
+            int replicas_ = dcReplicationFactor.get(dc);
+            List tokens = dcMap.get(dc);
+            boolean bOtherRack = false;
+            boolean doneDataCenterItr;
+            int index = Collections.binarySearch(tokens, searchToken);
+            if (index < 0)
+            {
+                index = (index + 1) * (-1);
+                if (index >= tokens.size())
+                {
+                    index = 0;
+                }
+            }
+            int totalNodes = tokens.size();
+            // Add the node at the index by default
+            InetAddress primaryHost = 
tokenToEndPointMap.get(tokens.get(index));
+            forloopReturn.add(primaryHost);
+            foundCount++;
+            if (replicas_ == 1)
+            {
+                continue;
+            }
+
+            int startIndex = (index + 1) % totalNodes;
+            for (int i = startIndex, count = 1; count < totalNodes && 
foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            {
+                InetAddress endPointOfIntrest = 
tokenToEndPointMap.get(tokens.get(i));
+                if ((replicas_ - 1) > foundCount)
+                {
+                    forloopReturn.add(endPointOfIntrest);
+                    continue;
+                }
+                else
+                {
+                    doneDataCenterItr = true;
+                }
+                // Now try to find one on a different rack
+                if (!bOtherRack)
+                {
+                    if (!endPointSnitch.isOnSameRack(primaryHost, 
endPointOfIntrest))
+                    {
+                        
forloopReturn.add(tokenToEndPointMap.get(tokens.get(i)));
+                        bOtherRack = true;
+                        foundCount++;
+                    }
+                }
+                // If both already found exit loop.
+                if (doneDataCenterItr && bOtherRack)
+                {
+                    break;
+                }
+            }
+
+            /*
+            * If we found N number of nodes we are good. This loop will just
+            * exit. Otherwise just loop through the list and add until we
+            * have N nodes.
+            */
+            for (int i = startIndex, count = 1; count < totalNodes && 
foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+            {
+                if 
(!forloopReturn.contains(tokenToEndPointMap.get(tokens.get(i))))
+                {
+                    forloopReturn.add(tokenToEndPointMap.get(tokens.get(i)));
+                    foundCount++;
+                }
+            }
+            endpoints.addAll(forloopReturn);
+        }
+
+        return endpoints;
+    }
+
+    /**
+     * This method will generate the QRH object and returns. If the Consistency
+     * level is DCQUORUM then it will return a DCQRH with a map of local rep
+     * factor alone. If the consistency level is DCQUORUMSYNC then it will
+     * return a DCQRH with a map of all the DC rep facor.
+     */
+    @Override
+    public <T> QuorumResponseHandler<T> 
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int 
consistency_level)
+    throws InvalidRequestException
+    {
+        if (consistency_level == ConsistencyLevel.DCQUORUM)
+        {
+            List<InetAddress> endpoints = getLocalEndPoints();
+            return new DatacenterQuorumResponseHandler<T>(endpoints, 
locQFactor, responseResolver);
+        }
+        else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
+        {
+            return new 
DatacenterQuorumSyncResponseHandler<T>(getQuorumRepFactor(), responseResolver);
+        }
+        return super.getResponseHandler(responseResolver, blockFor, 
consistency_level);
+    }
+}
\ No newline at end of file

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
 Mon Oct 26 15:00:20 2009
@@ -18,9 +18,8 @@
 
 package org.apache.cassandra.locator;
 
-import java.net.*;
-
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 public class EndPointSnitch implements IEndPointSnitch
 {
@@ -30,13 +29,13 @@
          * Look at the IP Address of the two hosts. Compare 
          * the 3rd octet. If they are the same then the hosts
          * are in the same rack else different racks. 
-        */        
+        */
         byte[] ip = host.getAddress();
         byte[] ip2 = host2.getAddress();
-        
-        return ( ip[2] == ip2[2] );
+
+        return ip[2] == ip2[2];
     }
-    
+
     public boolean isInSameDataCenter(InetAddress host, InetAddress host2) 
throws UnknownHostException
     {
         /*
@@ -46,7 +45,12 @@
         */
         byte[] ip = host.getAddress();
         byte[] ip2 = host2.getAddress();
-        
-        return ( ip[1] == ip2[1] );
+
+        return ip[1] == ip2[1];
+    }
+
+    public String getLocation(InetAddress endpoint) throws UnknownHostException
+    {
+        throw new UnknownHostException("Not Supported");
     }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
 Mon Oct 26 15:00:20 2009
@@ -27,6 +27,8 @@
  * This interface helps determine location of node in the data center relative 
to another node.
  * Give a node A and another node B it can tell if A and B are on the same 
rack or in the same
  * data center.
+ *
+ * Not all methods will be germane to all implementations.  Throw 
UnsupportedOperation as necessary.
  */
 
 public interface IEndPointSnitch
@@ -48,4 +50,9 @@
      * @throws UnknownHostException
      */
     public boolean isInSameDataCenter(InetAddress host, InetAddress host2) 
throws UnknownHostException;
+    
+    /**
+     * Given endpoints this method will help us know the datacenter name where 
the node is located at.
+     */
+    public String getLocation(InetAddress endpoint) throws 
UnknownHostException;
 }

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=829819&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
 Mon Oct 26 15:00:20 2009
@@ -0,0 +1,50 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * This class will block for the number of responses given by blockFor. It
+ * will block until we receive that many responses from nodes on the wait
+ * list.
+ */
+public class DatacenterQuorumResponseHandler<T> extends 
QuorumResponseHandler<T>
+{
+    private final List<InetAddress> waitList;
+    private int blockFor;
+
+    public DatacenterQuorumResponseHandler(List<InetAddress> waitList, int 
blockFor, IResponseResolver<T> responseResolver)
+    throws InvalidRequestException
+    {
+        // Response is been managed by the map so the waitlist size really 
doesnt matter.
+        super(blockFor, responseResolver);
+        this.blockFor = blockFor;
+        this.waitList = waitList;
+    }
+
+    @Override
+    public void response(Message message)
+    {
+        if (condition_.isSignaled())
+        {
+            return;
+        }
+
+        if (waitList.contains(message.getFrom()))
+        {
+            blockFor--;
+        }
+        responses_.add(message);
+        // If done then the response count will be empty after removing
+        // everything.
+        if (blockFor <= 0)
+        {
+            condition_.signal();
+        }
+    }
+}

Added: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java?rev=829819&view=auto
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
 (added)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
 Mon Oct 26 15:00:20 2009
@@ -0,0 +1,75 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.Message;
+
+/**
+ * This class will block for the replication factor which is
+ * provided in the input map. It will block until we receive responses from
+ * n nodes in each of our data centers.
+ */
+public class DatacenterQuorumSyncResponseHandler<T> extends 
QuorumResponseHandler<T>
+{
+    private final Map<String, Integer> dcResponses = new HashMap<String, 
Integer>();
+    private final Map<String, Integer> responseCounts;
+
+    public DatacenterQuorumSyncResponseHandler(Map<String, Integer> 
responseCounts, IResponseResolver<T> responseResolver)
+    throws InvalidRequestException
+    {
+        // Response is been managed by the map so make it 1 for the superclass.
+        super(1, responseResolver);
+        this.responseCounts = responseCounts;
+    }
+
+    @Override
+    public void response(Message message)
+    {
+        if (condition_.isSignaled())
+        {
+            return;
+        }
+        try
+        {
+            String dataCenter = 
DatabaseDescriptor.getEndPointSnitch().getLocation(message.getFrom());
+            Object blockFor = responseCounts.get(dataCenter);
+            // If this DC needs to be blocked then do the below.
+            if (blockFor != null)
+            {
+                Integer quorumCount = dcResponses.get(dataCenter);
+                if (quorumCount == null)
+                {
+                    // Intialize and recognize the first response
+                    dcResponses.put(dataCenter, 1);
+                }
+                else if ((Integer) blockFor > quorumCount)
+                {
+                    // recognize the consequtive responses.
+                    dcResponses.put(dataCenter, quorumCount + 1);
+                }
+                else
+                {
+                    // No need to wait on it anymore so remove it.
+                    responseCounts.remove(dataCenter);
+                }
+            }
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+        responses_.add(message);
+        // If done then the response count will be empty after removing
+        // everything.
+        if (responseCounts.isEmpty())
+        {
+            condition_.signal();
+        }
+    }
+}

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 Mon Oct 26 15:00:20 2009
@@ -37,10 +37,10 @@
 
 public class QuorumResponseHandler<T> implements IAsyncCallback
 {
-    private static Logger logger_ = Logger.getLogger( 
QuorumResponseHandler.class );
-    private SimpleCondition condition_ = new SimpleCondition();
+    protected static Logger logger_ = Logger.getLogger( 
QuorumResponseHandler.class );
+    protected SimpleCondition condition_ = new SimpleCondition();
     private int responseCount_;
-    private List<Message> responses_ = new ArrayList<Message>();
+    protected List<Message> responses_ = new ArrayList<Message>();
     private IResponseResolver<T> responseResolver_;
     private long startTime_;
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Mon Oct 26 15:00:20 2009
@@ -72,27 +72,27 @@
      */
     private static Map<InetAddress, Message> createWriteMessages(RowMutation 
rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
     {
-               Map<InetAddress, Message> messageMap = new HashMap<InetAddress, 
Message>();
-               Message message = rm.makeRowMutationMessage();
+        Map<InetAddress, Message> messageMap = new HashMap<InetAddress, 
Message>();
+        Message message = rm.makeRowMutationMessage();
 
-               for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
-               {
+        for (Map.Entry<InetAddress, InetAddress> entry : 
endpointMap.entrySet())
+        {
             InetAddress target = entry.getKey();
             InetAddress hint = entry.getValue();
             if ( !target.equals(hint) )
-                       {
-                               Message hintedMessage = 
rm.makeRowMutationMessage();
-                               hintedMessage.addHeader(RowMutation.HINT, 
hint.getAddress());
-                               if (logger.isDebugEnabled())
-                                   logger.debug("Sending the hint of " + hint 
+ " to " + target);
-                               messageMap.put(target, hintedMessage);
-                       }
-                       else
-                       {
-                               messageMap.put(target, message);
-                       }
-               }
-               return messageMap;
+            {
+                Message hintedMessage = rm.makeRowMutationMessage();
+                hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
+                if (logger.isDebugEnabled())
+                    logger.debug("Sending the hint of " + hint + " to " + 
target);
+                messageMap.put(target, hintedMessage);
+            }
+            else
+            {
+                messageMap.put(target, message);
+            }
+        }
+        return messageMap;
     }
     
     /**
@@ -103,7 +103,7 @@
      * @param rm the mutation to be applied across the replicas
     */
     public static void insert(RowMutation rm)
-       {
+    {
         /*
          * Get the N nodes from storage service where the data needs to be
          * replicated
@@ -112,21 +112,21 @@
         */
 
         long startTime = System.currentTimeMillis();
-               try
-               {
+        try
+        {
             List<InetAddress> naturalEndpoints = 
StorageService.instance().getNaturalEndpoints(rm.key());
             // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
-                       Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
-                       Map<InetAddress, Message> messageMap = 
createWriteMessages(rm, endpointMap);
-                       for (Map.Entry<InetAddress, Message> entry : 
messageMap.entrySet())
-                       {
+            Map<InetAddress, InetAddress> endpointMap = 
StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+            Map<InetAddress, Message> messageMap = createWriteMessages(rm, 
endpointMap);
+            for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
+            {
                 Message message = entry.getValue();
                 InetAddress endpoint = entry.getKey();
                 if (logger.isDebugEnabled())
                     logger.debug("insert writing key " + rm.key() + " to " + 
message.getMessageId() + "@" + endpoint);
                 MessagingService.instance().sendOneWay(message, endpoint);
-                       }
-               }
+            }
+        }
         catch (IOException e)
         {
             throw new RuntimeException("error inserting key " + rm.key(), e);
@@ -159,7 +159,7 @@
             {
                 throw new UnavailableException();
             }
-            QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
+            QuorumResponseHandler<Boolean> quorumResponseHandler = 
StorageService.instance().getResponseHandler(new WriteResponseResolver(), 
blockFor, consistency_level);
             if (logger.isDebugEnabled())
                 logger.debug("insertBlocking writing key " + rm.key() + " to " 
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") 
+ "]");
 
@@ -204,6 +204,7 @@
 
     private static int determineBlockFor(int naturalTargets, int 
hintedTargets, int consistency_level)
     {
+        // TODO this is broken for DC quorum / DC quorum sync
         int bootstrapTargets = hintedTargets - naturalTargets;
         int blockFor;
         if (consistency_level == ConsistencyLevel.ONE)
@@ -318,8 +319,8 @@
         }
         else
         {
-            assert consistency_level == ConsistencyLevel.QUORUM;
-            rows = strongRead(commands);
+            assert consistency_level >= ConsistencyLevel.QUORUM;
+            rows = strongRead(commands, consistency_level);
         }
 
         readStats.add(System.currentTimeMillis() - startTime);
@@ -339,7 +340,7 @@
          * 7. else carry out read repair by getting data from all the nodes.
         // 5. return success
      */
-    private static List<Row> strongRead(List<ReadCommand> commands) throws 
IOException, TimeoutException, InvalidRequestException, UnavailableException
+    private static List<Row> strongRead(List<ReadCommand> commands, int 
consistency_level) throws IOException, TimeoutException, 
InvalidRequestException, UnavailableException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new 
ArrayList<QuorumResponseHandler<Row>>();
         List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
@@ -357,7 +358,7 @@
             Message message = command.makeReadMessage();
             Message messageDigestOnly = 
readMessageDigestOnly.makeReadMessage();
 
-            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new 
ReadResponseResolver());
+            QuorumResponseHandler<Row> quorumResponseHandler = 
StorageService.instance().getResponseHandler(new ReadResponseResolver(), 
DatabaseDescriptor.getQuorum(), consistency_level);
             InetAddress dataPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
             List<InetAddress> endpointList = 
StorageService.instance().getNaturalEndpoints(command.key);
             /* Remove the local storage endpoint from the list. */

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
 Mon Oct 26 15:00:20 2009
@@ -289,7 +289,7 @@
 
     public IEndPointSnitch getEndPointSnitch()
     {
-       return endPointSnitch_;
+        return endPointSnitch_;
     }
     
     /*
@@ -577,10 +577,10 @@
      */
     public void takeSnapshot(String tableName, String tag) throws IOException
     {
-       if (DatabaseDescriptor.getTable(tableName) == null)
+        if (DatabaseDescriptor.getTable(tableName) == null)
         {
             throw new IOException("Table " + tableName + "does not exist");
-       }
+        }
         Table tableInstance = Table.open(tableName);
         tableInstance.snapshot(tag);
     }
@@ -592,11 +592,11 @@
      */
     public void takeAllSnapshot(String tag) throws IOException
     {
-       for (String tableName: DatabaseDescriptor.getTables())
+        for (String tableName: DatabaseDescriptor.getTables())
         {
             Table tableInstance = Table.open(tableName);
             tableInstance.snapshot(tag);
-       }
+        }
     }
 
     /**
@@ -604,11 +604,11 @@
      */
     public void clearSnapshot() throws IOException
     {
-       for (String tableName: DatabaseDescriptor.getTables())
+        for (String tableName: DatabaseDescriptor.getTables())
         {
             Table tableInstance = Table.open(tableName);
             tableInstance.clearSnapshot();
-       }
+        }
         if (logger_.isDebugEnabled())
             logger_.debug("Cleared out all snapshot directories");
     }
@@ -789,16 +789,16 @@
      */
     public List<InetAddress> getLiveNaturalEndpoints(String key)
     {
-       List<InetAddress> liveEps = new ArrayList<InetAddress>();
-       List<InetAddress> endpoints = getNaturalEndpoints(key);
-       
-       for ( InetAddress endpoint : endpoints )
-       {
-               if ( FailureDetector.instance().isAlive(endpoint) )
-                       liveEps.add(endpoint);
-       }
-       
-       return liveEps;
+        List<InetAddress> liveEps = new ArrayList<InetAddress>();
+        List<InetAddress> endpoints = getNaturalEndpoints(key);
+        
+        for ( InetAddress endpoint : endpoints )
+        {
+            if ( FailureDetector.instance().isAlive(endpoint) )
+                liveEps.add(endpoint);
+        }
+        
+        return liveEps;
     }
 
     /**
@@ -817,45 +817,45 @@
      * This function finds the most suitable endpoint given a key.
      * It checks for locality and alive test.
      */
-       public InetAddress findSuitableEndPoint(String key) throws IOException, 
UnavailableException
-       {
-               List<InetAddress> endpoints = getNaturalEndpoints(key);
-               for(InetAddress endPoint: endpoints)
-               {
+    public InetAddress findSuitableEndPoint(String key) throws IOException, 
UnavailableException
+    {
+        List<InetAddress> endpoints = getNaturalEndpoints(key);
+        for(InetAddress endPoint: endpoints)
+        {
             if(endPoint.equals(FBUtilities.getLocalAddress()))
-                       {
-                               return endPoint;
-                       }
-               }
-               int j = 0;
-               for ( ; j < endpoints.size(); ++j )
-               {
-                       if ( 
StorageService.instance().isInSameDataCenter(endpoints.get(j)) && 
FailureDetector.instance().isAlive(endpoints.get(j)))
-                       {
-                               return endpoints.get(j);
-                       }
-               }
-               // We have tried to be really nice but looks like there are no servers 
-               // in the local data center that are alive and can service this request so 
-               // just send it to the first alive guy and see if we get anything.
-               j = 0;
-               for ( ; j < endpoints.size(); ++j )
-               {
-                       if ( 
FailureDetector.instance().isAlive(endpoints.get(j)))
-                       {
-                               if (logger_.isDebugEnabled())
-                                 logger_.debug("InetAddress " + 
endpoints.get(j) + " is alive so get data from it.");
-                               return endpoints.get(j);
-                       }
-               }
+            {
+                return endPoint;
+            }
+        }
+        int j = 0;
+        for ( ; j < endpoints.size(); ++j )
+        {
+            if ( 
StorageService.instance().isInSameDataCenter(endpoints.get(j)) && 
FailureDetector.instance().isAlive(endpoints.get(j)))
+            {
+                return endpoints.get(j);
+            }
+        }
+        // We have tried to be really nice but looks like there are no servers 
+        // in the local data center that are alive and can service this request so 
+        // just send it to the first alive guy and see if we get anything.
+        j = 0;
+        for ( ; j < endpoints.size(); ++j )
+        {
+            if ( FailureDetector.instance().isAlive(endpoints.get(j)))
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("InetAddress " + endpoints.get(j) + " is alive so get data from it.");
+                return endpoints.get(j);
+            }
+        }
 
         throw new UnavailableException(); // no nodes that could contain key are alive
-       }
+    }
 
-       Map<Token, InetAddress> getLiveEndPointMap()
-       {
-           return tokenMetadata_.cloneTokenEndPointMap();
-       }
+    Map<Token, InetAddress> getLiveEndPointMap()
+    {
+        return tokenMetadata_.cloneTokenEndPointMap();
+    }
 
     public void setLog4jLevel(String classQualifier, String rawLevel)
     {
@@ -890,4 +890,9 @@
         tokens.add(range.right().toString());
         return tokens;
     }
+
+    public <T> QuorumResponseHandler<T> 
getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int 
consistency_level) throws InvalidRequestException, UnavailableException
+    {
+        return replicationStrategy_.getResponseHandler(responseResolver, 
blockFor, consistency_level);
+    }
 }


Reply via email to