Updated Branches: refs/heads/trunk c9cf05098 -> 2046956b6
Fix BulkLoader to support new SSTable layout and add stream throttling to prevent an NPE when there is no yaml config. Patch by Brandon Williams; reviewed by Pavel Yaskevich for CASSANDRA-3752. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2046956b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2046956b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2046956b Branch: refs/heads/trunk Commit: 2046956b6cfd809702bf33cb85a9cae091ce1d72 Parents: c9cf050 Author: Pavel Yaskevich <[email protected]> Authored: Sun Jan 22 20:18:37 2012 +0200 Committer: Pavel Yaskevich <[email protected]> Committed: Sun Jan 22 20:20:30 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../org/apache/cassandra/tools/BulkLoader.java | 15 +++++++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046956b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5701a97..4a8ee42 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -46,6 +46,8 @@ * Allows (internally) doing a range query with a limit of columns instead of rows (CASSANDRA-3742) * Allow rangeSlice queries to be start/end inclusive/exclusive (CASSANDRA-3749) + * Fix BulkLoader to support new SSTable layout and add stream + throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752) 1.0.8 http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046956b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 
2d52ccb..131deb2 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -50,7 +50,7 @@ public class SSTableLoader public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { this.directory = directory; - this.keyspace = directory.getName(); + this.keyspace = directory.getParentFile().getName(); this.client = client; this.outputHandler = outputHandler; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046956b/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 292a7a1..b1a0560 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -52,6 +52,7 @@ public class BulkLoader private static final String IGNORE_NODES_OPTION = "ignore"; private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; private static final String RPC_PORT_OPTION = "port"; + private static final String THROTTLE_MBITS = "throttle"; public static void main(String args[]) throws IOException { @@ -59,6 +60,7 @@ public class BulkLoader try { SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options); + DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); SSTableLoader.LoaderFuture future = loader.stream(options.ignores); if (options.noProgress) @@ -249,6 +251,7 @@ public class BulkLoader public boolean verbose; public boolean noProgress; public int rpcPort = 9160; + public int throttle = 0; public Set<InetAddress> hosts = new HashSet<InetAddress>(); public Set<InetAddress> ignores = new HashSet<InetAddress>(); @@ -302,6 +305,9 @@ public class BulkLoader opts.verbose = cmd.hasOption(VERBOSE_OPTION); opts.noProgress = 
cmd.hasOption(NOPROGRESS_OPTION); + if (cmd.hasOption(THROTTLE_MBITS)) + opts.throttle = Integer.valueOf(cmd.getOptionValue(THROTTLE_MBITS)); + if (cmd.hasOption(RPC_PORT_OPTION)) opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION)); @@ -381,6 +387,7 @@ public class BulkLoader options.addOption("i", IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes"); options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information"); options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)"); + options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); return options; } @@ -389,11 +396,11 @@ public class BulkLoader String usage = String.format("%s [options] <dir_path>", TOOL_NAME); StringBuilder header = new StringBuilder(); header.append("--\n"); - header.append("Bulk load the sstables find in the directory <dir_path> to the configured cluster." ); - header.append("The last directory of <dir_path> is used as the keyspace name. "); - header.append("So for instance, to load a sstable named Standard1-g-1-Data.db into keyspace Keyspace1, "); + header.append("Bulk load the sstables found in the directory <dir_path> to the configured cluster." ); + header.append("The parent directory of <dir_path> is used as the keyspace name. "); + header.append("So for instance, to load an sstable named Standard1-g-1-Data.db into keyspace Keyspace1, "); header.append("you will need to have the files Standard1-g-1-Data.db and Standard1-g-1-Index.db in a "); - header.append("directory Keyspace1/ in the current directory and call: sstableloader Keyspace1"); + header.append("directory Keyspace1/Standard1/ in the directory and call: sstableloader Keyspace1/Standard1"); header.append("\n--\n"); header.append("Options are:"); new HelpFormatter().printHelp(usage, header.toString(), options, "");
