add auth support to sstableloader; patch by Alexis, reviewed by yukim for CASSANDRA-4712
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732d82b4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732d82b4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732d82b4 Branch: refs/heads/trunk Commit: 732d82b4d4f66f5ae45c7eb24912d612f4505c17 Parents: c710edf Author: Yuki Morishita <[email protected]> Authored: Fri Sep 28 13:23:10 2012 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri Sep 28 13:23:10 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/tools/BulkLoader.java | 37 ++++++++++++-- 2 files changed, 32 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d82b4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f598aa2..1363c22 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ * (CQL3) Fix validation for IN queries for non-PK cols (CASSANDRA-4709) * fix re-created keyspace disappering after 1.1.5 upgrade (CASSANDRA-4698) * (CLI) display elapsed time in 2 fraction digits (CASSANDRA-3460) + * add authentication support to sstableloader (CASSANDRA-4712) Merged from 1.0: * Switch from NBHM to CHM in MessagingService's callback map, which prevents OOM in long-running instances (CASSANDRA-4708) http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d82b4/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index ace37db..88e7443 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -24,6 +24,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -37,7 +38,6 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; public class BulkLoader { @@ -49,6 +49,8 @@ public class BulkLoader private static final String IGNORE_NODES_OPTION = "ignore"; private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes"; private static final String RPC_PORT_OPTION = "port"; + private static final String USER_OPTION = "username"; + private static final String PASSWD_OPTION = "password"; private static final String THROTTLE_MBITS = "throttle"; public static void main(String args[]) throws IOException @@ -57,7 +59,7 @@ public class BulkLoader try { OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug); - SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort), handler); + SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort, options.user, options.passwd), handler); DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle); SSTableLoader.LoaderFuture future = loader.stream(options.ignores); @@ -179,13 +181,17 @@ public class BulkLoader private final OutputHandler outputHandler; private Set<InetAddress> hosts = new HashSet<InetAddress>(); private int rpcPort; + private String user; + private String passwd; - public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port) + public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port, String user, String passwd) { super(); this.outputHandler = outputHandler; this.hosts = hosts; this.rpcPort = port; + this.user = user; + this.passwd = passwd; } public void init(String keyspace) @@ -198,7 +204,7 @@ public class BulkLoader // Query endpoint to ranges map and schemas from thrift InetAddress host = hostiter.next(); - Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); + Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd); List<TokenRange> tokenRanges = client.describe_ring(keyspace); List<KsDef> ksDefs = client.describe_keyspaces(); @@ -237,13 +243,22 @@ public class BulkLoader return cfs != null && cfs.contains(cfName); } - private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException + private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception { TSocket socket = new TSocket(host, port); TTransport trans = new TFramedTransport(socket); trans.open(); TProtocol protocol = new TBinaryProtocol(trans); - return new Cassandra.Client(protocol); + Cassandra.Client client = new Cassandra.Client(protocol); + if (user != null && passwd != null) + { + Map<String, String> credentials = new HashMap<String, String>(); + credentials.put(IAuthenticator.USERNAME_KEY, user); + credentials.put(IAuthenticator.PASSWORD_KEY, passwd); + AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials); + client.login(authenticationRequest); + } + return client; } } @@ -255,6 +270,8 @@ public class BulkLoader public boolean verbose; public boolean noProgress; public int rpcPort = 9160; + public String user; + public String passwd; public int throttle = 0; public Set<InetAddress> hosts = new HashSet<InetAddress>(); @@ -315,6 +332,12 @@ public class BulkLoader if (cmd.hasOption(RPC_PORT_OPTION)) opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION)); + if (cmd.hasOption(USER_OPTION)) + opts.user = cmd.getOptionValue(USER_OPTION); + + if (cmd.hasOption(PASSWD_OPTION)) + opts.passwd = cmd.getOptionValue(PASSWD_OPTION); + if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION)) { String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(","); @@ -380,6 +403,8 @@ public class BulkLoader options.addOption("d", INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information"); options.addOption("p", RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)"); options.addOption("t", THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)"); + options.addOption("u", USER_OPTION, "username", "username for cassandra authentication"); + options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication"); return options; }
