Allow BulkRecordWriter (BRW) to authenticate. Patch by Michal Michalski, reviewed by brandonwilliams for CASSANDRA-4155.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f2152dd4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f2152dd4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f2152dd4 Branch: refs/heads/trunk Commit: f2152dd4bb18be694c6e505fa99d5933373b1c11 Parents: bd7b209 Author: Brandon Williams <[email protected]> Authored: Wed Apr 18 11:25:11 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Wed Apr 18 11:32:00 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/BulkRecordWriter.java | 32 ++++++++++++++- .../org/apache/cassandra/hadoop/ConfigHelper.java | 10 +++++ 2 files changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2152dd4/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index ec72038..42bb07e 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -27,6 +27,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; @@ -131,8 +132,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> if (writer == null) { AbstractType<?> subcomparator = null; + ExternalClient externalClient = null; + String username = ConfigHelper.getOutputKeyspaceUserName(conf); + String password = 
ConfigHelper.getOutputKeyspacePassword(conf); + if (cfType == CFType.SUPER) subcomparator = BytesType.instance; + this.writer = new SSTableSimpleUnsortedWriter( outputdir, ConfigHelper.getOutputPartitioner(conf), @@ -142,7 +148,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> subcomparator, Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")), ConfigHelper.getOutputCompressionParamaters(conf)); - this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler()); + + externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), + ConfigHelper.getOutputRpcPort(conf), + username, + password); + + this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler()); } } @@ -236,12 +248,16 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); private final String hostlist; private final int rpcPort; + private final String username; + private final String password; - public ExternalClient(String hostlist, int port) + public ExternalClient(String hostlist, int port, String username, String password) { super(); this.hostlist = hostlist; this.rpcPort = port; + this.username = username; + this.password = password; } public void init(String keyspace) @@ -266,6 +282,18 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { InetAddress host = hostiter.next(); Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); + + // log in + client.set_keyspace(keyspace); + if (username != null) + { + Map<String, String> creds = new HashMap<String, String>(); + creds.put(IAuthenticator.USERNAME_KEY, username); + creds.put(IAuthenticator.PASSWORD_KEY, password); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + 
List<TokenRange> tokenRanges = client.describe_ring(keyspace); List<KsDef> ksDefs = client.describe_keyspaces(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f2152dd4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 8ec215e..ea0ba79 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -299,11 +299,21 @@ public class ConfigHelper return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG); } + public static void setOutputKeyspaceUserName(Configuration conf, String username) + { + conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username); + } + public static String getOutputKeyspaceUserName(Configuration conf) { return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG); } + public static void setOutputKeyspacePassword(Configuration conf, String password) + { + conf.set(OUTPUT_KEYSPACE_PASSWD_CONFIG, password); + } + public static String getOutputKeyspacePassword(Configuration conf) { return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);
