Repository: cassandra Updated Branches: refs/heads/trunk fc2e420fd -> c22ee2bd4
Add sstableloader option to accept target keyspace name patch by Jaydeepkumar Chovatia; reviewed by Jay Zhuang for CASSANDRA-13884 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c22ee2bd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c22ee2bd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c22ee2bd Branch: refs/heads/trunk Commit: c22ee2bd451d030e99cfb65be839bbc735a5352f Parents: fc2e420 Author: jaydeepkumar1984 <[email protected]> Authored: Mon Sep 18 17:07:56 2017 -0700 Committer: Jay Zhuang <[email protected]> Committed: Thu Mar 29 10:41:27 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/io/sstable/SSTableLoader.java | 6 +-- .../org/apache/cassandra/tools/BulkLoader.java | 3 +- .../apache/cassandra/tools/LoaderOptions.java | 14 ++++++ .../cassandra/io/sstable/SSTableLoaderTest.java | 48 ++++++++++++++++++++ 5 files changed, 68 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c0551b7..04705ba 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884) * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713) * Add coordinator write metric per CF (CASSANDRA-14232) * Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 980fdf1..a6985f7 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -56,13 +56,13 @@ public class SSTableLoader implements StreamEventHandler public SSTableLoader(File directory, Client client, OutputHandler outputHandler) { - this(directory, client, outputHandler, 1); + this(directory, client, outputHandler, 1, null); } - public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost) + public SSTableLoader(File directory, Client client, OutputHandler outputHandler, int connectionsPerHost, String targetKeyspace) { this.directory = directory; - this.keyspace = directory.getParentFile().getName(); + this.keyspace = targetKeyspace != null ? targetKeyspace : directory.getParentFile().getName(); this.client = client; this.outputHandler = outputHandler; this.connectionsPerHost = connectionsPerHost; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 545d1f7..d85c605 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -65,7 +65,8 @@ public class BulkLoader buildSSLOptions(options.clientEncOptions), options.allowServerPortDiscovery), handler, - options.connectionsPerHost); + options.connectionsPerHost, + options.targetKeyspace); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); DatabaseDescriptor.setInterDCStreamThroughputOutboundMegabitsPerSec(options.interDcThrottle); StreamResultFuture future = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/src/java/org/apache/cassandra/tools/LoaderOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/LoaderOptions.java b/src/java/org/apache/cassandra/tools/LoaderOptions.java index 3686584..d6cb670 100644 --- a/src/java/org/apache/cassandra/tools/LoaderOptions.java +++ b/src/java/org/apache/cassandra/tools/LoaderOptions.java @@ -38,6 +38,7 @@ import org.apache.cassandra.tools.BulkLoader.CmdLineOptions; import com.datastax.driver.core.AuthProvider; import com.datastax.driver.core.PlainTextAuthProvider; import org.apache.commons.cli.*; +import org.apache.commons.lang3.StringUtils; public class LoaderOptions { @@ -59,6 +60,7 @@ public class LoaderOptions public static final String INTER_DC_THROTTLE_MBITS = "inter-dc-throttle"; public static final String TOOL_NAME = "sstableloader"; public static final String ALLOW_SERVER_PORT_DISCOVERY_OPTION = "server-port-discovery"; + public static final String TARGET_KEYSPACE = "target-keyspace"; /* client encryption options */ public static final String SSL_TRUSTSTORE = "truststore"; @@ -88,6 +90,7 @@ public class LoaderOptions public final Set<InetSocketAddress> hosts; public final Set<InetAddressAndPort> ignores; public final boolean allowServerPortDiscovery; + public final String targetKeyspace; LoaderOptions(Builder builder) { @@ -109,6 +112,7 @@ public class LoaderOptions hosts = builder.hosts; allowServerPortDiscovery = builder.allowServerPortDiscovery; ignores = builder.ignores; + targetKeyspace = builder.targetKeyspace; } static class Builder @@ -134,6 +138,7 @@ public class LoaderOptions Set<InetSocketAddress> hosts = new HashSet<>(); Set<InetAddressAndPort> ignores = new HashSet<>(); boolean allowServerPortDiscovery; + String targetKeyspace; Builder() { @@ -509,6 +514,14 @@ public class LoaderOptions clientEncOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); } + if (cmd.hasOption(TARGET_KEYSPACE)) + { + targetKeyspace = cmd.getOptionValue(TARGET_KEYSPACE); + if (StringUtils.isBlank(targetKeyspace)) + { + errorMsg("Empty keyspace is not supported.", options); + } + } return this; } catch (ParseException | ConfigurationException | MalformedURLException e) @@ -615,6 +628,7 @@ public class LoaderOptions options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "Client SSL: comma-separated list of encryption suites to use"); options.addOption("f", CONFIG_PATH, "path to config file", "cassandra.yaml file path for streaming throughput and client/server SSL."); options.addOption("spd", ALLOW_SERVER_PORT_DISCOVERY_OPTION, "allow server port discovery", "Use ports published by server to decide how to connect. With SSL requires StartTLS to be used."); + options.addOption("k", TARGET_KEYSPACE, "target keyspace name", "target keyspace name"); return options; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c22ee2bd/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index 430b7c2..8509115 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -56,6 +56,7 @@ import static org.junit.Assert.assertTrue; public class SSTableLoaderTest { public static final String KEYSPACE1 = "SSTableLoaderTest"; + public static final String KEYSPACE2 = "SSTableLoaderTest1"; public static final String CF_STANDARD1 = "Standard1"; public static final String CF_STANDARD2 = "Standard2"; @@ -70,6 +71,11 @@ public class SSTableLoaderTest SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2)); + SchemaLoader.createKeyspace(KEYSPACE2, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1), + SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD2)); + StorageService.instance.initServer(); } @@ -208,6 +214,48 @@ public class SSTableLoaderTest latch.await(); } + @Test + public void testLoadingSSTableToDifferentKeyspace() throws Exception + { + File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD1); + assert dataDir.mkdirs(); + TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE1, CF_STANDARD1); + + String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))"; + String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)"; + + try (CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1)) + .using(String.format(query, KEYSPACE1, CF_STANDARD1)) + .build()) + { + writer.addRow("key1", "col1", "100"); + } + + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them + + final CountDownLatch latch = new CountDownLatch(1); + SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false), 1, KEYSPACE2); + loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); + + cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1); + cfs.forceBlockingFlush(); + + List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build()); + + assertEquals(1, partitions.size()); + assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); + assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1"))) + .getCell(metadata.getColumn(ByteBufferUtil.bytes("val"))) + .value()); + + // The stream future is signalled when the work is complete but before releasing references. Wait for release + // before cleanup (CASSANDRA-10118). + latch.await(); + } + StreamEventHandler completionStreamListener(final CountDownLatch latch) { return new StreamEventHandler() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
