Author: jbellis
Date: Sat Nov 14 23:49:55 2009
New Revision: 836295
URL: http://svn.apache.org/viewvc?rev=836295&view=rev
Log:
nodeprobe loadbalance
patch by jbellis; reviewed by goffinet for CASSANDRA-192
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Sat Nov 14 23:49:55 2009
@@ -119,6 +119,11 @@
return
StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
}
+ return getBalancedToken(metadata, load);
+ }
+
+ public static Token getBalancedToken(TokenMetadata metadata,
Map<InetAddress, Double> load)
+ {
InetAddress maxEndpoint = getBootstrapSource(metadata, load);
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
logger.info("New token will be " + t + " to assume load from " +
maxEndpoint);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Nov 14 23:49:55 2009
@@ -1015,10 +1015,20 @@
public void move(String newToken) throws InterruptedException
{
+ move(partitioner_.getTokenFactory().fromString(newToken));
+ }
+
+ public void loadBalance() throws IOException, InterruptedException
+ {
+ Token token = BootStrapper.getBalancedToken(tokenMetadata_,
StorageLoadBalancer.instance().getLoadInfo());
+ move(token);
+ }
+
+ private void move(final Token token) throws InterruptedException
+ {
if
(tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
throw new UnsupportedOperationException("data is currently moving
to this node; unable to leave the ring");
- final Token token =
partitioner_.getTokenFactory().fromString(newToken); // make sure it's valid
logger_.info("moving to " + token);
Gossiper.instance().addApplicationState(STATE_LEAVING, new
ApplicationState(getLocalToken().toString()));
logger_.info("move sleeping " + Streaming.RING_DELAY);
@@ -1051,5 +1061,4 @@
{
return replicationStrategy_;
}
-
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Sat Nov 14 23:49:55 2009
@@ -125,6 +125,12 @@
*/
public void move(String newToken) throws InterruptedException;
+ /**
+ * This node will unload its data onto its neighbors, and bootstrap to share the range
+ * of the most-loaded node in the ring.
+ */
+ public void loadBalance() throws IOException, InterruptedException;
+
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sat Nov 14 23:49:55 2009
@@ -24,7 +24,6 @@
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -45,7 +44,6 @@
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.CompactionManagerMBean;
import org.apache.cassandra.dht.Range;
-import java.net.InetAddress;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -92,7 +90,7 @@
* @throws ParseException for missing required, or unrecognized options
* @throws IOException on connection failures
*/
- private NodeProbe(String[] cmdArgs) throws ParseException, IOException
+ private NodeProbe(String[] cmdArgs) throws ParseException, IOException,
InterruptedException
{
parseArgs(cmdArgs);
this.host = cmd.getOptionValue(HOST_OPTION);
@@ -124,7 +122,7 @@
* @param port TCP port of the remote JMX agent
* @throws IOException on connection failures
*/
- public NodeProbe(String host, int port) throws IOException
+ public NodeProbe(String host, int port) throws IOException,
InterruptedException
{
this.host = host;
this.port = port;
@@ -137,7 +135,7 @@
* @param host hostname or IP address of the JMX agent
* @throws IOException on connection failures
*/
- public NodeProbe(String host) throws IOException
+ public NodeProbe(String host) throws IOException, InterruptedException
{
this.host = host;
this.port = defaultPort;
@@ -385,6 +383,11 @@
ssProxy.decommission();
}
+ public void loadBalance() throws IOException, InterruptedException
+ {
+ ssProxy.loadBalance();
+ }
+
public void move(String newToken) throws InterruptedException
{
ssProxy.move(newToken);
@@ -485,7 +488,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats,
snapshot [name], clearsnapshot, " +
- "tpstats, flush, decommission, move, " +
+ "tpstats, flush, decommission, move, loadbalance, " +
" getcompactionthreshold, setcompactionthreshold
[minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n",
NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -548,6 +551,10 @@
{
probe.decommission();
}
+ else if (cmdName.equals("loadbalance"))
+ {
+ probe.loadBalance();
+ }
else if (cmdName.equals("move"))
{
if (arguments.length <= 1)