Author: jbellis
Date: Mon Apr 27 15:01:36 2009
New Revision: 769017
URL: http://svn.apache.org/viewvc?rev=769017&view=rev
Log:
r/m StorageService.token in favor of explicitly passing a Partitioner object.
this allows testing of components independent of the static SS.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-65
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/SystemTableTest.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
Mon Apr 27 15:01:36 2009
@@ -1113,7 +1113,7 @@
continue;
}
}
- if (Range.isKeyInRanges(p.undecorateKey(lastkey),
ranges))
+ if
(Range.isTokenInRanges(StorageService.getPartitioner().getTokenForKey(lastkey),
ranges))
{
if(ssTableRange == null )
{
@@ -1143,7 +1143,7 @@
continue;
}
/* keep on looping until we find a key
in the range */
- while
(!Range.isKeyInRanges(p.undecorateKey(filestruct.getKey()), ranges))
+ while
(!Range.isTokenInRanges(StorageService.getPartitioner().getTokenForKey(filestruct.getKey()),
ranges))
{
filestruct.advance();
if (filestruct.isExhausted())
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
(original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Mon
Apr 27 15:01:36 2009
@@ -178,12 +178,4 @@
{
throw new UnsupportedOperationException("This operation is not
supported for System tables");
}
-
- public static void main(String[] args) throws Throwable
- {
- LogUtil.init();
- StorageService.instance().start();
-
SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.token("503545744:0"));
- System.out.println("Done");
- }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Mon Apr
27 15:01:36 2009
@@ -134,11 +134,10 @@
}
- public static boolean isKeyInRanges(String key, List<Range> ranges)
+ public static boolean isTokenInRanges(Token token, List<Range> ranges)
{
assert ranges != null;
- Token token = StorageService.token(key);
for (Range range : ranges)
{
if(range.contains(token))
@@ -165,7 +164,7 @@
public String toString()
{
return "(" + left_ + "," + right_ + "]";
- }
+ }
}
class RangeSerializer implements ICompactSerializer<Range>
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
Mon Apr 27 15:01:36 2009
@@ -9,8 +9,8 @@
import org.apache.log4j.Logger;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.EndPoint;
@@ -22,14 +22,20 @@
public abstract class AbstractStrategy implements IReplicaPlacementStrategy
{
protected static Logger logger_ = Logger.getLogger(AbstractStrategy.class);
-
+
protected TokenMetadata tokenMetadata_;
-
- AbstractStrategy(TokenMetadata tokenMetadata)
+ protected IPartitioner partitioner_;
+ protected int replicas_;
+ protected int storagePort_;
+
+ AbstractStrategy(TokenMetadata tokenMetadata, IPartitioner partitioner,
int replicas, int storagePort)
{
tokenMetadata_ = tokenMetadata;
+ partitioner_ = partitioner;
+ replicas_ = replicas;
+ storagePort_ = storagePort;
}
-
+
/*
* This method changes the ports of the endpoints from
* the control port to the storage ports.
@@ -38,7 +44,7 @@
{
for ( EndPoint ep : eps )
{
- ep.setPort(DatabaseDescriptor.getStoragePort());
+ ep.setPort(storagePort_);
}
}
@@ -108,5 +114,4 @@
}
public abstract EndPoint[] getStorageEndPoints(Token token);
-
}
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
Mon Apr 27 15:01:36 2009
@@ -6,15 +6,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.math.BigInteger;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
-
/*
* This class returns the nodes responsible for a given
* key but does respects rack awareness. It makes a best
@@ -24,11 +22,11 @@
*/
public class RackAwareStrategy extends AbstractStrategy
{
- public RackAwareStrategy(TokenMetadata tokenMetadata)
+ public RackAwareStrategy(TokenMetadata tokenMetadata, IPartitioner
partitioner, int replicas, int storagePort)
{
- super(tokenMetadata);
+ super(tokenMetadata, partitioner, replicas, storagePort);
}
-
+
public EndPoint[] getStorageEndPoints(Token token)
{
int startIndex;
@@ -36,7 +34,6 @@
boolean bDataCenter = false;
boolean bOtherRack = false;
int foundCount = 0;
- int N = DatabaseDescriptor.getReplicationFactor();
Map<Token, EndPoint> tokenToEndPointMap =
tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList(tokenToEndPointMap.keySet());
Collections.sort(tokens);
@@ -51,14 +48,14 @@
// Add the node at the index by default
list.add(tokenToEndPointMap.get(tokens.get(index)));
foundCount++;
- if( N == 1 )
+ if( replicas_ == 1 )
{
return list.toArray(new EndPoint[list.size()]);
}
startIndex = (index + 1)%totalNodes;
IEndPointSnitch endPointSnitch =
StorageService.instance().getEndPointSnitch();
- for (int i = startIndex, count = 1; count < totalNodes && foundCount <
N; ++count, i = (i+1)%totalNodes)
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount <
replicas_; ++count, i = (i+1)%totalNodes)
{
try
{
@@ -95,7 +92,7 @@
}
// If we found N number of nodes we are good. This loop wil just exit.
Otherwise just
// loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount <
N; ++count, i = (i+1)%totalNodes)
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount <
replicas_; ++count, i = (i+1)%totalNodes)
{
if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
{
@@ -113,7 +110,7 @@
for ( String key : keys )
{
- results.put(key, getStorageEndPoints(StorageService.token(key)));
+ results.put(key,
getStorageEndPoints(partitioner_.getTokenForKey(key)));
}
return results;
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
Mon Apr 27 15:01:36 2009
@@ -6,11 +6,9 @@
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-
/**
* This class returns the nodes responsible for a given
@@ -19,12 +17,12 @@
* on the ring.
*/
public class RackUnawareStrategy extends AbstractStrategy
-{
- public RackUnawareStrategy(TokenMetadata tokenMetadata)
+{
+ public RackUnawareStrategy(TokenMetadata tokenMetadata, IPartitioner
partitioner, int replicas, int storagePort)
{
- super(tokenMetadata);
+ super(tokenMetadata, partitioner, replicas, storagePort);
}
-
+
public EndPoint[] getStorageEndPoints(Token token)
{
return getStorageEndPoints(token,
tokenMetadata_.cloneTokenEndPointMap());
@@ -35,7 +33,6 @@
int startIndex;
List<EndPoint> list = new ArrayList<EndPoint>();
int foundCount = 0;
- int N = DatabaseDescriptor.getReplicationFactor();
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
@@ -52,7 +49,7 @@
startIndex = (index + 1)%totalNodes;
// If we found N number of nodes we are good. This loop will just
exit. Otherwise just
// loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount <
N; ++count, i = (i+1)%totalNodes)
+ for (int i = startIndex, count = 1; count < totalNodes && foundCount <
replicas_; ++count, i = (i+1)%totalNodes)
{
if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
{
@@ -70,7 +67,7 @@
for ( String key : keys )
{
- results.put(key, getStorageEndPoints(StorageService.token(key)));
+ results.put(key,
getStorageEndPoints(partitioner_.getTokenForKey(key)));
}
return results;
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
Mon Apr 27 15:01:36 2009
@@ -38,7 +38,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.math.BigInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -95,8 +94,6 @@
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
-import org.apache.log4j.Logger;
-
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -169,16 +166,6 @@
return "http://" + tcpAddr_.getHost() + ":" +
DatabaseDescriptor.getHttpPort();
}
- /**
- * This is a facade for the hashing
- * function used by the system for
- * partitioning.
- */
- public static Token token(String key)
- {
- return partitioner_.getTokenForKey(key);
- }
-
public static IPartitioner getPartitioner() {
return partitioner_;
}
@@ -347,9 +334,9 @@
StageManager.registerStage(HttpConnection.httpStage_, new
SingleThreadedStage("HTTP-REQUEST"));
if ( DatabaseDescriptor.isRackAware() )
- nodePicker_ = new RackAwareStrategy(tokenMetadata_);
+ nodePicker_ = new RackAwareStrategy(tokenMetadata_, partitioner_,
DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
else
- nodePicker_ = new RackUnawareStrategy(tokenMetadata_);
+ nodePicker_ = new RackUnawareStrategy(tokenMetadata_,
partitioner_, DatabaseDescriptor.getReplicationFactor(),
DatabaseDescriptor.getStoragePort());
}
private void reportToZookeeper() throws Throwable
@@ -787,7 +774,7 @@
Token[] tokens = tokenToEndPointMap.keySet().toArray(new
Token[tokenToEndPointMap.keySet().size()]);
Arrays.sort(tokens);
int index = Arrays.binarySearch(tokens, token) *
(keys.length/tokens.length);
- Token newToken = token( keys[index] );
+ Token newToken = partitioner_.getTokenForKey(keys[index]);
/* update the token */
updateToken(newToken);
}
@@ -1074,7 +1061,7 @@
public EndPoint getPrimary(String key)
{
EndPoint endpoint = StorageService.tcpAddr_;
- Token token = token(key);
+ Token token = partitioner_.getTokenForKey(key);
Map<Token, EndPoint> tokenToEndPointMap =
tokenMetadata_.cloneTokenEndPointMap();
List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
if (tokens.size() > 0)
@@ -1122,8 +1109,7 @@
*/
public EndPoint[] getNStorageEndPoint(String key)
{
- Token token = token(key);
- return nodePicker_.getStorageEndPoints(token);
+ return
nodePicker_.getStorageEndPoints(partitioner_.getTokenForKey(key));
}
private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
@@ -1162,8 +1148,7 @@
*/
public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
{
- Token token = token(key);
- return nodePicker_.getHintedStorageEndPoints(token);
+ return
nodePicker_.getHintedStorageEndPoints(partitioner_.getTokenForKey(key));
}
/**
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/SystemTableTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/SystemTableTest.java?rev=769017&r1=769016&r2=769017&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/SystemTableTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/SystemTableTest.java
Mon Apr 27 15:01:36 2009
@@ -9,6 +9,6 @@
public class SystemTableTest extends ServerTest {
@Test
public void testMain() throws IOException {
- SystemTable.openSystemTable(SystemTable.cfName_).updateToken(
StorageService.token("503545744:0") );
+
SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.getPartitioner().getTokenForKey("503545744:0"));
}
}