add basic authentication support for Pig CassandraStorage; patch by
Aleksey Yeschenko, reviewed by Brandon Williams for CASSANDRA-3042


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fab61e30
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fab61e30
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fab61e30

Branch: refs/heads/cassandra-1.2
Commit: fab61e309fd4ddf312f033f3458a9aa067279090
Parents: e30519f
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Mon Nov 19 21:37:09 2012 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Mon Nov 19 21:37:57 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   22 ++++
 .../cassandra/hadoop/pig/CassandraStorage.java     |   86 ++++++++++-----
 3 files changed, 83 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ef5e43b..a89184e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.1.7
+ * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042)
  * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965)
  * reset getRangeSlice filter after finishing a row for get_paged_slice
    (CASSANDRA-4919)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index 1646635..4b49387 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -295,16 +295,38 @@ public class ConfigHelper
         return conf.get(OUTPUT_KEYSPACE_CONFIG);
     }
 
+    public static void setInputKeyspaceUserNameAndPassword(Configuration conf, String username, String password)
+    {
+        setInputKeyspaceUserName(conf, username);
+        setInputKeyspacePassword(conf, password);
+    }
+
+    public static void setInputKeyspaceUserName(Configuration conf, String username)
+    {
+        conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, username);
+    }
+
     public static String getInputKeyspaceUserName(Configuration conf)
     {
        return conf.get(INPUT_KEYSPACE_USERNAME_CONFIG);
     }
 
+    public static void setInputKeyspacePassword(Configuration conf, String password)
+    {
+        conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, password);
+    }
+
     public static String getInputKeyspacePassword(Configuration conf)
     {
        return conf.get(INPUT_KEYSPACE_PASSWD_CONFIG);
     }
 
+    public static void setOutputKeyspaceUserNameAndPassword(Configuration conf, String username, String password)
+    {
+        setOutputKeyspaceUserName(conf, username);
+        setOutputKeyspacePassword(conf, password);
+    }
+
     public static void setOutputKeyspaceUserName(Configuration conf, String username)
     {
         conf.set(OUTPUT_KEYSPACE_USERNAME_CONFIG, username);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fab61e30/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 8f539a9..c2f1c13 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -21,38 +21,31 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Hex;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.marshal.AbstractCompositeType.CompositeComponent;
 import org.apache.cassandra.hadoop.*;
-import org.apache.cassandra.thrift.Mutation;
-import org.apache.cassandra.thrift.Deletion;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
-
 import org.apache.pig.*;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -93,6 +86,8 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     private ByteBuffer slice_end = BOUND;
     private boolean slice_reverse = false;
     private boolean allow_deletes = false;
+    private String username;
+    private String password;
     private String keyspace;
     private String column_family;
     private String loadSignature;
@@ -467,8 +462,6 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     private void setLocationFromUri(String location) throws IOException
     {
-        // parse uri into keyspace and columnfamily
-        String names[];
         try
         {
             if (!location.startsWith("cassandra://"))
@@ -496,12 +489,23 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
             }
             String[] parts = urlParts[0].split("/+");
-            keyspace = parts[1];
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
             column_family = parts[2];
         }
         catch (Exception e)
         {
-            throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
+            throw new IOException("Expected 'cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1][&allow_deletes=true][widerows=true][use_secondary=true]]': " + e.getMessage());
         }
     }
 
@@ -559,6 +563,7 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         conf = job.getConfiguration();
         setLocationFromUri(location);
+
         if (ConfigHelper.getInputSlicePredicate(conf) == null)
         {
             SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit);
@@ -573,6 +578,9 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
         if (usePartitionFilter && getIndexExpressions() != null)
             ConfigHelper.setInputRange(conf, getIndexExpressions());
 
+        if (username != null && password != null)
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family, widerows);
         setConnectionInformation();
 
@@ -800,6 +808,10 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
     {
         conf = job.getConfiguration();
         setLocationFromUri(location);
+
+        if (username != null && password != null)
+            ConfigHelper.setOutputKeyspaceUserNameAndPassword(conf, username, password);
+
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
 
@@ -1032,18 +1044,38 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
 
     private void initSchema(String signature)
     {
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(CassandraStorage.class);
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
 
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(signature))
+        if (!properties.containsKey(signature))
         {
-            Cassandra.Client client = null;
             try
             {
-                client = ConfigHelper.getClientFromInputAddressList(conf);
-                CfDef cfDef = null;
+                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
                 client.set_keyspace(keyspace);
+
+                if (username != null && password != null)
+                {
+                    Map<String, String> credentials = new HashMap<String, String>(2);
+                    credentials.put(IAuthenticator.USERNAME_KEY, username);
+                    credentials.put(IAuthenticator.PASSWORD_KEY, password);
+
+                    try
+                    {
+                        client.login(new AuthenticationRequest(credentials));
+                    }
+                    catch (AuthenticationException e)
+                    {
+                        logger.error("Authentication exception: invalid username and/or password");
+                        throw new RuntimeException(e);
+                    }
+                    catch (AuthorizationException e)
+                    {
+                        throw new AssertionError(e); // never actually throws AuthorizationException.
+                    }
+                }
+
+                CfDef cfDef = null;
                 KsDef ksDef = client.describe_keyspace(keyspace);
                 List<CfDef> defs = ksDef.getCf_defs();
                 for (CfDef def : defs)
@@ -1055,9 +1087,11 @@ public class CassandraStorage extends LoadFunc implements StoreFuncInterface, Lo
                     }
                 }
                 if (cfDef != null)
-                    property.setProperty(signature, cfdefToString(cfDef));
+                    properties.setProperty(signature, cfdefToString(cfDef));
                 else
-                    throw new RuntimeException("Column family '" + column_family + "' not found in keyspace '" + keyspace + "'");
+                    throw new RuntimeException(String.format("Column family '%s' not found in keyspace '%s'",
+                                                             column_family,
+                                                             keyspace));
             }
             catch (TException e)
             {

Reply via email to