Author: jbellis
Date: Tue Nov 3 03:12:16 2009
New Revision: 832285
URL: http://svn.apache.org/viewvc?rev=832285&view=rev
Log:
prefer bootstrapping into ring sections that are not already being bootstrapped
into
patch by jbellis; reviewed by Vijay Parthasarathy for CASSANDRA-517
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Tue Nov 3 03:12:16 2009
@@ -111,41 +111,57 @@
}).start();
}
- public static void guessTokenIfNotSpecified(TokenMetadata metadata) throws
IOException
+ /**
+ * if initialtoken was specified, use that.
+ * otherwise, pick a token to assume half the load of the most-loaded node.
+ */
+ public static Token getBootstrapToken(final TokenMetadata metadata, final
Map<InetAddress, Double> load) throws IOException
{
- StorageService ss = StorageService.instance();
- StorageLoadBalancer slb = StorageLoadBalancer.instance();
+ if (DatabaseDescriptor.getInitialToken() != null)
+ {
+ logger.debug("token manually specified as " +
DatabaseDescriptor.getInitialToken());
+ return
StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
+ }
- slb.waitForLoadInfo();
- logger.debug("... got load info");
+ InetAddress maxEndpoint = getBootstrapSource(metadata, load);
+ Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+ logger.info("New token will be " + t + " to assume load from " +
maxEndpoint);
+ return t;
+ }
- // if initialtoken was specified, use that. otherwise, pick a token
to assume half the load of the most-loaded node.
- if (DatabaseDescriptor.getInitialToken() == null)
+ static InetAddress getBootstrapSource(final TokenMetadata metadata, final
Map<InetAddress, Double> load)
+ {
+ // sort first by number of nodes already bootstrapping into a source
node's range, then by load.
+ List<InetAddress> endpoints = new ArrayList<InetAddress>(load.size());
+ for (InetAddress endpoint : load.keySet())
{
- double maxLoad = 0;
- InetAddress maxEndpoint = null;
- for (Map.Entry<InetAddress, Double> entry :
slb.getLoadInfo().entrySet())
- {
- logger.debug("considering " + entry.getKey() + " with load of
" + entry.getValue());
- if (!metadata.isMember(entry.getKey()))
- continue;
- if (maxEndpoint == null || entry.getValue() > maxLoad)
- {
- maxEndpoint = entry.getKey();
- maxLoad = entry.getValue();
- }
- }
- if (maxEndpoint == null)
+ if (!metadata.isMember(endpoint))
+ continue;
+ endpoints.add(endpoint);
+ }
+
+ if (endpoints.isEmpty())
+ throw new RuntimeException("No other nodes seen! Unable to
bootstrap");
+ Collections.sort(endpoints, new Comparator<InetAddress>()
+ {
+ public int compare(InetAddress ia1, InetAddress ia2)
{
- throw new RuntimeException("No bootstrap sources found");
+ int n1 = metadata.bootstrapTargets(ia1);
+ int n2 = metadata.bootstrapTargets(ia2);
+ if (n1 != n2)
+ return -(n1 - n2); // more targets = _less_ priority!
+
+ double load1 = load.get(ia1);
+ double load2 = load.get(ia2);
+ if (load1 == load2)
+ return 0;
+ return load1 < load2 ? -1 : 1;
}
+ });
- assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
- logger.debug("asking " + maxEndpoint + " for token");
- Token<?> t = getBootstrapTokenFrom(maxEndpoint);
- logger.info("New token will be " + t + " to assume load from " +
maxEndpoint);
- ss.setToken(t);
- }
+ InetAddress maxEndpoint = endpoints.get(endpoints.size() - 1);
+ assert !maxEndpoint.equals(FBUtilities.getLocalAddress());
+ return maxEndpoint;
}
/** get potential sources for each range, ordered by proximity (as
determined by EndPointSnitch) */
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Tue Nov 3 03:12:16 2009
@@ -26,6 +26,9 @@
import org.apache.cassandra.dht.Range;
import java.net.InetAddress;
+
+import org.apache.commons.lang.StringUtils;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.UnavailableException;
@@ -89,6 +92,21 @@
}
}
+ /** @return the number of nodes bootstrapping into source's primary range
*/
+ public int bootstrapTargets(InetAddress source)
+ {
+ int n = 0;
+ Range sourceRange = getPrimaryRangeFor(getToken(source));
+ for (Token token : bootstrapTokenMap.keySet())
+ {
+ if (sourceRange.contains(token))
+ {
+ n++;
+ }
+ }
+ return n;
+ }
+
/**
* Update the two maps in an safe mode.
*/
@@ -270,14 +288,16 @@
{
List tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- return (Token) (index == 0 ? tokens.get(tokens.size() - 1) :
tokens.get(--index));
+ assert index >= 0 : token + " not found in " +
StringUtils.join(tokenToEndPointMap.keySet(), ", ");
+ return (Token) (index == 0 ? tokens.get(tokens.size() - 1) :
tokens.get(index - 1));
}
public Token getSuccessor(Token token)
{
List tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) :
tokens.get(++index));
+ assert index >= 0 : token + " not found in " +
StringUtils.join(tokenToEndPointMap.keySet(), ", ");
+ return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) :
tokens.get(index + 1));
}
public Iterable<? extends Token> bootstrapTokens()
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Nov 3 03:12:16 2009
@@ -270,9 +270,11 @@
if (isBootstrapMode)
{
- logger_.info("Starting in bootstrap mode (first, sleeping to get
load information)");
Gossiper.instance().addApplicationState(MODE, new
ApplicationState(MODE_MOVING));
- BootStrapper.guessTokenIfNotSpecified(tokenMetadata_);
+ logger_.info("Starting in bootstrap mode (first, sleeping to get
load information)");
+ StorageLoadBalancer.instance().waitForLoadInfo();
+ logger_.info("... got load info");
+ setToken(BootStrapper.getBootstrapToken(tokenMetadata_,
StorageLoadBalancer.instance().getLoadInfo()));
new BootStrapper(replicationStrategy_,
FBUtilities.getLocalAddress(), getLocalToken(),
tokenMetadata_).startBootstrap(); // handles token update
}
else
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=832285&r1=832284&r2=832285&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Tue Nov 3 03:12:16 2009
@@ -18,24 +18,56 @@
*/
package org.apache.cassandra.dht;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.gms.IFailureDetector;
-import org.apache.cassandra.gms.IFailureDetectionEventListener;
-import com.google.common.collect.Multimap;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class BootStrapperTest
+{
+ @Test
+ public void testGuessToken() throws IOException
+ {
+ StorageService ss = StorageService.instance();
+
+ generateFakeEndpoints(3);
+
+ InetAddress one = InetAddress.getByName("127.0.0.2");
+ InetAddress two = InetAddress.getByName("127.0.0.3");
+ InetAddress three = InetAddress.getByName("127.0.0.4");
+ Map<InetAddress, Double> load = new HashMap<InetAddress, Double>();
+ load.put(one, 1.0);
+ load.put(two, 2.0);
+ load.put(three, 3.0);
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ InetAddress source = BootStrapper.getBootstrapSource(tmd, load);
+ assert three.equals(source);
+
+ InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
+ tmd.setBootstrapping(myEndpoint, true);
+ Range range3 = ss.getPrimaryRangeForEndPoint(three);
+ Token fakeToken =
((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
+ assert range3.contains(fakeToken);
+ tmd.update(fakeToken, myEndpoint);
+
+ InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);
+ assert two.equals(source2) : source2;
+ }
-public class BootStrapperTest {
@Test
public void testSourceTargetComputation() throws UnknownHostException
{
@@ -49,7 +81,6 @@
StorageService ss = StorageService.instance();
generateFakeEndpoints(numOldNodes);
-
Token myToken = StorageService.getPartitioner().getRandomToken();
InetAddress myEndpoint = InetAddress.getByName("127.0.0.1");
@@ -94,7 +125,9 @@
for (int i = 1; i <= numOldNodes; i++)
{
// leave .1 for myEndpoint
- tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0." +
(i + 1)));
+ // TODO use this when #519 is fixed
+ // tmd.update(p.getRandomToken(), InetAddress.getByName("127.0.0."
+ (i + 1)));
+
tmd.update(p.getToken(FBUtilities.bytesToHex(FBUtilities.toByteArray(i * 13))),
InetAddress.getByName("127.0.0." + (i + 1)));
}
}
}
\ No newline at end of file