Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 429e5cf19 -> 4b8bb86e2
  refs/heads/cassandra-2.1 acb4cf62d -> 60c7c73c4
  refs/heads/trunk 1375e7fb4 -> 4698b8f1d
Support auth in CqlRecordWriter

Patch by Alex Liu, reviewed by Ben Coverston for CASSANDRA-7340

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b8bb86e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b8bb86e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b8bb86e

Branch: refs/heads/cassandra-2.0
Commit: 4b8bb86e272d8ddab62f671d5109e02c32c07728
Parents: 429e5cf
Author: Brandon Williams <[email protected]>
Authored: Mon Jun 9 13:42:13 2014 -0500
Committer: Brandon Williams <[email protected]>
Committed: Mon Jun 9 13:42:13 2014 -0500

----------------------------------------------------------------------
 .../AbstractColumnFamilyOutputFormat.java       | 22 +++++++++++++-------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  |  7 ++++++-
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b8bb86e/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
index 96ca65d..f4963fb 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -124,18 +124,24 @@ public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputForma
         TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-        if ((ConfigHelper.getOutputKeyspaceUserName(conf) != null) && (ConfigHelper.getOutputKeyspacePassword(conf) != null))
-        {
-            Map<String, String> creds = new HashMap<String, String>();
-            creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
-            creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
-            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-            client.login(authRequest);
-        }
+        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+        String password = ConfigHelper.getOutputKeyspacePassword(conf);
+        if ((user != null) && (password != null))
+            login(user, password, client);
+
         logger.debug("Authenticated client for CF output format created successfully");
         return client;
     }
 
+    public static void login(String user, String password, Cassandra.Client client) throws Exception
+    {
+        Map<String, String> creds = new HashMap<String, String>();
+        creds.put(IAuthenticator.USERNAME_KEY, user);
+        creds.put(IAuthenticator.PASSWORD_KEY, password);
+        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+        client.login(authRequest);
+    }
+
     /**
      * An {@link OutputCommitter} that does nothing.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b8bb86e/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 9c2e156..ee7aabf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
@@ -36,6 +35,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
 import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.thrift.*;
@@ -103,6 +103,11 @@ final class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String,
         try
         {
             Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+            client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+            String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+            String password = ConfigHelper.getOutputKeyspacePassword(conf);
+            if ((user != null) && (password != null))
+                AbstractColumnFamilyOutputFormat.login(user, password, client);
             retrievePartitionKeyValidator(client);
             String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
             if (cqlQuery.toLowerCase().startsWith("insert"))
