add auth support to sstableloader; patch by Alexis, reviewed by yukim for CASSANDRA-4712


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732d82b4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732d82b4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732d82b4

Branch: refs/heads/cassandra-1.1
Commit: 732d82b4d4f66f5ae45c7eb24912d612f4505c17
Parents: c710edf
Author: Yuki Morishita <[email protected]>
Authored: Fri Sep 28 13:23:10 2012 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Fri Sep 28 13:23:10 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/tools/BulkLoader.java     |   37 ++++++++++++--
 2 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d82b4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f598aa2..1363c22 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * (CQL3) Fix validation for IN queries for non-PK cols (CASSANDRA-4709)
  * fix re-created keyspace disappering after 1.1.5 upgrade (CASSANDRA-4698)
  * (CLI) display elapsed time in 2 fraction digits (CASSANDRA-3460)
+ * add authentication support to sstableloader (CASSANDRA-4712)
 Merged from 1.0:
  * Switch from NBHM to CHM in MessagingService's callback map, which
    prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d82b4/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index ace37db..88e7443 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -37,7 +38,6 @@ import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 
 public class BulkLoader
 {
@@ -49,6 +49,8 @@ public class BulkLoader
     private static final String IGNORE_NODES_OPTION  = "ignore";
     private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
     private static final String RPC_PORT_OPTION = "port";
+    private static final String USER_OPTION = "username";
+    private static final String PASSWD_OPTION = "password";
     private static final String THROTTLE_MBITS = "throttle";
 
     public static void main(String args[]) throws IOException
@@ -57,7 +59,7 @@ public class BulkLoader
         try
         {
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort), handler);
+            SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort, options.user, options.passwd), handler);
             DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
             SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
 
@@ -179,13 +181,17 @@ public class BulkLoader
         private final OutputHandler outputHandler;
         private Set<InetAddress> hosts = new HashSet<InetAddress>();
         private int rpcPort;
+        private String user;
+        private String passwd;
 
-        public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port)
+        public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port, String user, String passwd)
         {
             super();
             this.outputHandler = outputHandler;
             this.hosts = hosts;
             this.rpcPort = port;
+            this.user = user;
+            this.passwd = passwd;
         }
 
         public void init(String keyspace)
@@ -198,7 +204,7 @@ public class BulkLoader
 
                     // Query endpoint to ranges map and schemas from thrift
                     InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
+                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd);
                     List<TokenRange> tokenRanges = client.describe_ring(keyspace);
                     List<KsDef> ksDefs = client.describe_keyspaces();
 
@@ -237,13 +243,22 @@ public class BulkLoader
             return cfs != null && cfs.contains(cfName);
         }
 
-        private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
+        private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd) throws Exception
         {
             TSocket socket = new TSocket(host, port);
             TTransport trans = new TFramedTransport(socket);
             trans.open();
             TProtocol protocol = new TBinaryProtocol(trans);
-            return new Cassandra.Client(protocol);
+            Cassandra.Client client = new Cassandra.Client(protocol);
+            if (user != null && passwd != null)
+            {
+                Map<String, String> credentials = new HashMap<String, String>();
+                credentials.put(IAuthenticator.USERNAME_KEY, user);
+                credentials.put(IAuthenticator.PASSWORD_KEY, passwd);
+                AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
+                client.login(authenticationRequest);
+            }
+            return client;
         }
     }
 
@@ -255,6 +270,8 @@ public class BulkLoader
         public boolean verbose;
         public boolean noProgress;
         public int rpcPort = 9160;
+        public String user;
+        public String passwd;
         public int throttle = 0;
 
         public Set<InetAddress> hosts = new HashSet<InetAddress>();
@@ -315,6 +332,12 @@ public class BulkLoader
                 if (cmd.hasOption(RPC_PORT_OPTION))
                     opts.rpcPort = Integer.valueOf(cmd.getOptionValue(RPC_PORT_OPTION));
 
+                if (cmd.hasOption(USER_OPTION))
+                    opts.user = cmd.getOptionValue(USER_OPTION);
+
+                if (cmd.hasOption(PASSWD_OPTION))
+                    opts.passwd = cmd.getOptionValue(PASSWD_OPTION);
+
                 if (cmd.hasOption(INITIAL_HOST_ADDRESS_OPTION))
                 {
                     String[] nodes = cmd.getOptionValue(INITIAL_HOST_ADDRESS_OPTION).split(",");
@@ -380,6 +403,8 @@ public class BulkLoader
             options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "try to connect to these hosts (comma separated) initially for ring information");
             options.addOption("p",  RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
             options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
+            options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
+            options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");
             return options;
         }
 

Reply via email to