Author: jbellis
Date: Wed Aug 19 20:51:06 2009
New Revision: 805968
URL: http://svn.apache.org/viewvc?rev=805968&view=rev
Log:
Add "bootstrap mode" to node startup. This causes the node to tell the
nodes that have data it needs to send it the data, and not otherwise
participate in reads or writes until the bootstrap is complete.
patch by Sandeep Tata; reviewed by jbellis for CASSANDRA-195
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified:
incubator/cassandra/trunk/bin/cassandra
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Modified: incubator/cassandra/trunk/bin/cassandra
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/bin/cassandra?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
--- incubator/cassandra/trunk/bin/cassandra (original)
+++ incubator/cassandra/trunk/bin/cassandra Wed Aug 19 20:51:06 2009
@@ -57,11 +57,16 @@
{
pidpath=$1
foreground=$2
+ bootstrap=$3
cassandra_parms="-Dcassandra -Dstorage-config=$CASSANDRA_CONF"
if [ "x$pidpath" != "x" ]; then
cassandra_parms="$cassandra_parms -Dcassandra-pidfile=$pidpath"
fi
+
+ if [ "x$bootstrap" != "x" ]; then
+ cassandra_parms="$cassandra_parms -Dbootstrap=$bootstrap"
+ fi
# The cassandra-foreground option will tell CassandraDaemon not
# to close stdout/stderr, but it's up to us not to background.
@@ -80,7 +85,7 @@
}
# Parse any command line options.
-args=`getopt fhp: "$@"`
+args=`getopt fhp:b "$@"`
eval set -- "$args"
while true; do
@@ -93,8 +98,12 @@
foreground="yes"
shift
;;
+ -b)
+ bootstrap="true"
+ shift
+ ;;
-h)
- echo "Usage: $0 [-f] [-h] [-p pidfile]"
+ echo "Usage: $0 [-f] [-h] [-p pidfile] [-b]"
exit 0
;;
--)
@@ -109,7 +118,7 @@
done
# Start up the service
-launch_service "$pidfile" "$foreground"
+launch_service "$pidfile" "$foreground" "$bootstrap"
exit $?
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
Wed Aug 19 20:51:06 2009
@@ -29,6 +29,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.UnavailableException;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
@@ -72,6 +73,11 @@
try
{
+ if (StorageService.instance().isBootstrapMode())
+ {
+ /* Don't service reads! */
+ throw new RuntimeException("Cannot service reads while
bootstrapping!");
+ }
ReadCommand readCommand =
ReadCommand.serializer().deserialize(readCtx.bufIn_);
Table table = Table.open(readCommand.table);
Row row = null;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
Wed Aug 19 20:51:06 2009
@@ -80,13 +80,14 @@
rm.apply();
metadata_.setStorageId(token);
}
-
+
/*
* This method reads the system table and retrieves the metadata
* associated with this storage instance. Currently we store the
* metadata in a Column Family called LocatioInfo which has two
* columns namely "Token" and "Generation". This is the token that
* gets gossiped around and the generation info is used for FD.
+ * We also store whether we're in bootstrap mode in a third column
*/
public static synchronized StorageMetadata initMetadata() throws
IOException
{
@@ -120,7 +121,7 @@
IColumn generation = cf.getColumn(GENERATION);
int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
-
+
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.LOCATION_CF);
Column generation2 = new Column(GENERATION,
BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed
Aug 19 20:51:06 2009
@@ -35,6 +35,7 @@
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.FileStruct;
+import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -185,7 +186,8 @@
SSTableReader sstable = null;
try
{
- sstable =
SSTableReader.open(streamContext.getTargetFile());
+ sstable =
SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+
//TODO add a sanity check that this sstable has all its
parts and is ok
Table.open(tableName).getColumnFamilyStore(temp[0]).addToList(sstable);
logger_.info("Bootstrap added " + sstable.getFilename());
@@ -202,7 +204,10 @@
/* Send a StreamStatusMessage object which may require the source
node to re-stream certain files. */
StreamContextManager.StreamStatusMessage streamStatusMessage = new
StreamContextManager.StreamStatusMessage(streamStatus);
Message message =
StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.getMessagingInstance().sendOneWay(message, to);
+ MessagingService.getMessagingInstance().sendOneWay(message, to);
+ /* If we're done with everything for this host, remove from
bootstrap sources */
+ if (StreamContextManager.isDone(to.getHost()))
+ StorageService.instance().removeBootstrapSource(to);
}
}
@@ -302,7 +307,7 @@
ColumnFamilyStore cfStore = columnFamilyStores.get(peices[1]);
if (logger_.isDebugEnabled())
logger_.debug("Generating file name for " + distinctEntry +
" ...");
- fileNames.put(distinctEntry, cfStore.getNextFileName());
+ fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
}
return fileNames;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Wed Aug 19 20:51:06 2009
@@ -42,6 +42,7 @@
public class BootStrapper implements Runnable
{
private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+ private static final long INITIAL_DELAY = 60*1000; //ms
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
/* tokens of the nodes being bootstrapped. */
@@ -54,69 +55,27 @@
tokens_ = token;
tokenMetadata_ = StorageService.instance().getTokenMetadata();
}
-
+
public void run()
{
try
{
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning bootstrap process for " +
Arrays.toString(targets_) + " ...");
- /* copy the token to endpoint map */
- Map<Token, EndPoint> tokenToEndPointMap =
tokenMetadata_.cloneTokenEndPointMap();
- /* remove the tokens associated with the endpoints being
bootstrapped */
- for (Token token : tokens_)
+ /* Initial delay waiting for this node to get a stable endpoint
map */
+ Thread.sleep(INITIAL_DELAY);
+ /* Clone again now so we include all discovered nodes in our
calculations */
+ tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ // Mark as not bootstrapping to calculate ranges correctly
+ for (int i=0; i< targets_.length; i++)
{
- tokenToEndPointMap.remove(token);
+ tokenMetadata_.update(tokens_[i], targets_[i], false);
}
-
- Set<Token> oldTokens = new HashSet<Token>(
tokenToEndPointMap.keySet() );
- Range[] oldRanges =
StorageService.instance().getAllRanges(oldTokens);
+
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget =
getRangesWithSourceTarget();
if (logger_.isDebugEnabled())
- logger_.debug("Total number of old ranges " + oldRanges.length);
- /*
- * Find the ranges that are split. Maintain a mapping between
- * the range being split and the list of subranges.
- */
- Map<Range, List<Range>> splitRanges =
LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
- /* Calculate the list of nodes that handle the old ranges */
- Map<Range, List<EndPoint>> oldRangeToEndPointMap =
StorageService.instance().constructRangeToEndPointMap(oldRanges,
tokenToEndPointMap);
- /* Mapping of split ranges to the list of endpoints responsible
for the range */
- Map<Range, List<EndPoint>> replicasForSplitRanges = new
HashMap<Range, List<EndPoint>>();
- Set<Range> rangesSplit = splitRanges.keySet();
- for ( Range splitRange : rangesSplit )
- {
- replicasForSplitRanges.put( splitRange,
oldRangeToEndPointMap.get(splitRange) );
- }
- /* Remove the ranges that are split. */
- for ( Range splitRange : rangesSplit )
- {
- oldRangeToEndPointMap.remove(splitRange);
- }
-
- /* Add the subranges of the split range to the map with the same
replica set. */
- for ( Range splitRange : rangesSplit )
- {
- List<Range> subRanges = splitRanges.get(splitRange);
- List<EndPoint> replicas =
replicasForSplitRanges.get(splitRange);
- for ( Range subRange : subRanges )
- {
- /* Make sure we clone or else we are hammered. */
- oldRangeToEndPointMap.put(subRange, new
ArrayList<EndPoint>(replicas));
- }
- }
-
- /* Add the new token and re-calculate the range assignments */
- Collections.addAll( oldTokens, tokens_ );
- Range[] newRanges =
StorageService.instance().getAllRanges(oldTokens);
-
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of new ranges " + newRanges.length);
- /* Calculate the list of nodes that handle the new ranges */
- Map<Range, List<EndPoint>> newRangeToEndPointMap =
StorageService.instance().constructRangeToEndPointMap(newRanges);
- /* Calculate ranges that need to be sent and from whom to where */
- Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget =
LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap,
newRangeToEndPointMap);
+ logger_.debug("Beginning bootstrap process for " +
Arrays.toString(targets_) + " ...");
/* Send messages to respective folks to stream data over to the
new nodes being bootstrapped */
- LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+ LeaveJoinProtocolHelper.assignWork(rangesWithSourceTarget);
+
}
catch ( Throwable th )
{
@@ -124,5 +83,64 @@
logger_.debug( LogUtil.throwableToString(th) );
}
}
+
+ Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
+ {
+ /* copy the token to endpoint map */
+ Map<Token, EndPoint> tokenToEndPointMap =
tokenMetadata_.cloneTokenEndPointMap();
+ /* remove the tokens associated with the endpoints being bootstrapped
*/
+ for (Token token : tokens_)
+ {
+ tokenToEndPointMap.remove(token);
+ }
+
+ Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet()
);
+ Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Total number of old ranges " + oldRanges.length);
+ /*
+ * Find the ranges that are split. Maintain a mapping between
+ * the range being split and the list of subranges.
+ */
+ Map<Range, List<Range>> splitRanges =
LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
+ /* Calculate the list of nodes that handle the old ranges */
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap =
StorageService.instance().constructRangeToEndPointMap(oldRanges,
tokenToEndPointMap);
+ /* Mapping of split ranges to the list of endpoints responsible for
the range */
+ Map<Range, List<EndPoint>> replicasForSplitRanges = new HashMap<Range,
List<EndPoint>>();
+ Set<Range> rangesSplit = splitRanges.keySet();
+ for ( Range splitRange : rangesSplit )
+ {
+ replicasForSplitRanges.put( splitRange,
oldRangeToEndPointMap.get(splitRange) );
+ }
+ /* Remove the ranges that are split. */
+ for ( Range splitRange : rangesSplit )
+ {
+ oldRangeToEndPointMap.remove(splitRange);
+ }
+
+ /* Add the subranges of the split range to the map with the same
replica set. */
+ for ( Range splitRange : rangesSplit )
+ {
+ List<Range> subRanges = splitRanges.get(splitRange);
+ List<EndPoint> replicas = replicasForSplitRanges.get(splitRange);
+ for ( Range subRange : subRanges )
+ {
+ /* Make sure we clone or else we are hammered. */
+ oldRangeToEndPointMap.put(subRange, new
ArrayList<EndPoint>(replicas));
+ }
+ }
+
+ /* Add the new token and re-calculate the range assignments */
+ Collections.addAll( oldTokens, tokens_ );
+ Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Total number of new ranges " + newRanges.length);
+ /* Calculate the list of nodes that handle the new ranges */
+ Map<Range, List<EndPoint>> newRangeToEndPointMap =
StorageService.instance().constructRangeToEndPointMap(newRanges);
+ /* Calculate ranges that need to be sent and from whom to where */
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget =
LeaveJoinProtocolHelper.getRangeSourceTargetInfo(oldRangeToEndPointMap,
newRangeToEndPointMap);
+ return rangesWithSourceTarget;
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
Wed Aug 19 20:51:06 2009
@@ -55,8 +55,6 @@
BootstrapMetadata(EndPoint target, List<Range> ranges)
{
- assert target_ != null;
- assert ranges_ != null;
target_ = target;
ranges_ = ranges;
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Wed Aug 19 20:51:06 2009
@@ -31,6 +31,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StreamManager;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
@@ -47,6 +48,10 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Received a BootstrapMetadataMessage from " +
message.getFrom());
+
+ /* Cannot bootstrap another node if I'm in bootstrap mode myself! */
+ assert !StorageService.instance().isBootstrapMode();
+
byte[] body = message.getMessageBody();
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
Wed Aug 19 20:51:06 2009
@@ -30,7 +30,8 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
- import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
class LeaveJoinProtocolHelper
@@ -159,6 +160,72 @@
*/
protected static void assignWork(Map<Range, List<BootstrapSourceTarget>>
rangesWithSourceTarget) throws IOException
{
+ Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo =
getWorkMap(rangesWithSourceTarget);
+ sendMessagesToBootstrapSources(rangeInfo);
+ }
+
+ // TODO: Once we're sure we don't need global bootstrap -- clean this code
up
+ // so it is easier to understand what messages are being sent. Local
bootstrap should
+ // look much simpler
+ protected static void assignWorkForLocalBootstrap(Map<Range,
List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
+ {
+ Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo =
getWorkMap(rangesWithSourceTarget);
+ Map<EndPoint, Map<EndPoint, List<Range>>> filteredRanges =
filterRangesForTargetEndPoint(rangeInfo,
+
StorageService.getLocalStorageEndPoint());
+ sendMessagesToBootstrapSources(filteredRanges);
+ }
+
+
+ /**
+ * This method takes the Src -> (Tgt-> List of ranges) maps and retains
those entries
+ * that are relevant to bootstrapping the target endpoint
+ */
+ protected static Map<EndPoint, Map<EndPoint, List<Range>>>
+ filterRangesForTargetEndPoint(Map<EndPoint, Map<EndPoint, List<Range>>>
rangeInfo, EndPoint targetEndPoint)
+ {
+ Map<EndPoint, Map<EndPoint, List<Range>>> filteredMap = new
HashMap<EndPoint, Map<EndPoint,List<Range>>>();
+ for (Map.Entry<EndPoint, Map<EndPoint, List<Range>>> e:
rangeInfo.entrySet())
+ {
+ EndPoint source = e.getKey();
+ Map<EndPoint, List<Range>> targets = e.getValue();
+ Map<EndPoint, List<Range>> filteredTargets = new HashMap<EndPoint,
List<Range>>();
+ if (targets.get(targetEndPoint) != null)
+ filteredTargets.put(targetEndPoint,
targets.get(targetEndPoint));
+ if (filteredTargets.size() > 0)
+ filteredMap.put(source, filteredTargets);
+ }
+ return filteredMap;
+ }
+
+ private static void sendMessagesToBootstrapSources(Map<EndPoint,
Map<EndPoint, List<Range>>> rangeInfo) throws IOException
+ {
+ Set<EndPoint> sources = rangeInfo.keySet();
+ for ( EndPoint source : sources )
+ {
+ Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
+ Set<EndPoint> targets = targetRangesMap.keySet();
+ List<BootstrapMetadata> bsmdList = new
ArrayList<BootstrapMetadata>();
+
+ for ( EndPoint target : targets )
+ {
+ List<Range> rangeForTarget = targetRangesMap.get(target);
+ BootstrapMetadata bsMetadata = new BootstrapMetadata(target,
rangeForTarget);
+ bsmdList.add(bsMetadata);
+ }
+
+ BootstrapMetadataMessage bsMetadataMessage = new
BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
+ /* Send this message to the source to do his shit. */
+ Message message =
BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Sending the BootstrapMetadataMessage to " +
source);
+ MessagingService.getMessagingInstance().sendOneWay(message,
source);
+ StorageService.instance().addBootstrapSource(source);
+ }
+ }
+
+ static Map<EndPoint, Map<EndPoint, List<Range>>> getWorkMap(
+ Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget)
+ {
/*
* Map whose key is the source node and the value is a map whose key
is the
* target and value is the list of ranges to be sent to it.
@@ -186,27 +253,6 @@
rangesToGive.add(range);
}
}
-
- Set<EndPoint> sources = rangeInfo.keySet();
- for ( EndPoint source : sources )
- {
- Map<EndPoint, List<Range>> targetRangesMap = rangeInfo.get(source);
- Set<EndPoint> targets = targetRangesMap.keySet();
- List<BootstrapMetadata> bsmdList = new
ArrayList<BootstrapMetadata>();
-
- for ( EndPoint target : targets )
- {
- List<Range> rangeForTarget = targetRangesMap.get(target);
- BootstrapMetadata bsMetadata = new BootstrapMetadata(target,
rangeForTarget);
- bsmdList.add(bsMetadata);
- }
-
- BootstrapMetadataMessage bsMetadataMessage = new
BootstrapMetadataMessage(bsmdList.toArray( new BootstrapMetadata[0] ) );
- /* Send this message to the source to do his shit. */
- Message message =
BootstrapMetadataMessage.makeBootstrapMetadataMessage(bsMetadataMessage);
- if (logger_.isDebugEnabled())
- logger_.debug("Sending the BootstrapMetadataMessage to " +
source);
- MessagingService.getMessagingInstance().sendOneWay(message,
source);
- }
+ return rangeInfo;
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
Wed Aug 19 20:51:06 2009
@@ -87,6 +87,11 @@
{
applicationState_.put(key, appState);
}
+
+ void deleteApplicationState(String key)
+ {
+ applicationState_.remove(key);
+ }
/* getters and setters */
long getUpdateTimestamp()
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Wed Aug 19 20:51:06 2009
@@ -920,6 +920,12 @@
epState.addApplicationState(key, appState);
}
}
+
+ public synchronized void deleteApplicationState(String key)
+ {
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ epState.deleteApplicationState(key);
+ }
public void stop()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
Wed Aug 19 20:51:06 2009
@@ -85,7 +85,7 @@
{
return open(dataFileName, StorageService.getPartitioner(),
DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName)));
}
-
+
public static synchronized SSTableReader open(String dataFileName,
IPartitioner partitioner, double cacheFraction) throws IOException
{
SSTableReader sstable = openedFiles.get(dataFileName);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
Wed Aug 19 20:51:06 2009
@@ -21,8 +21,8 @@
*/
-import java.io.IOException;
import java.io.File;
+import java.io.IOException;
import java.io.FileOutputStream;
import java.io.DataOutputStream;
import java.util.Comparator;
@@ -31,6 +31,7 @@
import org.apache.log4j.Logger;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.config.DatabaseDescriptor;
import
com.reardencommerce.kernel.collections.shared.evictable.ConcurrentLinkedHashMap;
@@ -111,13 +112,6 @@
afterAppend(decoratedKey, currentPosition);
}
- private static String rename(String tmpFilename)
- {
- String filename = tmpFilename.replace("-" + TEMPFILE_MARKER, "");
- new File(tmpFilename).renameTo(new File(filename));
- return filename;
- }
-
/**
* Renames temporary SSTable files to valid data, index, and bloom filter
files
*/
@@ -148,4 +142,19 @@
return new SSTableReader(path, partitioner, indexPositions, bf,
keyCache);
}
+ static String rename(String tmpFilename)
+ {
+ String filename = tmpFilename.replace("-" + SSTable.TEMPFILE_MARKER,
"");
+ new File(tmpFilename).renameTo(new File(filename));
+ return filename;
+ }
+
+ public static SSTableReader renameAndOpen(String dataFileName) throws
IOException
+ {
+ SSTableWriter.rename(indexFilename(dataFileName));
+ SSTableWriter.rename(filterFilename(dataFileName));
+ dataFileName = SSTableWriter.rename(dataFileName);
+ return SSTableReader.open(dataFileName,
StorageService.getPartitioner(),
DatabaseDescriptor.getKeysCachedFraction(parseTableName(dataFileName)));
+ }
+
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Wed Aug 19 20:51:06 2009
@@ -36,6 +36,8 @@
private Map<Token, EndPoint> tokenToEndPointMap_ = new HashMap<Token,
EndPoint>();
/* Maintains a reverse index of endpoint to token in the cluster. */
private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint,
Token>();
+ /* Bootstrapping nodes and their tokens */
+ private Map<EndPoint, Token> bootstrapNodes =
Collections.synchronizedMap(new HashMap<EndPoint, Token>());
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -43,8 +45,8 @@
public TokenMetadata()
{
}
-
- private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap,
Map<EndPoint, Token> endPointToTokenMap)
+
+ private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap,
Map<EndPoint, Token> endPointToTokenMap, Map<EndPoint, Token> bootstrapNodes)
{
tokenToEndPointMap_ = tokenToEndPointMap;
endPointToTokenMap_ = endPointToTokenMap;
@@ -52,22 +54,35 @@
public TokenMetadata cloneMe()
{
- return new TokenMetadata(cloneTokenEndPointMap(),
cloneEndPointTokenMap());
+ return new TokenMetadata(cloneTokenEndPointMap(),
cloneEndPointTokenMap(), cloneBootstrapNodes());
}
+ public void update(Token token, EndPoint endpoint)
+ {
+ this.update(token, endpoint, false);
+ }
/**
* Update the two maps in an safe mode.
*/
- public void update(Token token, EndPoint endpoint)
+ public void update(Token token, EndPoint endpoint, boolean bootstrapState)
{
lock_.writeLock().lock();
try
- {
- Token oldToken = endPointToTokenMap_.get(endpoint);
- if ( oldToken != null )
- tokenToEndPointMap_.remove(oldToken);
- tokenToEndPointMap_.put(token, endpoint);
- endPointToTokenMap_.put(endpoint, token);
+ {
+ if (bootstrapState)
+ {
+ bootstrapNodes.put(endpoint, token);
+ this.remove(endpoint);
+ }
+ else
+ {
+ bootstrapNodes.remove(endpoint); // If this happened to be
there
+ Token oldToken = endPointToTokenMap_.get(endpoint);
+ if ( oldToken != null )
+ tokenToEndPointMap_.remove(oldToken);
+ tokenToEndPointMap_.put(token, endpoint);
+ endPointToTokenMap_.put(endpoint, token);
+ }
}
finally
{
@@ -156,6 +171,20 @@
lock_.readLock().unlock();
}
}
+
+ public Map<EndPoint, Token> cloneBootstrapNodes()
+ {
+ lock_.readLock().lock();
+ try
+ {
+ return new HashMap<EndPoint, Token>( bootstrapNodes );
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+
+ }
/*
* Returns a safe clone of tokenToEndPointMap_.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
Wed Aug 19 20:51:06 2009
@@ -163,6 +163,7 @@
public static void main(String[] args)
{
+
CassandraDaemon daemon = new CassandraDaemon();
String pidFile = System.getProperty("cassandra-pidfile");
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Aug 19 20:51:06 2009
@@ -315,7 +315,8 @@
if (consistency_level == ConsistencyLevel.ONE)
{
boolean foundLocal =
Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
- if (foundLocal)
+ //TODO: Throw InvalidRequest if we're in bootstrap mode?
+ if (foundLocal && !StorageService.instance().isBootstrapMode())
{
row = weakReadLocal(command);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Aug 19 20:51:06 2009
@@ -57,7 +57,7 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
private final static String nodeId_ = "NODE-IDENTIFIER";
- private final static String loadAll_ = "LOAD-ALL";
+ private final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
/* Gossip load after every 5 mins. */
private static final long threshold_ = 5 * 60 * 1000L;
@@ -133,6 +133,9 @@
*/
public static StorageService instance()
{
+ String bs = System.getProperty("bootstrap");
+ boolean bootstrap = bs != null && bs.contains("true");
+
if ( instance_ == null )
{
StorageService.createLock_.lock();
@@ -142,7 +145,7 @@
{
try
{
- instance_ = new StorageService();
+ instance_ = new StorageService(bootstrap);
}
catch ( Throwable th )
{
@@ -184,7 +187,35 @@
private StorageLoadBalancer storageLoadBalancer_;
/* We use this interface to determine where replicas need to be placed */
private IReplicaPlacementStrategy nodePicker_;
+ /* Are we starting this node in bootstrap mode? */
+ private boolean isBootstrapMode;
+ private Set<EndPoint> bootstrapSet;
+
+ public synchronized void addBootstrapSource(EndPoint s)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Added " + s.getHost() + " as a bootstrap source");
+ bootstrapSet.add(s);
+ }
+ public synchronized boolean removeBootstrapSource(EndPoint s)
+ {
+ bootstrapSet.remove(s);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Removed " + s.getHost() + " as a bootstrap source");
+ if (bootstrapSet.isEmpty())
+ {
+ isBootstrapMode = false;
+ tokenMetadata_.update(storageMetadata_.getStorageId(),
StorageService.tcpAddr_, false);
+
+ logger_.info("Bootstrap completed! Now serving reads.");
+ /* Tell others you're not bootstrapping anymore */
+ Gossiper.instance().deleteApplicationState(BOOTSTRAP_MODE);
+ }
+ return isBootstrapMode;
+ }
+
/*
* Registers with Management Server
*/
@@ -202,8 +233,10 @@
}
}
- public StorageService()
+ public StorageService(boolean isBootstrapMode)
{
+ this.isBootstrapMode = isBootstrapMode;
+ bootstrapSet = new HashSet<EndPoint>();
init();
storageLoadBalancer_ = new StorageLoadBalancer(this);
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
@@ -273,9 +306,20 @@
*/
Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
/* Make sure this token gets gossiped around. */
- tokenMetadata_.update(storageMetadata_.getStorageId(),
StorageService.tcpAddr_);
+ tokenMetadata_.update(storageMetadata_.getStorageId(),
StorageService.tcpAddr_, isBootstrapMode);
ApplicationState state = new
ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getStorageId()));
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
+ if (isBootstrapMode)
+ {
+ logger_.info("Starting in bootstrap mode");
+ doBootstrap(StorageService.getLocalStorageEndPoint());
+ Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new
ApplicationState(""));
+ }
+ }
+
+ public boolean isBootstrapMode()
+ {
+ return isBootstrapMode;
}
public TokenMetadata getTokenMetadata()
@@ -283,7 +327,7 @@
return tokenMetadata_.cloneMe();
}
- /* TODO: remove later */
+ /* TODO: used for testing */
public void updateTokenMetadata(Token token, EndPoint endpoint)
{
tokenMetadata_.update(token, endpoint);
@@ -395,6 +439,13 @@
EndPoint ep = new EndPoint(endpoint.getHost(),
DatabaseDescriptor.getStoragePort());
/* node identifier for this endpoint on the identifier space */
ApplicationState nodeIdState =
epState.getApplicationState(StorageService.nodeId_);
+ /* Check if this has a bootstrapping state message */
+ boolean bootstrapState =
epState.getApplicationState(StorageService.BOOTSTRAP_MODE) != null;
+ if (bootstrapState)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(ep.getHost() + " is in bootstrap state.");
+ }
if (nodeIdState != null)
{
Token newToken =
getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
@@ -414,7 +465,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Relocation for endpoint " + ep);
- tokenMetadata_.update(newToken, ep);
+ tokenMetadata_.update(newToken, ep, bootstrapState);
}
else
{
@@ -432,7 +483,7 @@
/*
* This is a new node and we just update the token map.
*/
- tokenMetadata_.update(newToken, ep);
+ tokenMetadata_.update(newToken, ep, bootstrapState);
}
}
else
@@ -448,17 +499,6 @@
deliverHints(ep);
}
}
-
- /* Check if a bootstrap is in order */
- ApplicationState loadAllState =
epState.getApplicationState(StorageService.loadAll_);
- if ( loadAllState != null )
- {
- String nodes = loadAllState.getState();
- if ( nodes != null )
- {
- doBootstrap(ep);
- }
- }
}
/**
@@ -965,7 +1005,7 @@
{
return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
}
-
+
/**
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.
@@ -983,7 +1023,7 @@
int j = 0;
for ( ; j < endpoints.length; ++j )
{
- if (
StorageService.instance().isInSameDataCenter(endpoints[j]) &&
FailureDetector.instance().isAlive(endpoints[j]) )
+ if (
StorageService.instance().isInSameDataCenter(endpoints[j]) &&
FailureDetector.instance().isAlive(endpoints[j]))
{
return endpoints[j];
}
@@ -994,7 +1034,7 @@
j = 0;
for ( ; j < endpoints.length; ++j )
{
- if ( FailureDetector.instance().isAlive(endpoints[j]) )
+ if ( FailureDetector.instance().isAlive(endpoints[j]))
{
if (logger_.isDebugEnabled())
logger_.debug("EndPoint " + endpoints[j] + "
is alive so get data from it.");
@@ -1004,6 +1044,11 @@
return null;
}
+ /*
+ * TODO:
+ * This is used by the incomplete multiget implementation. Need to
rewrite
+ * this to use findSuitableEndPoint above instead of copy/paste
+ */
public Map<String, EndPoint> findSuitableEndPoints(String[] keys)
throws IOException
{
Map<String, EndPoint> suitableEndPoints = new HashMap<String,
EndPoint>();
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/BasicUtilities.java
Wed Aug 19 20:51:06 2009
@@ -63,4 +63,14 @@
{
return ByteBuffer.wrap(arg).getShort();
}
+
+ public static byte[] booleanToByteArray(boolean b)
+ {
+ return b ? shortToByteArray((short)1) : shortToByteArray((short)0);
+ }
+
+ public static boolean byteArrayToBoolean(byte[] arg)
+ {
+ return (byteArrayToShort(arg) == (short) 1) ? true : false;
+ }
}
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java?rev=805968&r1=805967&r2=805968&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Wed Aug 19 20:51:06 2009
@@ -88,9 +88,9 @@
assertEquals(true, result.contains("Data.db"));
assertEquals(1, fileNames.entrySet().size());
- assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[0])).getName().matches("Standard1-\\d+-Data.db"));
- assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[1])).getName().matches("Standard1-\\d+-Index.db"));
- assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[2])).getName().matches("Standard1-\\d+-Filter.db"));
+ assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[0])).getName().matches("Standard1-tmp-\\d+-Data.db"));
+ assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
+ assertTrue( new
File(bivh.getNewFileNameFromOldContextAndNames(fileNames,
streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
}
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=805968&view=auto
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(added)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Wed Aug 19 20:51:06 2009
@@ -0,0 +1,81 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.dht;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.service.StorageService;
+import org.junit.Test;
+
+public class BootStrapperTest {
+ @Test
+ public void testSourceTargetComputation()
+ {
+ int numOldNodes = 3;
+ IPartitioner p = generateOldTokens(numOldNodes);
+
+ Token newToken = p.getDefaultToken();
+ EndPoint newEndPoint = new EndPoint("1.2.3.10",100);
+
+ /* New token needs to be part of the map for the algorithm
+ * to calculate the ranges correctly
+ */
+ StorageService.instance().updateTokenMetadata(newToken, newEndPoint);
+
+ BootStrapper b = new BootStrapper(new EndPoint[]{newEndPoint},
newToken );
+ Map<Range,List<BootstrapSourceTarget>> res =
b.getRangesWithSourceTarget();
+
+ int transferCount = 0;
+ for ( Map.Entry<Range, List<BootstrapSourceTarget>> e: res.entrySet())
+ {
+ if (e.getValue() != null && e.getValue().size() >0)
+ {
+ transferCount++;
+ }
+ }
+ /* Only 1 transfer from old node to new node */
+ assertEquals(1, transferCount);
+ Map<EndPoint, Map<EndPoint,List<Range>>> temp =
LeaveJoinProtocolHelper.getWorkMap(res);
+ assertEquals(1, temp.keySet().size());
+ assertEquals(1, temp.entrySet().size());
+
+ Map<EndPoint,Map<EndPoint,List<Range>>> res2 =
LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
+ /* After filtering, still only 1 transfer */
+ assertEquals(1, res2.keySet().size());
+ assertEquals(1, res2.entrySet().size());
+
assertTrue(((Map<EndPoint,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
+ }
+
+ private IPartitioner generateOldTokens(int numOldNodes)
+ {
+ IPartitioner p = new RandomPartitioner();
+ for (int i = 0 ; i< numOldNodes; i++)
+ {
+ EndPoint e = new EndPoint("127.0.0."+i, 100);
+ Token t = p.getDefaultToken();
+ StorageService.instance().updateTokenMetadata(t, e);
+ }
+ return p;
+ }
+}
\ No newline at end of file