Author: jbellis
Date: Wed Jul 29 20:05:46 2009
New Revision: 799043
URL: http://svn.apache.org/viewvc?rev=799043&view=rev
Log:
Pluggable replicaplacement, endpointsnitch classes. patch by Sammy Yu;
reviewed by jbellis for CASSANDRA-323
Modified:
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/conf/storage-conf.xml
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=799043&r1=799042&r2=799043&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Jul 29 20:05:46 2009
@@ -99,12 +99,23 @@
clusters with a small number of nodes. -->
<InitialToken></InitialToken>
- <!-- RackAware: Setting this to true instructs Cassandra to try
- and place one replica in a different datacenter, and the
- others on different racks in the same one. If you haven't
- looked at the code for RackAwareStrategy, leave this off.
+
+ <!-- EndPointSnitch: Setting this to the class that implements
IEndPointSnitch
+ which will see if two endpoints are in the same data center or on
the same rack.
+ Out of the box, Cassandra provides
+ org.apache.cassandra.locator.EndPointSnitch
+ -->
+
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
+ <!-- Strategy: Setting this to the class that implements
IReplicaPlacementStrategy
+ will change the way the node picker works.
+ Out of the box, Cassandra provides
+ org.apache.cassandra.locator.RackUnawareStrategy
+ org.apache.cassandra.locator.RackAwareStrategy
+ (place one replica in a different datacenter, and the
+ others on different racks in the same one.)
-->
- <RackAware>false</RackAware>
+
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
<!-- Number of replicas of the data-->
<ReplicationFactor>1</ReplicationFactor>
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=799043&r1=799042&r2=799043&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Jul 29 20:05:46 2009
@@ -31,6 +31,8 @@
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.locator.IEndPointSnitch;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.XMLUtils;
import org.w3c.dom.Node;
@@ -62,7 +64,6 @@
private static int currentIndex_ = 0;
private static String logFileDirectory_;
private static String bootstrapFileDirectory_;
- private static boolean rackAware_ = false;
private static int consistencyThreads_ = 4; // not configurable
private static int concurrentReaders_ = 8;
private static int concurrentWriters_ = 32;
@@ -86,8 +87,12 @@
* corresponding meta data for that column family.
*/
private static Map<String, Map<String, CFMetaData>> tableToCFMetaDataMap_;
- /* Hashing strategy Random or OPHF */
- private static String partitionerClass_;
+
+ private static IPartitioner partitioner_;
+
+ private static IEndPointSnitch endPointSnitch_;
+ private static Class replicaPlacementStrategyClass_;
+
/* if the size of columns or super-columns are more than this, indexing
will kick in */
private static int columnIndexSizeInKB_;
/* Number of hours to keep a memtable in memory */
@@ -126,7 +131,7 @@
{
configFileName_ = System.getProperty("storage-config") +
File.separator + "storage-conf.xml";
if (logger_.isDebugEnabled())
- logger_.debug("Loading settings from " + configFileName_);
+ logger_.debug("Loading settings from " + configFileName_);
XMLUtils xmlUtils = new XMLUtils(configFileName_);
/* Cluster Name */
@@ -143,20 +148,37 @@
commitLogSyncDelay_ =
Integer.valueOf(xmlUtils.getNodeValue("/Storage/CommitLogSyncDelay"));
/* Hashing strategy */
- partitionerClass_ = xmlUtils.getNodeValue("/Storage/Partitioner");
- try
+ String partitionerClassName =
xmlUtils.getNodeValue("/Storage/Partitioner");
+ if (partitionerClassName == null)
{
- Class.forName(DatabaseDescriptor.getPartitionerClass());
+ throw new ConfigurationException("Missing partitioner
directive /Storage/Partitioner");
}
- catch (NullPointerException e)
+ try
{
- throw new ConfigurationException("Missing partitioner
directive /Storage/Partitioner");
+ Class cls = Class.forName(partitionerClassName);
+ partitioner_ = (IPartitioner)
cls.getConstructor().newInstance();
}
catch (ClassNotFoundException e)
{
- throw new ConfigurationException("Invalid partitioner class "
+ partitionerClass_);
+ throw new ConfigurationException("Invalid partitioner class "
+ partitionerClassName);
}
+ /* end point snitch */
+ String endPointSnitchClassName =
xmlUtils.getNodeValue("/Storage/EndPointSnitch");
+ if (endPointSnitchClassName == null)
+ {
+ throw new ConfigurationException("Missing endpointsnitch
directive /Storage/EndPointSnitch");
+ }
+ try
+ {
+ Class cls = Class.forName(endPointSnitchClassName);
+ endPointSnitch_ = (IEndPointSnitch)
cls.getConstructor().newInstance();
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid endpointsnitch class
" + endPointSnitchClassName);
+ }
+
/* Callout location */
calloutLocation_ =
xmlUtils.getNodeValue("/Storage/CalloutLocation");
@@ -288,10 +310,20 @@
tableToCFMetaDataMap_ = new HashMap<String, Map<String,
CFMetaData>>();
tableKeysCachedFractions_ = new HashMap<String, Double>();
- /* Rack Aware option */
- value = xmlUtils.getNodeValue("/Storage/RackAware");
- if ( value != null )
- rackAware_ = Boolean.parseBoolean(value);
+ /* See which replica placement strategy to use */
+ String replicaPlacementStrategyClassName =
xmlUtils.getNodeValue("/Storage/ReplicaPlacementStrategy");
+ if (replicaPlacementStrategyClassName == null)
+ {
+ throw new ConfigurationException("Missing
replicaplacementstrategy directive /Storage/ReplicaPlacementStrategy");
+ }
+ try
+ {
+ replicaPlacementStrategyClass_ =
Class.forName(replicaPlacementStrategyClassName);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Invalid
replicaplacementstrategy class " + replicaPlacementStrategyClassName);
+ }
/* Read the table related stuff from config */
NodeList tables =
xmlUtils.getRequestedNodeList("/Storage/Tables/Table");
@@ -542,9 +574,19 @@
return gcGraceInSeconds_;
}
- public static String getPartitionerClass()
+ public static IPartitioner getPartitioner()
+ {
+ return partitioner_;
+ }
+
+ public static IEndPointSnitch getEndPointSnitch()
+ {
+ return endPointSnitch_;
+ }
+
+ public static Class getReplicaPlacementStrategyClass()
{
- return partitionerClass_;
+ return replicaPlacementStrategyClass_;
}
public static String getCalloutLocation()
@@ -760,11 +802,6 @@
logFileDirectory_ = logLocation;
}
- public static boolean isRackAware()
- {
- return rackAware_;
- }
-
public static Set<String> getSeeds()
{
return seeds_;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=799043&r1=799042&r2=799043&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Jul 29 20:05:46 2009
@@ -21,6 +21,7 @@
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -203,8 +204,8 @@
{
init();
storageLoadBalancer_ = new StorageLoadBalancer(this);
- endPointSnitch_ = new EndPointSnitch();
-
+ endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
+
/* register the verb handlers */
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenVerbHandler_,
new TokenUpdateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.binaryVerbHandler_,
new BinaryVerbHandler());
@@ -232,25 +233,19 @@
StageManager.registerStage(StorageService.readStage_,
new
MultiThreadedStage(StorageService.readStage_,
DatabaseDescriptor.getConcurrentReaders()));
- if ( DatabaseDescriptor.isRackAware() )
- nodePicker_ = new RackAwareStrategy(tokenMetadata_, partitioner_,
DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
- else
- nodePicker_ = new RackUnawareStrategy(tokenMetadata_,
partitioner_, DatabaseDescriptor.getReplicationFactor(),
DatabaseDescriptor.getStoragePort());
- }
-
- static
- {
+ Class cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
+ Class [] parameterTypes = new Class[] { TokenMetadata.class,
IPartitioner.class, int.class, int.class };
try
{
- Class cls =
Class.forName(DatabaseDescriptor.getPartitionerClass());
- partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
+ nodePicker_ =
(IReplicaPlacementStrategy)cls.getConstructor(parameterTypes).newInstance(tokenMetadata_,
partitioner_, DatabaseDescriptor.getReplicationFactor(),
DatabaseDescriptor.getStoragePort());
}
catch (Exception e)
{
throw new RuntimeException(e);
}
+ partitioner_ = DatabaseDescriptor.getPartitioner();
}
-
+
public void start() throws IOException
{
storageMetadata_ = SystemTable.initMetadata();
Modified: incubator/cassandra/trunk/test/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/conf/storage-conf.xml?rev=799043&r1=799042&r2=799043&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/test/conf/storage-conf.xml Wed Jul 29 20:05:46
2009
@@ -21,7 +21,8 @@
<CommitLogSync>true</CommitLogSync>
<CommitLogSyncDelay>1000</CommitLogSyncDelay>
<Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
- <RackAware>false</RackAware>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
<ReplicationFactor>1</ReplicationFactor>
<RpcTimeoutInMillis>5000</RpcTimeoutInMillis>
<ListenAddress>127.0.0.1</ListenAddress>