Allow BRW to authenticate. Patch by Michal Michalski, reviewed by brandonwilliams for CASSANDRA-4155
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6fcd1f47 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6fcd1f47 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6fcd1f47 Branch: refs/heads/trunk Commit: 6fcd1f4704610f5073028faa5ccf027d81b0f470 Parents: ec242b7 Author: Brandon Williams <[email protected]> Authored: Wed Apr 18 11:25:11 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Wed Apr 18 11:30:07 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/BulkRecordWriter.java | 32 ++++++++++++++- .../org/apache/cassandra/hadoop/ConfigHelper.java | 10 +++++ 2 files changed, 40 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index 83646fb..cfd5fe4 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -30,6 +30,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; @@ -134,8 +135,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> if (writer == null) { AbstractType<?> subcomparator = null; + ExternalClient externalClient = null; + String username = ConfigHelper.getOutputKeyspaceUserName(conf); + String password = ConfigHelper.getOutputKeyspacePassword(conf); + if (cfType == CFType.SUPER) subcomparator = BytesType.instance; + this.writer = new SSTableSimpleUnsortedWriter( outputdir, ConfigHelper.getOutputPartitioner(conf), @@ -145,7 +151,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> subcomparator, Integer.valueOf(conf.get(BUFFER_SIZE_IN_MB, "64")), ConfigHelper.getOutputCompressionParamaters(conf)); - this.loader = new SSTableLoader(outputdir, new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), ConfigHelper.getOutputRpcPort(conf)), new NullOutputHandler()); + + externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), + ConfigHelper.getOutputRpcPort(conf), + username, + password); + + this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler()); } } @@ -239,12 +251,16 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); private String hostlist; private int rpcPort; + private final String username; + private final String password; - public ExternalClient(String hostlist, int port) + public ExternalClient(String hostlist, int port, String username, String password) { super(); this.hostlist = hostlist; this.rpcPort = port; + this.username = username; + this.password = password; } public void init(String keyspace) @@ -269,6 +285,18 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { InetAddress host = hostiter.next(); Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); + + // log in + client.set_keyspace(keyspace); + if (username != null) + { + Map<String, String> creds = new HashMap<String, String>(); + creds.put(IAuthenticator.USERNAME_KEY, username); + creds.put(IAuthenticator.PASSWORD_KEY, password); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + List<TokenRange> tokenRanges = client.describe_ring(keyspace); List<KsDef> ksDefs = client.describe_keyspaces(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6fcd1f47/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index b2903a1..87dd5e0 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -302,11 +302,21 @@ public class ConfigHelper return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG); } + public static void setOutputKeyspaceUserName(Configuration conf, String username) + { + conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username); + } + public static String getOutputKeyspaceUserName(Configuration conf) { return conf.get(OUTPUT_KEYSPACE_USERNAME_CONFIG); } + public static void setOutputKeyspacePassword(Configuration conf, String password) + { + conf.set(OUTPUT_KEYSPACE_PASSWD_CONFIG, password); + } + public static String getOutputKeyspacePassword(Configuration conf) { return conf.get(OUTPUT_KEYSPACE_PASSWD_CONFIG);
