add basic authentication support for Pig CassandraStorage; patch by Aleksey Yeschenko, reviewed by Brandon Williams for CASSANDRA-3042
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fab61e30 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fab61e30 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fab61e30 Branch: refs/heads/cassandra-1.2.0 Commit: fab61e309fd4ddf312f033f3458a9aa067279090 Parents: e30519f Author: Aleksey Yeschenko <alek...@apache.org> Authored: Mon Nov 19 21:37:09 2012 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Nov 19 21:37:57 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/hadoop/ConfigHelper.java | 22 ++++ .../cassandra/hadoop/pig/CassandraStorage.java | 86 ++++++++++----- 3 files changed, 83 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ef5e43b..a89184e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.1.7 + * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042) * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965) * reset getRangeSlice filter after finishing a row for get_paged_slice (CASSANDRA-4919) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index 1646635..4b49387 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -295,16 +295,38 @@ public class ConfigHelper return conf.get(OUTPUT_KEYSPACE_CONFIG); } + public static void setInputKeyspaceUserNameAndPassword(Configuration conf, String username, String password) + { + setInputKeyspaceUserName(conf, username); + setInputKeyspacePassword(conf, password); + } + + public static void setInputKeyspaceUserName(Configuration conf, String username) + { + conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, username); + } + public static String getInputKeyspaceUserName(Configuration conf) { return conf.get(INPUT_KEYSPACE_USERNAME_CONFIG); } + public static void setInputKeyspacePassword(Configuration conf, String password) + { + conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, password); + } + public static String getInputKeyspacePassword(Configuration conf) { return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG); } + public static void setOutputKeyspaceUserNameAndPassword(Configuration conf, String username, String password) + { + setOutputKeyspaceUserName(conf, username); + setOutputKeyspacePassword(conf, password); + } + public static void setOutputKeyspaceUserName(Configuration conf, String username) { conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index 8f539a9..c2f1c13 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -21,38 +21,31 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.IntegerType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Hex; -import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent; import org.apache.cassandra.hadoop.*; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.thrift.Deletion; -import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.UUIDGen; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.*; - import org.apache.pig.*; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -93,6 +86,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private ByteBuffer slice_end = BOUND; private boolean slice_reverse = false; private boolean allow_deletes = false; + private String username; + private String password; private String keyspace; private String column_family; private String loadSignature; @@ -467,8 +462,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private void setLocationFromUri(String location) throws IOException { - // parse uri into keyspace and columnfamily - String names[]; try { if (!location.startsWith("cassandra://")) @@ -496,12 +489,23 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary")); } String[] parts = urlParts[0].split("/+"); - keyspace = parts[1]; + String[] credentialsAndKeyspace = parts[1].split("@"); + if (credentialsAndKeyspace.length > 1) + { + String[] credentials = credentialsAndKeyspace[0].split(":"); + username = credentials[0]; + password = credentials[1]; + keyspace = credentialsAndKeyspace[1]; + } + else + { + keyspace = parts[1]; + } column_family = parts[2]; } catch (Exception e) { - throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage()); + throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage()); } } @@ -559,6 +563,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { conf = job.getConfiguration(); setLocationFromUri(location); + if (ConfigHelper.getInputSlicePredicate(conf) == null) { SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit); @@ -573,6 +578,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo if (usePartitionFilter && getIndexExpressions() != null) ConfigHelper.setInputRange(conf, getIndexExpressions()); + if (username != null && password != null) + ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password); + ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows); setConnectionInformation(); @@ -800,6 +808,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo { conf = job.getConfiguration(); setLocationFromUri(location); + + if (username != null && password != null) + ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password); + ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); @@ -1032,18 +1044,38 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo private void initSchema(String signature) { - UDFContext context = UDFContext.getUDFContext(); - Properties property = context.getUDFProperties(CassandraStorage.class); + Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class); // Only get the schema if we haven't already gotten it - if (!property.containsKey(signature)) + if (!properties.containsKey(signature)) { - Cassandra.Client client = null; try { - client = ConfigHelper.getClientFromInputAddressList(conf); - CfDef cfDef = null; + Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); client.set_keyspace(keyspace); + + if (username != null && password != null) + { + Map<String, String> credentials = new HashMap<String, String>(2); + credentials.put(IAuthenticator.USERNAME_KEY, username); + credentials.put(IAuthenticator.PASSWORD_KEY, password); + + try + { + client.login(new AuthenticationRequest(credentials)); + } + catch (AuthenticationException e) + { + logger.error("Authentication exception: invalid username and/or password"); + throw new RuntimeException(e); + } + catch (AuthorizationException e) + { + throw new AssertionError(e); // never actually throws AuthorizationException. + } + } + + CfDef cfDef = null; KsDef ksDef = client.describe_keyspace(keyspace); List<CfDef> defs = ksDef.getCf_defs(); for (CfDef def : defs) @@ -1055,9 +1087,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo } } if (cfDef != null) - property.setProperty(signature, cfdefToString(cfDef)); + properties.setProperty(signature, cfdefToString(cfDef)); else - throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'"); + throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'", + column_family, + keyspace)); } catch (TException e) {