Author: jbellis
Date: Mon Oct 26 15:00:20 2009
New Revision: 829819
URL: http://svn.apache.org/viewvc?rev=829819&view=rev
Log:
add dcquorum/dcquorumsync consistency levels, representing "a quorum of the
replicas in the current dc" and "a quorum of the replicas in each dc,"
respectively.
patch by Vijay Parthasarathy; reviewed by jbellis for CASSANDRA-492
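For readers unfamiliar with the semantics: with a per-datacenter replication factor RF, both new levels block on a per-DC quorum of RF/2 + 1 responses. DCQUORUM applies this to the coordinator's datacenter only; DCQUORUMSYNC applies it to every datacenter. Below is a minimal, self-contained sketch of that arithmetic; the DC names and replication factors are made up for illustration and are not part of this patch.

import java.util.HashMap;
import java.util.Map;

public class DcQuorumExample
{
    public static void main(String[] args)
    {
        // Hypothetical per-DC replication factors (illustrative only).
        Map<String, Integer> dcRepFactor = new HashMap<String, Integer>();
        dcRepFactor.put("DC1", 3);
        dcRepFactor.put("DC2", 5);

        // Same quorum formula the patch uses: RF / 2 + 1.
        Map<String, Integer> dcQuorum = new HashMap<String, Integer>();
        for (Map.Entry<String, Integer> e : dcRepFactor.entrySet())
            dcQuorum.put(e.getKey(), e.getValue() / 2 + 1);

        // DCQUORUM: block for the quorum of the coordinator's DC only.
        System.out.println("dcquorum in DC1 blocks for " + dcQuorum.get("DC1") + " replies");
        // DCQUORUMSYNC: block for the quorum of every DC.
        System.out.println("dcquorumsync blocks for " + dcQuorum + " replies, per DC");
    }
}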
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
Modified:
incubator/cassandra/trunk/interface/cassandra.thrift
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/cassandra.thrift?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/cassandra.thrift (original)
+++ incubator/cassandra/trunk/interface/cassandra.thrift Mon Oct 26 15:00:20 2009
@@ -92,7 +92,9 @@
ZERO = 0,
ONE = 1,
QUORUM = 2,
- ALL = 3,
+ DCQUORUM = 3,
+ DCQUORUMSYNC = 4,
+ ALL = 5,
}
struct ColumnParent {
Modified: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java (original)
+++ incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/ConsistencyLevel.java Mon Oct 26 15:00:20 2009
@@ -38,18 +38,24 @@
public static final int ZERO = 0;
public static final int ONE = 1;
public static final int QUORUM = 2;
- public static final int ALL = 3;
+ public static final int DCQUORUM = 3;
+ public static final int DCQUORUMSYNC = 4;
+ public static final int ALL = 5;
public static final IntRangeSet VALID_VALUES = new IntRangeSet(
ZERO,
ONE,
QUORUM,
+ DCQUORUM,
+ DCQUORUMSYNC,
ALL );
public static final Map<Integer, String> VALUES_TO_NAMES = new HashMap<Integer, String>() {{
put(ZERO, "ZERO");
put(ONE, "ONE");
put(QUORUM, "QUORUM");
+ put(DCQUORUM, "DCQUORUM");
+ put(DCQUORUMSYNC, "DCQUORUMSYNC");
put(ALL, "ALL");
}};
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Oct 26 15:00:20 2009
@@ -18,17 +18,20 @@
*/
package org.apache.cassandra.locator;
+import java.net.InetAddress;
import java.util.*;
import org.apache.log4j.Logger;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
-import java.net.InetAddress;
-
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.InvalidRequestException;
+import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
/**
* This class contains a helper method that will be used by
@@ -53,6 +56,11 @@
}
public abstract ArrayList<InetAddress> getNaturalEndpoints(Token token, Map<Token, InetAddress> tokenToEndPointMap);
+
+ public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level) throws InvalidRequestException
+ {
+ return new QuorumResponseHandler<T>(blockFor, responseResolver);
+ }
public ArrayList<InetAddress> getNaturalEndpoints(Token token)
{
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterEndPointSnitch.java Mon Oct 26 15:00:20 2009
@@ -0,0 +1,189 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.utils.XMLUtils;
+import org.xml.sax.SAXException;
+
+/**
+ * DatacenterEndPointSnitch
+ * <p/>
+ * This class reads the configuration and stores the IP ranges in a HashMap
+ * for later lookup. It also provides a way to compare two EndPoints and to
+ * get location details from them.
+ */
+
+public class DatacenterEndPointSnitch implements IEndPointSnitch
+{
+ /**
+ * This Map will contain the information of the EndPoints and its Location
+ * (Datacenter and RAC)
+ */
+ private Map<Byte, Map<Byte, String>> ipDC = new HashMap<Byte, Map<Byte, String>>();
+ private Map<Byte, Map<Byte, String>> ipRAC = new HashMap<Byte, Map<Byte, String>>();
+ private Map<String, Integer> dcRepFactor = new HashMap<String, Integer>();
+
+ private XMLUtils xmlUtils;
+ private Map<String, Integer> quorumDCMap = new HashMap<String, Integer>();
+ /**
+ * The default rack property file to be read.
+ */
+ private static String DEFAULT_RACK_CONFIG_FILE = "/etc/cassandra/DC-Config.xml";
+
+ /**
+ * Reference to the logger.
+ */
+ private static Logger logger_ = Logger
+ .getLogger(DatacenterEndPointSnitch.class);
+
+ /**
+ * Constructor: initialize the XML config and read the config in...
+ */
+ public DatacenterEndPointSnitch() throws IOException, ParserConfigurationException, SAXException
+ {
+ super();
+ xmlUtils = new XMLUtils(DEFAULT_RACK_CONFIG_FILE);
+ reloadConfiguration();
+ }
+
+ /**
+ * Return the rack in which an endpoint resides
+ */
+ public String getRackForEndPoint(InetAddress endPoint)
+ throws UnknownHostException
+ {
+ byte[] ipQuads = getIPAddress(endPoint.getHostAddress());
+ return ipRAC.get(ipQuads[1]).get(ipQuads[2]);
+ }
+
+ /**
+ * This method loads the configuration from the XML file. The mandatory
+ * fields are at least one DC and one RAC configuration: the name of the
+ * DC/RAC and the IP quadrants for the RAC and DC.
+ * <p/>
+ * This method will not be called every time.
+ */
+ public void reloadConfiguration() throws IOException
+ {
+ try
+ {
+ String[] dcNames = xmlUtils.getNodeValues("/EndPoints/DataCenter/name");
+ for (String dcName : dcNames)
+ {
+ // Parse the datacenter quadrant.
+ String dcXPath = "/EndPoints/DataCenter[name='" + dcName + "']";
+ String dcIPQuad = xmlUtils.getNodeValue(dcXPath + "/ip2ndQuad");
+ String replicationFactor = xmlUtils.getNodeValue(dcXPath + "/replicationFactor");
+ byte dcByte = intToByte(Integer.parseInt(dcIPQuad));
+ // Parse the replication factor for a DC
+ int dcReF = Integer.parseInt(replicationFactor);
+ dcRepFactor.put(dcName, dcReF);
+ quorumDCMap.put(dcName, (dcReF / 2 + 1));
+ String[] racNames = xmlUtils.getNodeValues(dcXPath + "/rack/name");
+ Map<Byte, String> dcRackMap = ipDC.get(dcByte);
+ if (null == dcRackMap)
+ {
+ dcRackMap = new HashMap<Byte, String>();
+ }
+ Map<Byte, String> rackDcMap = ipRAC.get(dcByte);
+ if (null == rackDcMap)
+ {
+ rackDcMap = new HashMap<Byte, String>();
+ }
+ for (String racName : racNames)
+ {
+ // Parse the RAC IP quadrant.
+ String racIPQuad = xmlUtils.getNodeValue(dcXPath + "/rack[name = '" + racName + "']/ip3rdQuad");
+ byte racByte = intToByte(Integer.parseInt(racIPQuad));
+ dcRackMap.put(racByte, dcName);
+ rackDcMap.put(racByte, racName);
+ }
+ ipDC.put(dcByte, dcRackMap);
+ ipRAC.put(dcByte, rackDcMap);
+ }
+ }
+ catch (Exception ioe)
+ {
+ throw new IOException("Could not process " + DEFAULT_RACK_CONFIG_FILE, ioe);
+ }
+ }
+
+ /**
+ * This method returns true if the hosts are in the same RAC, else
+ * false.
+ */
+ public boolean isOnSameRack(InetAddress host, InetAddress host2)
+ throws UnknownHostException
+ {
+ /*
+ * Look at the IP Address of the two hosts. Compare the 2nd and 3rd
+ * octet. If they are the same then the hosts are in the same rack else
+ * different racks.
+ */
+ byte[] ip = getIPAddress(host.getHostAddress());
+ byte[] ip2 = getIPAddress(host2.getHostAddress());
+
+ return ipRAC.get(ip[1]).get(ip[2])
+ .equals(ipRAC.get(ip2[1]).get(ip2[2]));
+ }
+
+ /**
+ * This method returns true if the hosts are in the same DC, else false.
+ */
+ public boolean isInSameDataCenter(InetAddress host, InetAddress host2)
+ throws UnknownHostException
+ {
+ /*
+ * Look at the IP Address of the two hosts. Compare the 2nd and 3rd
+ * octet and get the DC Name. If they are the same then the hosts are in
+ * the same datacenter else different datacenter.
+ */
+ byte[] ip = getIPAddress(host.getHostAddress());
+ byte[] ip2 = getIPAddress(host2.getHostAddress());
+
+ return ipDC.get(ip[1]).get(ip[2]).equals(ipDC.get(ip2[1]).get(ip2[2]));
+ }
+
+ /**
+ * Returns a DC replication map, the key will be the dc name and the value
+ * will be the replication factor of that Datacenter.
+ */
+ public HashMap<String, Integer> getMapReplicationFactor()
+ {
+ return new HashMap<String, Integer>(dcRepFactor);
+ }
+
+ /**
+ * Returns a DC quorum map: the key is the DC name and the value
+ * is the quorum factor (replication factor / 2 + 1) of that datacenter.
+ */
+ public HashMap<String, Integer> getMapQuorumFactor()
+ {
+ return new HashMap<String, Integer>(quorumDCMap);
+ }
+
+ private byte[] getIPAddress(String host) throws UnknownHostException
+ {
+ InetAddress ia = InetAddress.getByName(host);
+ return ia.getAddress();
+ }
+
+ public static byte intToByte(int n)
+ {
+ return (byte) (n & 0x000000ff);
+ }
+
+ public String getLocation(InetAddress endpoint) throws UnknownHostException
+ {
+ byte[] ipQuads = getIPAddress(endpoint.getHostAddress());
+ return ipDC.get(ipQuads[1]).get(ipQuads[2]);
+ }
+}
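For reference, the snitch above loads its topology from an XML file (DEFAULT_RACK_CONFIG_FILE, /etc/cassandra/DC-Config.xml). The following is a sketch of what such a file might look like, inferred only from the XPath expressions in reloadConfiguration(); the DC/rack names, octet values, and replication factors are invented for illustration.

<EndPoints>
  <DataCenter>
    <name>DC1</name>
    <!-- second octet shared by the IPs in this datacenter -->
    <ip2ndQuad>10</ip2ndQuad>
    <replicationFactor>3</replicationFactor>
    <rack>
      <name>RAC1</name>
      <!-- third octet shared by the IPs in this rack -->
      <ip3rdQuad>20</ip3rdQuad>
    </rack>
    <rack>
      <name>RAC2</name>
      <ip3rdQuad>21</ip3rdQuad>
    </rack>
  </DataCenter>
  <DataCenter>
    <name>DC2</name>
    <ip2ndQuad>11</ip2ndQuad>
    <replicationFactor>3</replicationFactor>
    <rack>
      <name>RAC1</name>
      <ip3rdQuad>20</ip3rdQuad>
    </rack>
  </DataCenter>
</EndPoints>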
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Mon Oct 26 15:00:20 2009
@@ -0,0 +1,216 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.io.IOError;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.*;
+
+/**
+ * This strategy is a little different from the rack-aware strategy. If the
+ * replication factor is N, it makes sure that (N-1)/2 of the replicas are in
+ * other datacenters. For example, if we have 5 replicas this strategy makes
+ * sure to place 2 copies out of 5 in other datacenters.
+ * <p/>
+ * This class also caches the EndPoints and invalidates the cache if there is a
+ * change in the number of tokens.
+ */
+public class DatacenterShardStategy extends AbstractReplicationStrategy
+{
+ private static Map<String, List<Token>> dcMap = new HashMap<String, List<Token>>();
+ private static Map<String, Integer> dcReplicationFactor = new HashMap<String, Integer>();
+ private static Map<String, Integer> quorumRepFactor = new HashMap<String, Integer>();
+ private static int locQFactor = 0;
+ private static DatacenterEndPointSnitch endPointSnitch;
+ ArrayList<Token> tokens;
+
+ private List<InetAddress> localEndPoints = new ArrayList<InetAddress>();
+
+ private List<InetAddress> getLocalEndPoints()
+ {
+ return new ArrayList<InetAddress>(localEndPoints);
+ }
+
+ private Map<String, Integer> getQuorumRepFactor()
+ {
+ return new HashMap<String, Integer>(quorumRepFactor);
+ }
+
+ /**
+ * This method gets the required information about the EndPoints from the
+ * DatacenterEndPointSnitch and populates this singleton class.
+ *
+ * @param tokenToEndPointMap the endpoint map that will be mapped to the DCs
+ */
+ private void loadEndPoints(Map<Token, InetAddress> tokenToEndPointMap, Collection<Token> tokens) throws IOException
+ {
+ endPointSnitch = (DatacenterEndPointSnitch) StorageService.instance().getEndPointSnitch();
+ this.tokens = new ArrayList<Token>(tokens);
+ String localDC = endPointSnitch.getLocation(InetAddress.getLocalHost());
+ for (Token token : this.tokens)
+ {
+ InetAddress endPoint = tokenToEndPointMap.get(token);
+ String dataCenter = endPointSnitch.getLocation(endPoint);
+ if (dataCenter.equals(localDC))
+ {
+ localEndPoints.add(endPoint);
+ }
+ List<Token> lst = dcMap.get(dataCenter);
+ if (lst == null)
+ {
+ lst = new ArrayList<Token>();
+ }
+ lst.add(token);
+ dcMap.put(dataCenter, lst);
+ }
+ for (Entry<String, List<Token>> entry : dcMap.entrySet())
+ {
+ List<Token> valueList = entry.getValue();
+ Collections.sort(valueList);
+ dcMap.put(entry.getKey(), valueList);
+ }
+ dcReplicationFactor = endPointSnitch.getMapReplicationFactor();
+ for (Entry<String, Integer> entry : dcReplicationFactor.entrySet())
+ {
+ String datacenter = entry.getKey();
+ int qFactor = (entry.getValue() / 2 + 1);
+ quorumRepFactor.put(datacenter, qFactor);
+ if (datacenter.equals(localDC))
+ {
+ locQFactor = qFactor;
+ }
+ }
+ }
+
+ public DatacenterShardStategy(TokenMetadata tokenMetadata, IPartitioner<Token> partitioner, int replicas, int storagePort)
+ throws UnknownHostException
+ {
+ super(tokenMetadata, partitioner, replicas, storagePort);
+ assert (DatabaseDescriptor.getEndPointSnitch() instanceof DatacenterEndPointSnitch);
+ }
+
+ @Override
+ public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken, Map<Token, InetAddress> tokenToEndPointMap)
+ {
+ try
+ {
+ return getNaturalEndpointsInternal(searchToken, tokenToEndPointMap);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
+ private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, Map<Token, InetAddress> tokenToEndPointMap) throws IOException
+ {
+ ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+ if (null == tokens || this.tokens.size() != tokenToEndPointMap.keySet().size())
+ {
+ loadEndPoints(tokenToEndPointMap, tokenToEndPointMap.keySet());
+ }
+
+ for (String dc : dcMap.keySet())
+ {
+ int foundCount = 0;
+ ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>();
+ int replicas_ = dcReplicationFactor.get(dc);
+ List tokens = dcMap.get(dc);
+ boolean bOtherRack = false;
+ boolean doneDataCenterItr;
+ int index = Collections.binarySearch(tokens, searchToken);
+ if (index < 0)
+ {
+ index = (index + 1) * (-1);
+ if (index >= tokens.size())
+ {
+ index = 0;
+ }
+ }
+ int totalNodes = tokens.size();
+ // Add the node at the index by default
+ InetAddress primaryHost = tokenToEndPointMap.get(tokens.get(index));
+ forloopReturn.add(primaryHost);
+ foundCount++;
+ if (replicas_ == 1)
+ {
+ continue;
+ }
+
+ int startIndex = (index + 1) % totalNodes;
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+ {
+ InetAddress endPointOfIntrest = tokenToEndPointMap.get(tokens.get(i));
+ if ((replicas_ - 1) > foundCount)
+ {
+ forloopReturn.add(endPointOfIntrest);
+ continue;
+ }
+ else
+ {
+ doneDataCenterItr = true;
+ }
+ // Now try to find one on a different rack
+ if (!bOtherRack)
+ {
+ if (!endPointSnitch.isOnSameRack(primaryHost, endPointOfIntrest))
+ {
+ forloopReturn.add(tokenToEndPointMap.get(tokens.get(i)));
+ bOtherRack = true;
+ foundCount++;
+ }
+ }
+ // If both already found exit loop.
+ if (doneDataCenterItr && bOtherRack)
+ {
+ break;
+ }
+ }
+
+ /*
+ * If we found N nodes we are good; this loop will just
+ * exit. Otherwise, just loop through the list and add nodes until we
+ * have N nodes.
+ */
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i + 1) % totalNodes)
+ {
+ if (!forloopReturn.contains(tokenToEndPointMap.get(tokens.get(i))))
+ {
+ forloopReturn.add(tokenToEndPointMap.get(tokens.get(i)));
+ foundCount++;
+ }
+ }
+ endpoints.addAll(forloopReturn);
+ }
+
+ return endpoints;
+ }
+
+ /**
+ * This method generates and returns the QRH object. If the consistency
+ * level is DCQUORUM it returns a DCQRH with the local replication
+ * factor alone. If the consistency level is DCQUORUMSYNC it returns a
+ * DCQRH with a map of the replication factors of all the DCs.
+ */
+ @Override
+ public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
+ throws InvalidRequestException
+ {
+ if (consistency_level == ConsistencyLevel.DCQUORUM)
+ {
+ List<InetAddress> endpoints = getLocalEndPoints();
+ return new DatacenterQuorumResponseHandler<T>(endpoints, locQFactor, responseResolver);
+ }
+ else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
+ {
+ return new DatacenterQuorumSyncResponseHandler<T>(getQuorumRepFactor(), responseResolver);
+ }
+ return super.getResponseHandler(responseResolver, blockFor, consistency_level);
+ }
+}
\ No newline at end of file
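Note that the constructor asserts the configured endpoint snitch is a DatacenterEndPointSnitch, so the strategy and snitch have to be enabled together. A hedged sketch of the storage-conf.xml wiring follows; the ReplicaPlacementStrategy and EndPointSnitch element names are assumptions about the configuration format of this era and should be verified against DatabaseDescriptor before use.

<!-- Sketch only: element names are assumptions, verify against storage-conf.xml -->
<ReplicaPlacementStrategy>org.apache.cassandra.locator.DatacenterShardStategy</ReplicaPlacementStrategy>
<EndPointSnitch>org.apache.cassandra.locator.DatacenterEndPointSnitch</EndPointSnitch>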
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/EndPointSnitch.java Mon Oct 26 15:00:20 2009
@@ -18,9 +18,8 @@
package org.apache.cassandra.locator;
-import java.net.*;
-
import java.net.InetAddress;
+import java.net.UnknownHostException;
public class EndPointSnitch implements IEndPointSnitch
{
@@ -30,13 +29,13 @@
* Look at the IP Address of the two hosts. Compare
* the 3rd octet. If they are the same then the hosts
* are in the same rack else different racks.
- */
+ */
byte[] ip = host.getAddress();
byte[] ip2 = host2.getAddress();
-
- return ( ip[2] == ip2[2] );
+
+ return ip[2] == ip2[2];
}
-
+
public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
{
/*
@@ -46,7 +45,12 @@
*/
byte[] ip = host.getAddress();
byte[] ip2 = host2.getAddress();
-
- return ( ip[1] == ip2[1] );
+
+ return ip[1] == ip2[1];
+ }
+
+ public String getLocation(InetAddress endpoint) throws UnknownHostException
+ {
+ throw new UnknownHostException("Not Supported");
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndPointSnitch.java Mon Oct 26 15:00:20 2009
@@ -27,6 +27,8 @@
* This interface helps determine location of node in the data center relative to another node.
* Give a node A and another node B it can tell if A and B are on the same rack or in the same
* data center.
+ *
+ * Not all methods will be germane to all implementations. Throw UnsupportedOperation as necessary.
*/
public interface IEndPointSnitch
@@ -48,4 +50,9 @@
* @throws UnknownHostException
*/
public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException;
+
+ /**
+ * Given an endpoint, this method returns the name of the datacenter the node is located in.
+ */
+ public String getLocation(InetAddress endpoint) throws UnknownHostException;
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Mon Oct 26 15:00:20 2009
@@ -0,0 +1,50 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.apache.cassandra.net.Message;
+
+/**
+ * This class blocks for the replication factor that is
+ * provided in the input. It blocks until we receive responses from (DC, n)
+ * nodes.
+ */
+public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
+{
+ private final List<InetAddress> waitList;
+ private int blockFor;
+
+ public DatacenterQuorumResponseHandler(List<InetAddress> waitList, int blockFor, IResponseResolver<T> responseResolver)
+ throws InvalidRequestException
+ {
+ // Responses are managed via the wait list, so the waitlist size really doesn't matter here.
+ super(blockFor, responseResolver);
+ this.blockFor = blockFor;
+ this.waitList = waitList;
+ }
+
+ @Override
+ public void response(Message message)
+ {
+ if (condition_.isSignaled())
+ {
+ return;
+ }
+
+ if (waitList.contains(message.getFrom()))
+ {
+ blockFor--;
+ }
+ responses_.add(message);
+ // If done then the response count will be empty after removing
+ // everything.
+ if (blockFor <= 0)
+ {
+ condition_.signal();
+ }
+ }
+}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java?rev=829819&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumSyncResponseHandler.java Mon Oct 26 15:00:20 2009
@@ -0,0 +1,75 @@
+/**
+ *
+ */
+package org.apache.cassandra.service;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.Message;
+
+/**
+ * This class blocks for the replication factor that is
+ * provided in the input map. It blocks until we receive responses from
+ * n nodes in each of our data centers.
+ */
+public class DatacenterQuorumSyncResponseHandler<T> extends QuorumResponseHandler<T>
+{
+ private final Map<String, Integer> dcResponses = new HashMap<String, Integer>();
+ private final Map<String, Integer> responseCounts;
+
+ public DatacenterQuorumSyncResponseHandler(Map<String, Integer> responseCounts, IResponseResolver<T> responseResolver)
+ throws InvalidRequestException
+ {
+ // Responses are managed by the map, so pass 1 to the superclass.
+ super(1, responseResolver);
+ this.responseCounts = responseCounts;
+ }
+
+ @Override
+ public void response(Message message)
+ {
+ if (condition_.isSignaled())
+ {
+ return;
+ }
+ try
+ {
+ String dataCenter = DatabaseDescriptor.getEndPointSnitch().getLocation(message.getFrom());
+ Object blockFor = responseCounts.get(dataCenter);
+ // If this DC needs to be blocked then do the below.
+ if (blockFor != null)
+ {
+ Integer quorumCount = dcResponses.get(dataCenter);
+ if (quorumCount == null)
+ {
+ // Initialize and recognize the first response
+ dcResponses.put(dataCenter, 1);
+ }
+ else if ((Integer) blockFor > quorumCount)
+ {
+ // recognize the consecutive responses.
+ dcResponses.put(dataCenter, quorumCount + 1);
+ }
+ else
+ {
+ // No need to wait on it anymore so remove it.
+ responseCounts.remove(dataCenter);
+ }
+ }
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ responses_.add(message);
+ // If done then the response count will be empty after removing
+ // everything.
+ if (responseCounts.isEmpty())
+ {
+ condition_.signal();
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon Oct 26 15:00:20 2009
@@ -37,10 +37,10 @@
public class QuorumResponseHandler<T> implements IAsyncCallback
{
- private static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
- private SimpleCondition condition_ = new SimpleCondition();
+ protected static Logger logger_ = Logger.getLogger( QuorumResponseHandler.class );
+ protected SimpleCondition condition_ = new SimpleCondition();
private int responseCount_;
- private List<Message> responses_ = new ArrayList<Message>();
+ protected List<Message> responses_ = new ArrayList<Message>();
private IResponseResolver<T> responseResolver_;
private long startTime_;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Oct 26 15:00:20 2009
@@ -72,27 +72,27 @@
*/
private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
{
- Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
- Message message = rm.makeRowMutationMessage();
+ Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
+ Message message = rm.makeRowMutationMessage();
- for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
- {
+ for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
+ {
InetAddress target = entry.getKey();
InetAddress hint = entry.getValue();
if ( !target.equals(hint) )
- {
- Message hintedMessage = rm.makeRowMutationMessage();
- hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
- if (logger.isDebugEnabled())
- logger.debug("Sending the hint of " + hint + " to " + target);
- messageMap.put(target, hintedMessage);
- }
- else
- {
- messageMap.put(target, message);
- }
- }
- return messageMap;
+ {
+ Message hintedMessage = rm.makeRowMutationMessage();
+ hintedMessage.addHeader(RowMutation.HINT, hint.getAddress());
+ if (logger.isDebugEnabled())
+ logger.debug("Sending the hint of " + hint + " to " +
target);
+ messageMap.put(target, hintedMessage);
+ }
+ else
+ {
+ messageMap.put(target, message);
+ }
+ }
+ return messageMap;
}
/**
@@ -103,7 +103,7 @@
* @param rm the mutation to be applied across the replicas
*/
public static void insert(RowMutation rm)
- {
+ {
/*
* Get the N nodes from storage service where the data needs to be
* replicated
@@ -112,21 +112,21 @@
*/
long startTime = System.currentTimeMillis();
- try
- {
+ try
+ {
List<InetAddress> naturalEndpoints = StorageService.instance().getNaturalEndpoints(rm.key());
// (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
- Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
- Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
- for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
- {
+ Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedEndpointMap(rm.key(), naturalEndpoints);
+ Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
+ for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
+ {
Message message = entry.getValue();
InetAddress endpoint = entry.getKey();
if (logger.isDebugEnabled())
logger.debug("insert writing key " + rm.key() + " to " +
message.getMessageId() + "@" + endpoint);
MessagingService.instance().sendOneWay(message, endpoint);
- }
- }
+ }
+ }
catch (IOException e)
{
throw new RuntimeException("error inserting key " + rm.key(), e);
@@ -159,7 +159,7 @@
{
throw new UnavailableException();
}
- QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(blockFor, new WriteResponseResolver());
+ QuorumResponseHandler<Boolean> quorumResponseHandler = StorageService.instance().getResponseHandler(new WriteResponseResolver(), blockFor, consistency_level);
if (logger.isDebugEnabled())
logger.debug("insertBlocking writing key " + rm.key() + " to "
+ message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ")
+ "]");
@@ -204,6 +204,7 @@
private static int determineBlockFor(int naturalTargets, int hintedTargets, int consistency_level)
{
+ // TODO this is broken for DC quorum / DC quorum sync
int bootstrapTargets = hintedTargets - naturalTargets;
int blockFor;
if (consistency_level == ConsistencyLevel.ONE)
@@ -318,8 +319,8 @@
}
else
{
- assert consistency_level == ConsistencyLevel.QUORUM;
- rows = strongRead(commands);
+ assert consistency_level >= ConsistencyLevel.QUORUM;
+ rows = strongRead(commands, consistency_level);
}
readStats.add(System.currentTimeMillis() - startTime);
@@ -339,7 +340,7 @@
* 7. else carry out read repair by getting data from all the nodes.
// 5. return success
*/
- private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
+ private static List<Row> strongRead(List<ReadCommand> commands, int consistency_level) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
{
List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
@@ -357,7 +358,7 @@
Message message = command.makeReadMessage();
Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
+ QuorumResponseHandler<Row> quorumResponseHandler = StorageService.instance().getResponseHandler(new ReadResponseResolver(), DatabaseDescriptor.getQuorum(), consistency_level);
InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
List<InetAddress> endpointList = StorageService.instance().getNaturalEndpoints(command.key);
/* Remove the local storage endpoint from the list. */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=829819&r1=829818&r2=829819&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Oct 26 15:00:20 2009
@@ -289,7 +289,7 @@
public IEndPointSnitch getEndPointSnitch()
{
- return endPointSnitch_;
+ return endPointSnitch_;
}
/*
@@ -577,10 +577,10 @@
*/
public void takeSnapshot(String tableName, String tag) throws IOException
{
- if (DatabaseDescriptor.getTable(tableName) == null)
+ if (DatabaseDescriptor.getTable(tableName) == null)
{
throw new IOException("Table " + tableName + "does not exist");
- }
+ }
Table tableInstance = Table.open(tableName);
tableInstance.snapshot(tag);
}
@@ -592,11 +592,11 @@
*/
public void takeAllSnapshot(String tag) throws IOException
{
- for (String tableName: DatabaseDescriptor.getTables())
+ for (String tableName: DatabaseDescriptor.getTables())
{
Table tableInstance = Table.open(tableName);
tableInstance.snapshot(tag);
- }
+ }
}
/**
@@ -604,11 +604,11 @@
*/
public void clearSnapshot() throws IOException
{
- for (String tableName: DatabaseDescriptor.getTables())
+ for (String tableName: DatabaseDescriptor.getTables())
{
Table tableInstance = Table.open(tableName);
tableInstance.clearSnapshot();
- }
+ }
if (logger_.isDebugEnabled())
logger_.debug("Cleared out all snapshot directories");
}
@@ -789,16 +789,16 @@
*/
public List<InetAddress> getLiveNaturalEndpoints(String key)
{
- List<InetAddress> liveEps = new ArrayList<InetAddress>();
- List<InetAddress> endpoints = getNaturalEndpoints(key);
-
- for ( InetAddress endpoint : endpoints )
- {
- if ( FailureDetector.instance().isAlive(endpoint) )
- liveEps.add(endpoint);
- }
-
- return liveEps;
+ List<InetAddress> liveEps = new ArrayList<InetAddress>();
+ List<InetAddress> endpoints = getNaturalEndpoints(key);
+
+ for ( InetAddress endpoint : endpoints )
+ {
+ if ( FailureDetector.instance().isAlive(endpoint) )
+ liveEps.add(endpoint);
+ }
+
+ return liveEps;
}
/**
@@ -817,45 +817,45 @@
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.
*/
- public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
- {
- List<InetAddress> endpoints = getNaturalEndpoints(key);
- for(InetAddress endPoint: endpoints)
- {
+ public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
+ {
+ List<InetAddress> endpoints = getNaturalEndpoints(key);
+ for(InetAddress endPoint: endpoints)
+ {
if(endPoint.equals(FBUtilities.getLocalAddress()))
- {
- return endPoint;
- }
- }
- int j = 0;
- for ( ; j < endpoints.size(); ++j )
- {
- if ( StorageService.instance().isInSameDataCenter(endpoints.get(j)) && FailureDetector.instance().isAlive(endpoints.get(j)))
- {
- return endpoints.get(j);
- }
- }
- // We have tried to be really nice but looks like there are no servers
- // in the local data center that are alive and can service this request so
- // just send it to the first alive guy and see if we get anything.
- j = 0;
- for ( ; j < endpoints.size(); ++j )
- {
- if ( FailureDetector.instance().isAlive(endpoints.get(j)))
- {
- if (logger_.isDebugEnabled())
- logger_.debug("InetAddress " +
endpoints.get(j) + " is alive so get data from it.");
- return endpoints.get(j);
- }
- }
+ {
+ return endPoint;
+ }
+ }
+ int j = 0;
+ for ( ; j < endpoints.size(); ++j )
+ {
+ if ( StorageService.instance().isInSameDataCenter(endpoints.get(j)) && FailureDetector.instance().isAlive(endpoints.get(j)))
+ {
+ return endpoints.get(j);
+ }
+ }
+ // We have tried to be really nice but looks like there are no servers
+ // in the local data center that are alive and can service this request so
+ // just send it to the first alive guy and see if we get anything.
+ j = 0;
+ for ( ; j < endpoints.size(); ++j )
+ {
+ if ( FailureDetector.instance().isAlive(endpoints.get(j)))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("InetAddress " + endpoints.get(j) + " is alive
so get data from it.");
+ return endpoints.get(j);
+ }
+ }
throw new UnavailableException(); // no nodes that could contain key are alive
- }
+ }
- Map<Token, InetAddress> getLiveEndPointMap()
- {
- return tokenMetadata_.cloneTokenEndPointMap();
- }
+ Map<Token, InetAddress> getLiveEndPointMap()
+ {
+ return tokenMetadata_.cloneTokenEndPointMap();
+ }
public void setLog4jLevel(String classQualifier, String rawLevel)
{
@@ -890,4 +890,9 @@
tokens.add(range.right().toString());
return tokens;
}
+
+ public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level) throws InvalidRequestException, UnavailableException
+ {
+ return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
+ }
}