Allow BRW to authenticate.
Patch by Michal Michalski, reviewed by brandonwilliams for
CASSANDRA-4155


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2152dd4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2152dd4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2152dd4

Branch: refs/heads/trunk
Commit: f2152dd4bb18be694c6e505fa99d5933373b1c11
Parents: bd7b209
Author: Brandon Williams <[email protected]>
Authored: Wed Apr 18 11:25:11 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Wed Apr 18 11:32:00 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |   32 ++++++++++++++-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   10 +++++
 2 files changed, 40 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2152dd4/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index ec72038..42bb07e 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -131,8 +132,13 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;
+            ExternalClient externalClient = null;
+            String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+            String password = ConfigHelper.getOutputKeyspacePassword(conf);
+
             if (cfType == CFType.SUPER)
                 subcomparator = BytesType.instance;
+            
             this.writer = new SSTableSimpleUnsortedWriter(
                     outputdir,
                     ConfigHelper.getOutputPartitioner(conf),
@@ -142,7 +148,13 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     subcomparator,
                     Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")),
                     ConfigHelper.getOutputCompressionParamaters(conf));
-            this.loader = new SSTableLoader(outputdir, new 
ExternalClient(ConfigHelper.getOutputInitialAddress(conf), 
ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler());
+
+            externalClient = new 
ExternalClient(ConfigHelper.getOutputInitialAddress(conf), 
+                                                
ConfigHelper.getOutputRpcPort(conf),
+                                                username,
+                                                password);
+
+            this.loader = new SSTableLoader(outputdir, externalClient, new 
NullOutputHandler());
         }
     }
 
@@ -236,12 +248,16 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         private final Map<String, Set<String>> knownCfs = new HashMap<String, 
Set<String>>();
         private final String hostlist;
         private final int rpcPort;
+        private final String username;
+        private final String password;
 
-        public ExternalClient(String hostlist, int port)
+        public ExternalClient(String hostlist, int port, String username, 
String password)
         {
             super();
             this.hostlist = hostlist;
             this.rpcPort = port;
+            this.username = username;
+            this.password = password;
         }
 
         public void init(String keyspace)
@@ -266,6 +282,18 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 {
                     InetAddress host = hostiter.next();
                     Cassandra.Client client = 
createThriftClient(host.getHostAddress(), rpcPort);
+
+                    // log in
+                    client.set_keyspace(keyspace);
+                    if (username != null)
+                    {
+                        Map<String, String> creds = new HashMap<String, 
String>();
+                        creds.put(IAuthenticator.USERNAME_KEY, username);
+                        creds.put(IAuthenticator.PASSWORD_KEY, password);
+                        AuthenticationRequest authRequest = new 
AuthenticationRequest(creds);
+                        client.login(authRequest);
+                    }
+
                     List<TokenRange> tokenRanges = 
client.describe_ring(keyspace);
                     List<KsDef> ksDefs = client.describe_keyspaces();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2152dd4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java 
b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 8ec215e..ea0ba79 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -299,11 +299,21 @@ public class ConfigHelper
         return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
     }
 
+    public static void setOutputKeyspaceUserName(Configuration conf, String 
username)
+    {
+        conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username);
+    }
+
     public static String getOutputKeyspaceUserName(Configuration conf)
     {
         return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG);
     }
 
+    public static void setOutputKeyspacePassword(Configuration conf, String 
password)
+    {
+        conf.set(OUTPUT_KEYSPACE_PASSWD_CONFIG, password);
+    }
+
     public static String getOutputKeyspacePassword(Configuration conf)
     {
         return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);

Reply via email to