Updated Branches: refs/heads/trunk a801aae13 -> 14afcc7d7
shuffle utility for virtual nodes Patch by eevans; reviewed by Brandon Williams for CASSSANDRA-4443 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14afcc7d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14afcc7d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14afcc7d Branch: refs/heads/trunk Commit: 14afcc7d76f9b60228f4c76b696294835027c1a4 Parents: a801aae Author: Eric Evans <[email protected]> Authored: Thu Oct 25 11:13:27 2012 -0500 Committer: Eric Evans <[email protected]> Committed: Thu Oct 25 11:13:27 2012 -0500 ---------------------------------------------------------------------- bin/shuffle | 57 ++ .../ScheduledRangeTransferExecutorService.java | 6 +- .../apache/cassandra/tools/AbstractJmxClient.java | 150 +++ src/java/org/apache/cassandra/tools/Shuffle.java | 741 +++++++++++++++ 4 files changed, 951 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/14afcc7d/bin/shuffle ---------------------------------------------------------------------- diff --git a/bin/shuffle b/bin/shuffle new file mode 100644 index 0000000..d6dabfe --- /dev/null +++ b/bin/shuffle @@ -0,0 +1,57 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + for include in /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh \ + `dirname $0`/cassandra.in.sh; do + if [ -r $include ]; then + . $include + break + fi + done +elif [ -r $CASSANDRA_INCLUDE ]; then + . $CASSANDRA_INCLUDE +fi + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -x $JAVA_HOME/bin/java ]; then + JAVA=$JAVA_HOME/bin/java +else + JAVA=`which java` +fi + +if [ -z $CASSANDRA_CONF -o -z $CLASSPATH ]; then + echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2 + exit 1 +fi + +# Special-case path variables. +case "`uname`" in + CYGWIN*) + CLASSPATH=`cygpath -p -w "$CLASSPATH"` + CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"` + ;; +esac + +$JAVA -cp $CLASSPATH \ + -Xmx32m \ + -Dlog4j.configuration=log4j-tools.properties \ + org.apache.cassandra.tools.Shuffle $@ + +# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/14afcc7d/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java index 88a1126..ca8ea02 100644 --- a/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java +++ b/src/java/org/apache/cassandra/service/ScheduledRangeTransferExecutorService.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.service; +import static org.apache.cassandra.cql3.QueryProcessor.processInternal; + import java.nio.ByteBuffer; import java.util.Collections; import java.util.Date; @@ -34,8 +36,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.cassandra.cql3.QueryProcessor.processInternal; - public class ScheduledRangeTransferExecutorService { private static final Logger LOG = LoggerFactory.getLogger(ScheduledRangeTransferExecutorService.class); @@ -74,7 +74,7 @@ class RangeTransfer implements Runnable public void run() { - UntypedResultSet res = processInternal("SELECT * FROM system." + SystemTable.RANGE_XFERS_CF + " LIMIT 1"); + UntypedResultSet res = processInternal("SELECT * FROM system." + SystemTable.RANGE_XFERS_CF); if (res.size() < 1) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/14afcc7d/src/java/org/apache/cassandra/tools/AbstractJmxClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/AbstractJmxClient.java b/src/java/org/apache/cassandra/tools/AbstractJmxClient.java new file mode 100644 index 0000000..65b313b --- /dev/null +++ b/src/java/org/apache/cassandra/tools/AbstractJmxClient.java @@ -0,0 +1,150 @@ +package org.apache.cassandra.tools; + +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintStream; +import java.util.HashMap; +import java.util.Map; + +import javax.management.MBeanServerConnection; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; + +public abstract class AbstractJmxClient implements Closeable +{ + private static final Options options = new Options(); + protected static final int DEFAULT_JMX_PORT = 7199; + protected static final String DEFAULT_HOST = "localhost"; + + protected final String host; + protected final int port; + protected JMXConnection jmxConn; + protected PrintStream out = System.out; + + static + { + options.addOption("h", "host", true, "JMX hostname or IP address (Default: localhost)"); + options.addOption("p", "port", true, "JMX port number (Default: 7199)"); + options.addOption("H", "help", false, "Print help information"); + } + + public AbstractJmxClient(String host, Integer port, String username, String password) throws IOException + { + this.host = (host != null) ? host : DEFAULT_HOST; + this.port = (port != null) ? port : DEFAULT_JMX_PORT; + jmxConn = new JMXConnection(this.host, this.port, username, password); + } + + public void close() throws IOException + { + jmxConn.close(); + } + + public void writeln(Throwable err) + { + writeln(err.getMessage()); + } + + public void writeln(String msg) + { + out.println(msg); + } + + public void write(String msg) + { + out.print(msg); + } + + public void writeln(String format, Object...args) + { + write(format + "%n", args); + } + + public void write(String format, Object...args) + { + out.printf(format, args); + } + + public void setOutput(PrintStream out) + { + this.out = out; + } + + public static CommandLine processArguments(String[] args) throws ParseException + { + CommandLineParser parser = new PosixParser(); + return parser.parse(options, args); + } + + public static void addCmdOption(String shortOpt, String longOpt, boolean hasArg, String description) + { + options.addOption(shortOpt, longOpt, hasArg, description); + } + + public static void printHelp(String synopsis, String header) + { + System.out.printf("Usage: %s%n%n", synopsis); + System.out.print(header); + System.out.println("Options:"); + for (Object opt : options.getOptions()) + { + String shortOpt = String.format("%s,", ((Option)opt).getOpt()); + String longOpt = ((Option)opt).getLongOpt(); + String description = ((Option)opt).getDescription(); + System.out.printf(" -%-4s --%-17s %s%n", shortOpt, longOpt, description); + } + } +} + +class JMXConnection +{ + private static final String FMT_URL = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi"; + private final String host, username, password; + private final int port; + private JMXConnector jmxc; + private MBeanServerConnection mbeanServerConn; + + JMXConnection(String host, int port) throws IOException + { + this(host, port, null, null); + } + + JMXConnection(String host, int port, String username, String password) throws IOException + { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + connect(); + } + + private void connect() throws IOException + { + JMXServiceURL jmxUrl = new JMXServiceURL(String.format(FMT_URL, host, port)); + Map<String, Object> env = new HashMap<String, Object>(); + + if (username != null) + env.put(JMXConnector.CREDENTIALS, new String[]{ username, password }); + + jmxc = JMXConnectorFactory.connect(jmxUrl, env); + mbeanServerConn = jmxc.getMBeanServerConnection(); + } + + public void close() throws IOException + { + jmxc.close(); + } + + public MBeanServerConnection getMbeanServerConn() + { + return mbeanServerConn; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/14afcc7d/src/java/org/apache/cassandra/tools/Shuffle.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java new file mode 100644 index 0000000..a091ab8 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/Shuffle.java @@ -0,0 +1,741 @@ +package org.apache.cassandra.tools; + +import java.io.Closeable; +import java.io.IOException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import javax.management.JMX; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.cassandra.cql.jdbc.JdbcDate; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointSnitchInfoMBean; +import org.apache.cassandra.service.StorageServiceMBean; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.CqlRow; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.TokenRange; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.MissingArgumentException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TFastFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +public class Shuffle extends AbstractJmxClient +{ + private static final String ssObjName = "org.apache.cassandra.db:type=StorageService"; + private static final String epSnitchObjName = "org.apache.cassandra.db:type=EndpointSnitchInfo"; + + private StorageServiceMBean ssProxy = null; + private Random rand = new Random(System.currentTimeMillis()); + private final String thriftHost; + private final int thriftPort; + private final boolean thriftFramed; + + static + { + addCmdOption("th", "thrift-host", true, "Thrift hostname or IP address (Default: JMX host)"); + addCmdOption("tp", "thrift-port", true, "Thrift port number (Default: 9160)"); + addCmdOption("tf", "thrift-framed", false, "Enable framed transport for Thrift (Default: false)"); + addCmdOption("en", "and-enable", true, "Immediately enable shuffling (create only)"); + addCmdOption("dc", "only-dc", true, "Apply only to named DC (create only)"); + } + + public Shuffle(String host, int port) throws IOException + { + this(host, port, host, 9160, false, null, null); + } + + public Shuffle(String host, int port, String thriftHost, int thriftPort, boolean thriftFramed, String username, String password) + throws IOException + { + super(host, port, username, password); + + this.thriftHost = thriftHost; + this.thriftPort = thriftPort; + this.thriftFramed = thriftFramed; + + // Setup the StorageService proxy. + ssProxy = getSSProxy(jmxConn.getMbeanServerConn()); + } + + public StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn) + { + StorageServiceMBean proxy = null; + try + { + ObjectName name = new ObjectName(ssObjName); + proxy = JMX.newMBeanProxy(mbeanConn, name, StorageServiceMBean.class); + } + catch (MalformedObjectNameException e) + { + throw new RuntimeException(e); + } + return proxy; + } + + public EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn) + { + EndpointSnitchInfoMBean proxy = null; + try + { + ObjectName name = new ObjectName(epSnitchObjName); + proxy = JMX.newMBeanProxy(mbeanConn, name, EndpointSnitchInfoMBean.class); + } + catch (MalformedObjectNameException e) + { + throw new RuntimeException(e); + } + return proxy; + } + + /** + * Given a Multimap of endpoint to tokens, return a new randomized mapping. + * + * @param endpointMap current mapping of endpoint to tokens + * @return a new mapping of endpoint to tokens + */ + public Multimap<String, String> calculateRelocations(Multimap<String, String> endpointMap) + { + Multimap<String, String> relocations = HashMultimap.create(); + Set<String> endpoints = new HashSet<String>(endpointMap.keySet()); + Map<String, Integer> endpointToNumTokens = new HashMap<String, Integer>(endpoints.size()); + Map<String, Iterator<String>> iterMap = new HashMap<String, Iterator<String>>(endpoints.size()); + + // Create maps of endpoint to token iterators, and endpoint to number of tokens. + for (String endpoint : endpoints) + { + try + { + endpointToNumTokens.put(endpoint, ssProxy.getTokens(endpoint).size()); + } + catch (UnknownHostException e) + { + throw new RuntimeException("What that...?", e); + } + + iterMap.put(endpoint, endpointMap.get(endpoint).iterator()); + } + + int epsToComplete = endpoints.size(); + Set<String> endpointsCompleted = new HashSet<String>(); + + outer: + while (true) + { + endpoints.removeAll(endpointsCompleted); + + for (String endpoint : endpoints) + { + boolean choiceMade = false; + + if (!iterMap.get(endpoint).hasNext()) + { + endpointsCompleted.add(endpoint); + continue; + } + + String token = iterMap.get(endpoint).next(); + + List<String> subSet = new ArrayList<String>(endpoints); + subSet.remove(endpoint); + Collections.shuffle(subSet, rand); + + for (String choice : subSet) + { + if (relocations.get(choice).size() < endpointToNumTokens.get(choice)) + { + relocations.put(choice, token); + choiceMade = true; + break; + } + } + + if (!choiceMade) + relocations.put(endpoint, token); + } + + // We're done when we've exhausted all of the token iterators + if (endpointsCompleted.size() == epsToComplete) + break outer; + } + + return relocations; + } + + /** + * Enable relocations. + * + * @param endpoints sequence of hostname or IP strings + */ + public void enableRelocations(String...endpoints) + { + enableRelocations(endpoints); + } + + /** + * Enable relocations. + * + * @param endpoints Collection of hostname or IP strings + */ + public void enableRelocations(Collection<String> endpoints) + { + for (String endpoint : endpoints) + { + try + { + JMXConnection conn = new JMXConnection(endpoint, port); + getSSProxy(conn.getMbeanServerConn()).enableScheduledRangeXfers(); + conn.close(); + } + catch (IOException e) + { + writeln("Failed to enable shuffling on %s!", endpoint); + } + } + } + + /** + * Disable relocations. + * + * @param endpoints sequence of hostname or IP strings + */ + public void disableRelocations(String...endpoints) + { + disableRelocations(endpoints); + } + + /** + * Disable relocations. + * + * @param endpoints Collection of hostname or IP strings + */ + public void disableRelocations(Collection<String> endpoints) + { + for (String endpoint : endpoints) + { + try + { + JMXConnection conn = new JMXConnection(endpoint, port); + getSSProxy(conn.getMbeanServerConn()).disableScheduledRangeXfers(); + conn.close(); + } + catch (IOException e) + { + writeln("Failed to enable shuffling on %s!", endpoint); + } + } + } + + /** + * Return a list of the live nodes (using JMX). + * + * @return String endpoint names + * @throws ShuffleError + */ + public Collection<String> getLiveNodes() throws ShuffleError + { + try + { + JMXConnection conn = new JMXConnection(host, port); + return getSSProxy(conn.getMbeanServerConn()).getLiveNodes(); + } + catch (IOException e) + { + throw new ShuffleError("Error retrieving list of nodes from JMX interface"); + } + } + + /** + * Create and distribute a new, randomized token to endpoint mapping. + * + * @throws ShuffleError on handled exceptions + */ + public void shuffle(boolean enable, String onlyDc) throws ShuffleError + { + CassandraClient seedClient = null; + Map<String, String> tokenMap = null; + IPartitioner<?> partitioner = null; + Multimap<String, String> endpointMap = HashMultimap.create(); + EndpointSnitchInfoMBean epSnitchProxy = getEpSnitchProxy(jmxConn.getMbeanServerConn()); + + try + { + seedClient = getThriftClient(thriftHost, thriftPort, thriftFramed); + tokenMap = seedClient.describe_token_map(); + + for (Map.Entry<String, String> entry : tokenMap.entrySet()) + { + String endpoint = entry.getValue(), token = entry.getKey(); + try + { + if (onlyDc != null) + { + if (onlyDc.equals(epSnitchProxy.getDatacenter(endpoint))) + endpointMap.put(endpoint, token); + } + else + endpointMap.put(endpoint, token); + } + catch (UnknownHostException e) + { + writeln("Warning: %s unknown to EndpointSnitch!", endpoint); + } + } + } + catch (InvalidRequestException ire) + { + throw new RuntimeException("What that...?", ire); + } + catch (TException e) + { + throw new ShuffleError( + String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage())); + } + + partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed); + + Multimap<String, String> relocations = calculateRelocations(endpointMap); + + writeln("%-42s %-15s %-15s", "Token", "From", "To"); + writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~"); + + // Store relocations on remote nodes. + for (String endpoint : relocations.keySet()) + { + for (String tok : relocations.get(endpoint)) + writeln("%-42s %-15s %-15s", tok, tokenMap.get(tok), endpoint); + + String cqlQuery = createShuffleBatchInsert(relocations.get(endpoint), partitioner); + executeCqlQuery(endpoint, thriftPort, thriftFramed, cqlQuery); + } + + if (enable) + enableRelocations(relocations.keySet()); + } + + /** + * Print a list of pending token relocations for all nodes. + * + * @throws ShuffleError + */ + public void ls() throws ShuffleError + { + Map<String, List<CqlRow>> queuedRelocations = listRelocations(); + IPartitioner<?> partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed); + boolean justOnce = false; + + for (String host : queuedRelocations.keySet()) + { + for (CqlRow row : queuedRelocations.get(host)) + { + assert row.getColumns().size() == 2; + + if (!justOnce) + { + writeln("%-42s %-15s %s", "Token", "Endpoint", "Requested at"); + writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); + justOnce = true; + } + + ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue()); + ByteBuffer requestedAt = ByteBuffer.wrap(row.getColumns().get(1).getValue()); + Date time = JdbcDate.instance.compose(requestedAt); + Token<?> token = partitioner.getTokenFactory().fromByteArray(tokenBytes); + + writeln("%-42s %-15s %s", token.toString(), host, time.toString()); + } + } + } + + /** + * List pending token relocations for all nodes. + * + * @return + * @throws ShuffleError + */ + private Map<String, List<CqlRow>> listRelocations() throws ShuffleError + { + String cqlQuery = "SELECT token_bytes,requested_at FROM system.range_xfers"; + Map<String, List<CqlRow>> results = new HashMap<String, List<CqlRow>>(); + + for (String host : getLiveNodes()) + { + CqlResult result = executeCqlQuery(host, thriftPort, thriftFramed, cqlQuery); + results.put(host, result.getRows()); + } + + return results; + } + + /** + * Clear pending token relocations on all nodes. + * + * @throws ShuffleError + */ + public void clear() throws ShuffleError + { + Map<String, List<CqlRow>> queuedRelocations = listRelocations(); + + for (String host : queuedRelocations.keySet()) + { + + for (CqlRow row : queuedRelocations.get(host)) + { + assert row.getColumns().size() == 2; + + ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue()); + String query = String.format("DELETE FROM system.range_xfers WHERE token_bytes = '%s'", + ByteBufferUtil.bytesToHex(tokenBytes)); + executeCqlQuery(host, thriftPort, thriftFramed, query); + } + } + } + + /** + * Enable shuffling on all nodes in the cluster. + * + * @throws ShuffleError + */ + public void enable() throws ShuffleError + { + enableRelocations(getLiveNodes()); + } + + /** + * Disable shuffling on all nodes in the cluster. + * + * @throws ShuffleError + */ + public void disable() throws ShuffleError + { + disableRelocations(getLiveNodes()); + } + + /** + * Setup and return a new Thrift RPC connection. + * + * @param hostName hostname or address to connect to + * @param port port number to connect to + * @param framed wrap with framed transport if true + * @return a CassandraClient instance + * @throws ShuffleError + */ + public static CassandraClient getThriftClient(String hostName, int port, boolean framed) throws ShuffleError + { + try + { + return new CassandraClient(hostName, port, framed); + } + catch (TTransportException e) + { + throw new ShuffleError(String.format("Unable to connect to %s/%d: %s", hostName, port, e.getMessage())); + } + } + + /** + * Execute a CQL v3 query. + * + * @param hostName hostname or address to connect to + * @param port port number to connect to + * @param isFramed wrap with framed transport if true + * @param cqlQuery CQL query string + * @return a Thrift CqlResult instance + * @throws ShuffleError + */ + public static CqlResult executeCqlQuery(String hostName, int port, boolean isFramed, String cqlQuery) throws ShuffleError + { + CassandraClient client = null; + + try + { + client = getThriftClient(hostName, port, isFramed); + return client.execute_cql_query(ByteBuffer.wrap(cqlQuery.getBytes()), Compression.NONE); + } + catch (UnavailableException e) + { + throw new ShuffleError( + String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName)); + } + catch (TimedOutException e) + { + throw new ShuffleError( + String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + client.close(); + } + } + + /** + * Return a partitioner instance for remote host. + * + * @param hostName hostname or address to connect to + * @param port port number to connect to + * @param framed wrap with framed transport if true + * @return an IPartitioner instance + * @throws ShuffleError + */ + public static IPartitioner<?> getPartitioner(String hostName, int port, boolean framed) throws ShuffleError + { + String partitionerName = null; + try + { + partitionerName = getThriftClient(hostName, port, framed).describe_partitioner(); + } + catch (TException e) + { + throw new ShuffleError( + String.format("Thrift request to %s:%d failed: %s", hostName, port, e.getMessage())); + } + catch (InvalidRequestException e) + { + throw new RuntimeException("Error calling describe_partitioner() defies explanation", e); + } + + try + { + Class<?> partitionerClass = Class.forName(partitionerName); + return (IPartitioner<?>)partitionerClass.newInstance(); + } + catch (ClassNotFoundException e) + { + throw new ShuffleError("Unable to locate class for partitioner: " + partitionerName); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + /** + * Create and return a CQL batch insert statement for a set of token relocations. + * + * @param tokens tokens to be relocated + * @param partitioner an instance of the IPartitioner in use + * @return a query string + */ + public static String createShuffleBatchInsert(Collection<String> tokens, IPartitioner<?> partitioner) + { + StringBuilder query = new StringBuilder(); + query.append("BEGIN BATCH").append("\n"); + + for (String tokenStr : tokens) + { + Token<?> token = partitioner.getTokenFactory().fromString(tokenStr); + String hexToken = ByteBufferUtil.bytesToHex(partitioner.getTokenFactory().toByteArray(token)); + query.append("INSERT INTO system.range_xfers (token_bytes, requested_at) ") + .append("VALUES ('").append(hexToken).append("', 'now');").append("\n"); + } + + query.append("APPLY BATCH").append("\n"); + return query.toString(); + } + + /** Print usage information. */ + private static void printShuffleHelp() + { + StringBuilder sb = new StringBuilder(); + sb.append("Sub-commands:").append(String.format("%n")); + sb.append(" create Initialize a new shuffle operation").append(String.format("%n")); + sb.append(" ls List pending relocations").append(String.format("%n")); + sb.append(" clear Clear pending relocations").append(String.format("%n")); + sb.append(" en[able] Enable shuffling").append(String.format("%n")); + sb.append(" dis[able] Disable shuffling").append(String.format("%n%n")); + + printHelp("shuffle [options] <sub-command>", sb.toString()); + } + + /** + * Execute. + * + * @param args arguments passed on the command line + * @throws Exception when face meets palm + */ + public static void main(String[] args) throws Exception + { + CommandLine cmd = null; + try + { + cmd = processArguments(args); + } + catch (MissingArgumentException e) + { + System.err.println(e.getMessage()); + System.exit(1); + } + + // Sub command argument. + if (cmd.getArgList().size() < 1) + { + System.err.println("Missing sub-command argument."); + printShuffleHelp(); + System.exit(1); + } + String subCommand = (String)(cmd.getArgList()).get(0); + + String hostName = (cmd.getOptionValue("host") != null) ? cmd.getOptionValue("host") : DEFAULT_HOST; + String port = (cmd.getOptionValue("port") != null) ? cmd.getOptionValue("port") : Integer.toString(DEFAULT_JMX_PORT); + String thriftHost = (cmd.getOptionValue("thrift-host") != null) ? cmd.getOptionValue("thrift-host") : hostName; + String thriftPort = (cmd.getOptionValue("thrift-port") != null) ? cmd.getOptionValue("thrift-port") : "9160"; + String onlyDc = cmd.getOptionValue("only-dc"); + boolean thriftFramed = cmd.hasOption("thrift-framed") ? true : false; + boolean andEnable = cmd.hasOption("and-enable") ? true : false; + int portNum = -1, thriftPortNum = -1; + + // Parse JMX port number + if (port != null) + { + try + { + portNum = Integer.parseInt(port); + } + catch (NumberFormatException ferr) + { + System.err.printf("%s is not a valid JMX port number.%n", port); + System.exit(1); + } + } + else + portNum = DEFAULT_JMX_PORT; + + // Parse Thrift port number + if (thriftPort != null) + { + try + { + thriftPortNum = Integer.parseInt(thriftPort); + } + catch (NumberFormatException ferr) + { + System.err.printf("%s is not a valid port number.%n", thriftPort); + System.exit(1); + } + } + else + thriftPortNum = 9160; + + Shuffle shuffler = new Shuffle(hostName, portNum, thriftHost, thriftPortNum, thriftFramed, null, null); + + try + { + if (subCommand.equals("create")) + shuffler.shuffle(andEnable, onlyDc); + else if (subCommand.equals("ls")) + shuffler.ls(); + else if (subCommand.startsWith("en")) + shuffler.enable(); + else if (subCommand.startsWith("dis")) + shuffler.disable(); + else if (subCommand.equals("clear")) + shuffler.clear(); + else + { + System.err.println("Unknown subcommand: " + subCommand); + printShuffleHelp(); + System.exit(1); + } + } + catch (ShuffleError err) + { + shuffler.writeln(err); + System.exit(1); + } + finally + { + shuffler.close(); + } + + System.exit(0); + } +} + +/** A self-contained Cassandra.Client; Closeable. */ +class CassandraClient implements Closeable +{ + TTransport transport; + Cassandra.Client client; + + CassandraClient(String hostName, int port, boolean framed) throws TTransportException + { + TSocket socket = new TSocket(hostName, port); + transport = (framed) ? socket : new TFastFramedTransport(socket); + transport.open(); + client = new Cassandra.Client(new TBinaryProtocol(transport)); + + try + { + client.set_cql_version("3.0.0"); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception + { + return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE); + } + + String describe_partitioner() throws TException, InvalidRequestException + { + return client.describe_partitioner(); + } + + List<TokenRange> describe_ring(String keyspace) throws InvalidRequestException, TException + { + return client.describe_ring(keyspace); + } + + Map<String, String> describe_token_map() throws InvalidRequestException, TException + { + return client.describe_token_map(); + } + + public void close() + { + transport.close(); + } +} + +@SuppressWarnings("serial") +class ShuffleError extends Exception +{ + ShuffleError(String msg) + { + super(msg); + } +}
