add client encryption support to sstableloader patch by Sam Tunnicliffe; reviewed by Mikhail Stepura for CASSANDRA-6378
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b2a1903 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b2a1903 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b2a1903 Branch: refs/heads/trunk Commit: 1b2a190379141094a986495bd1386e720786c9b7 Parents: 21bb531 Author: Jonathan Ellis <[email protected]> Authored: Wed Dec 18 16:17:13 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Wed Dec 18 16:17:13 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/tools/BulkLoader.java | 130 ++++++++++++++++++- 2 files changed, 124 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b876204..d6223be 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.4 + * add client encryption support to sstableloader (CASSANDRA-6378) * Fix accept() loop for SSL sockets post-shutdown (CASSANDRA-6468) * Fix size-tiered compaction in LCS L0 (CASSANDRA-6496) * Fix assertion failure in filterColdSSTables (CASSANDRA-6483) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b2a1903/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index c89bb83..15c8df8 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -24,7 +24,9 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import com.google.common.base.Joiner; import com.google.common.collect.Sets; +import org.apache.cassandra.config.EncryptionOptions; import org.apache.commons.cli.*; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -58,12 +60,21 @@ public class BulkLoader private static final String USER_OPTION = "username"; private static final String PASSWD_OPTION = "password"; private static final String THROTTLE_MBITS = "throttle"; + private static final String TRANSPORT_FACTORY = "transport-factory"; + private static final String SSL_TRUSTSTORE = "truststore"; + private static final String SSL_TRUSTSTORE_PW = "truststore-password"; + private static final String SSL_KEYSTORE = "keystore"; + private static final String SSL_KEYSTORE_PW = "keystore-password"; + private static final String SSL_PROTOCOL = "ssl-protocol"; + private static final String SSL_ALGORITHM = "ssl-alg"; + private static final String SSL_STORE_TYPE = "store-type"; + private static final String SSL_CIPHER_SUITES = "ssl-ciphers"; public static void main(String args[]) { LoaderOptions options = LoaderOptions.parseArgs(args); OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd), handler); + SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options.hosts, options.rpcPort, options.user, options.passwd, options.transportFactory), handler); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); StreamResultFuture future = loader.stream(options.ignores); future.addEventListener(new ProgressIndicator()); @@ -175,14 +186,16 @@ public class BulkLoader private final int rpcPort; private final String user; private final String passwd; + private final ITransportFactory transportFactory; - public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd) + public ExternalClient(Set<InetAddress> hosts, int port, String user, String passwd, ITransportFactory transportFactory) { super(); this.hosts = hosts; this.rpcPort = port; this.user = user; this.passwd = passwd; + this.transportFactory = transportFactory; } public void init(String keyspace) @@ -194,7 +207,7 @@ public class BulkLoader { // Query endpoint to ranges map and schemas from thrift InetAddress host = hostiter.next(); - Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd); + Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd, this.transportFactory); setPartitioner(client.describe_partitioner()); Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); @@ -233,11 +246,9 @@ public class BulkLoader return knownCfs.get(cfName); } - private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception + private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception { - TSocket socket = new TSocket(host, port); - TTransport trans = new TFramedTransport(socket); - trans.open(); + TTransport trans = transportFactory.openTransport(host, port); TProtocol protocol = new TBinaryProtocol(trans); Cassandra.Client client = new Cassandra.Client(protocol); if (user != null && passwd != null) @@ -263,6 +274,8 @@ public class BulkLoader public String user; public String passwd; public int throttle = 0; + public ITransportFactory transportFactory = new TFramedTransportFactory(); + public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions(); public final Set<InetAddress> hosts = new HashSet<InetAddress>(); public final Set<InetAddress> ignores = new HashSet<InetAddress>(); @@ -367,6 +380,55 @@ public class BulkLoader } } + if(cmd.hasOption(SSL_TRUSTSTORE)) + { + opts.encOptions.truststore = cmd.getOptionValue(SSL_TRUSTSTORE); + } + + if(cmd.hasOption(SSL_TRUSTSTORE_PW)) + { + opts.encOptions.truststore_password = cmd.getOptionValue(SSL_TRUSTSTORE_PW); + } + + if(cmd.hasOption(SSL_KEYSTORE)) + { + opts.encOptions.keystore = cmd.getOptionValue(SSL_KEYSTORE); + // if a keystore was provided, lets assume we'll need to use it + opts.encOptions.require_client_auth = true; + } + + if(cmd.hasOption(SSL_KEYSTORE_PW)) + { + opts.encOptions.keystore_password = cmd.getOptionValue(SSL_KEYSTORE_PW); + } + + if(cmd.hasOption(SSL_PROTOCOL)) + { + opts.encOptions.protocol = cmd.getOptionValue(SSL_PROTOCOL); + } + + if(cmd.hasOption(SSL_ALGORITHM)) + { + opts.encOptions.algorithm = cmd.getOptionValue(SSL_ALGORITHM); + } + + if(cmd.hasOption(SSL_STORE_TYPE)) + { + opts.encOptions.store_type = cmd.getOptionValue(SSL_STORE_TYPE); + } + + if(cmd.hasOption(SSL_CIPHER_SUITES)) + { + opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(","); + } + + if (cmd.hasOption(TRANSPORT_FACTORY)) + { + ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY)); + configureTransportFactory(transportFactory, opts); + opts.transportFactory = transportFactory; + } + return opts; } catch (ParseException e) @@ -376,6 +438,50 @@ public class BulkLoader } } + private static ITransportFactory getTransportFactory(String transportFactory) + { + try + { + Class<?> factory = Class.forName(transportFactory); + if (!ITransportFactory.class.isAssignableFrom(factory)) + throw new IllegalArgumentException(String.format("transport factory '%s' " + + "not derived from ITransportFactory", transportFactory)); + return (ITransportFactory) factory.newInstance(); + } + catch (Exception e) + { + throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e); + } + } + + private static void configureTransportFactory(ITransportFactory transportFactory, LoaderOptions opts) + { + Map<String, String> options = new HashMap<>(); + // If the supplied factory supports the same set of options as our SSL impl, set those + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE)) + options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD)) + options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL)) + options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES)) + options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites)); + + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE) + && opts.encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore); + if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD) + && opts.encOptions.require_client_auth) + options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password); + + // Now check if any of the factory's supported options are set as system properties + for (String optionKey : transportFactory.supportedOptions()) + if (System.getProperty(optionKey) != null) + options.put(optionKey, System.getProperty(optionKey)); + + transportFactory.setOptions(options); + } + private static void errorMsg(String msg, CmdLineOptions options) { System.err.println(msg); @@ -395,6 +501,16 @@ public class BulkLoader options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); + options.addOption("tf", TRANSPORT_FACTORY, "transport factory", "Fully-qualified ITransportFactory class name for creating a connection to cassandra"); + // ssl connection-related options + options.addOption("ts", SSL_TRUSTSTORE, "TRUSTSTORE", "SSL: full path to truststore"); + options.addOption("tspw", SSL_TRUSTSTORE_PW, "TRUSTSTORE-PASSWORD", "SSL: password of the truststore"); + options.addOption("ks", SSL_KEYSTORE, "KEYSTORE", "SSL: full path to keystore"); + options.addOption("kspw", SSL_KEYSTORE_PW, "KEYSTORE-PASSWORD", "SSL: password of the keystore"); + options.addOption("prtcl", SSL_PROTOCOL, "PROTOCOL", "SSL: connections protocol to use (default: TLS)"); + options.addOption("alg", SSL_ALGORITHM, "ALGORITHM", "SSL: algorithm (default: SunX509)"); + options.addOption("st", SSL_STORE_TYPE, "STORE-TYPE", "SSL: type of store"); + options.addOption("ciphers", SSL_CIPHER_SUITES, "CIPHER-SUITES", "SSL: comma-separated list of encryption suites to use"); return options; }
