Author: jbellis
Date: Sat Nov 14 23:49:55 2009
New Revision: 836295

URL: http://svn.apache.org/viewvc?rev=836295&view=rev
Log:
nodeprobe loadbalance
patch by jbellis; reviewed by goffinet for CASSANDRA-192

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Sat Nov 14 23:49:55 2009
@@ -119,6 +119,11 @@
             return 
StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
         }
 
+        return getBalancedToken(metadata, load);
+    }
+
+    public static Token getBalancedToken(TokenMetadata metadata, 
Map<InetAddress, Double> load)
+    {
         InetAddress maxEndpoint = getBootstrapSource(metadata, load);
         Token<?> t = getBootstrapTokenFrom(maxEndpoint);
         logger.info("New token will be " + t + " to assume load from " + 
maxEndpoint);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Sat Nov 14 23:49:55 2009
@@ -1015,10 +1015,20 @@
 
     public void move(String newToken) throws InterruptedException
     {
+        move(partitioner_.getTokenFactory().fromString(newToken));
+    }
+
+    public void loadBalance() throws IOException, InterruptedException
+    {
+        Token token = BootStrapper.getBalancedToken(tokenMetadata_, 
StorageLoadBalancer.instance().getLoadInfo());
+        move(token);
+    }
+
+    private void move(final Token token) throws InterruptedException
+    {
         if 
(tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
             throw new UnsupportedOperationException("data is currently moving 
to this node; unable to leave the ring");
 
-        final Token token = 
partitioner_.getTokenFactory().fromString(newToken); // make sure it's valid
         logger_.info("moving to " + token);
         Gossiper.instance().addApplicationState(STATE_LEAVING, new 
ApplicationState(getLocalToken().toString()));
         logger_.info("move sleeping " + Streaming.RING_DELAY);
@@ -1051,5 +1061,4 @@
     {
         return replicationStrategy_;
     }
-
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Sat Nov 14 23:49:55 2009
@@ -125,6 +125,12 @@
      */
     public void move(String newToken) throws InterruptedException;
 
+    /**
+     * This node will unload its data onto its neighbors, and bootstrap to 
share the range
+     * of the most-loaded node in the ring.
+     */
+    public void loadBalance() throws IOException, InterruptedException;
+
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=836295&r1=836294&r2=836295&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Sat Nov 14 23:49:55 2009
@@ -24,7 +24,6 @@
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,7 +44,6 @@
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.dht.Range;
-import java.net.InetAddress;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -92,7 +90,7 @@
      * @throws ParseException for missing required, or unrecognized options
      * @throws IOException on connection failures
      */
-    private NodeProbe(String[] cmdArgs) throws ParseException, IOException
+    private NodeProbe(String[] cmdArgs) throws ParseException, IOException, 
InterruptedException
     {
         parseArgs(cmdArgs);
         this.host = cmd.getOptionValue(HOST_OPTION);
@@ -124,7 +122,7 @@
      * @param port TCP port of the remote JMX agent
      * @throws IOException on connection failures
      */
-    public NodeProbe(String host, int port) throws IOException
+    public NodeProbe(String host, int port) throws IOException, 
InterruptedException
     {
         this.host = host;
         this.port = port;
@@ -137,7 +135,7 @@
      * @param host hostname or IP address of the JMX agent
      * @throws IOException on connection failures
      */
-    public NodeProbe(String host) throws IOException
+    public NodeProbe(String host) throws IOException, InterruptedException
     {
         this.host = host;
         this.port = defaultPort;
@@ -385,6 +383,11 @@
         ssProxy.decommission();
     }
 
+    public void loadBalance() throws IOException, InterruptedException
+    {
+        ssProxy.loadBalance();
+    }
+
     public void move(String newToken) throws InterruptedException
     {
         ssProxy.move(newToken);
@@ -485,7 +488,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, 
snapshot [name], clearsnapshot, " +
-                "tpstats, flush, decommission, move, " +
+                "tpstats, flush, decommission, move, loadbalance, " +
                 " getcompactionthreshold, setcompactionthreshold 
[minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", 
NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -548,6 +551,10 @@
         {
             probe.decommission();
         }
+        else if (cmdName.equals("loadbalance"))
+        {
+            probe.loadBalance();
+        }
         else if (cmdName.equals("move"))
         {
             if (arguments.length <= 1)


Reply via email to