Updated Branches:
  refs/heads/cassandra-1.2 422d2236b -> a3dbc3ddc
  refs/heads/trunk 59c6e500d -> 2130b2e2a


Predicate pushdown support for CqlStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3dbc3dd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3dbc3dd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3dbc3dd

Branch: refs/heads/cassandra-1.2
Commit: a3dbc3ddc720f9ca4a9075fa542b3a8af54187fc
Parents: 422d223
Author: Brandon Williams <[email protected]>
Authored: Thu Jul 25 10:52:26 2013 -0500
Committer: Brandon Williams <[email protected]>
Committed: Thu Jul 25 10:52:26 2013 -0500

----------------------------------------------------------------------
 examples/pig/README.txt                         |  2 +-
 .../hadoop/pig/AbstractCassandraStorage.java    | 48 +++++++++--
 .../cassandra/hadoop/pig/CassandraStorage.java  | 30 -------
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 84 +++++++++++++-------
 4 files changed, 98 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 6dc0937..2ae9824 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -100,7 +100,7 @@ CqlStorage
 
 The CqlStorage class is somewhat similar to CassandraStorage, but it can work 
with CQL3-defined ColumnFamilies.  The main difference is in the URL format:
 
-cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]
+cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]]
 
 Which in grunt, the simplest example would look like:
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index ff575b2..59d7817 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -18,7 +18,9 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
@@ -76,6 +78,8 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
     protected String DEFAULT_INPUT_FORMAT;
     protected String DEFAULT_OUTPUT_FORMAT;
 
+    public final static String PARTITION_FILTER_SIGNATURE = 
"cassandra.partition.filter";
+
     protected static final Logger logger = 
LoggerFactory.getLogger(AbstractCassandraStorage.class);
 
     protected String username;
@@ -90,6 +94,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
     protected String outputFormatClass;
     protected int splitSize = 64 * 1024;
     protected String partitionerClass;
+    protected boolean usePartitionFilter = false; 
 
     public AbstractCassandraStorage()
     {
@@ -248,15 +253,15 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
         }
     }
 
-    /** decompose the query to store the parameters in a map*/
-    public static Map<String, String> getQueryMap(String query)
+    /** decompose the query to store the parameters in a map */
+    public static Map<String, String> getQueryMap(String query) throws 
UnsupportedEncodingException 
     {
         String[] params = query.split("&");
         Map<String, String> map = new HashMap<String, String>();
         for (String param : params)
         {
             String[] keyValue = param.split("=");
-            map.put(keyValue[0], keyValue[1]);
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8"));
         }
         return map;
     }
@@ -674,7 +679,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
             logger.debug("Found ksDef name: {}", name);
             String keyString = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
 
-            logger.debug("partition keys: " + keyString);
+            logger.debug("partition keys: {}", keyString);
             List<String> keyNames = FBUtilities.fromJsonList(keyString);
  
             Iterator<String> iterator = keyNames.iterator();
@@ -687,7 +692,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
 
             keyString = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
 
-            logger.debug("cluster keys: " + keyString);
+            logger.debug("cluster keys: {}", keyString);
             keyNames = FBUtilities.fromJsonList(keyString);
 
             iterator = keyNames.iterator();
@@ -699,7 +704,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
             }
 
             String validator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: " + validator);
+            logger.debug("row key validator: {}", validator);
             AbstractType<?> keyValidator = parseType(validator);
 
             Iterator<ColumnDef> keyItera = keys.iterator();
@@ -713,7 +718,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
                 keyItera.next().validation_class = keyValidator.toString();
 
             validator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: " + validator);
+            logger.debug("cluster key validator: {}", validator);
 
             if (keyItera.hasNext() && validator != null && 
!validator.isEmpty())
             {
@@ -735,7 +740,7 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
                 try
                 {
                     String compactValidator = 
ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: " + compactValidator);
+                    logger.debug("default validator: {}", compactValidator);
                     AbstractType<?> defaultValidator = 
parseType(compactValidator);
 
                     ColumnDef cDef = new ColumnDef();
@@ -766,5 +771,32 @@ public abstract class AbstractCassandraStorage extends 
LoadFunc implements Store
         else
             return null;
     }
+
+    /**
+     * Return partition keys: the names of the columns that have an index.
+     *
+     * @param location not read by this implementation
+     * @param job not read by this implementation
+     * @return null when usePartitionFilter is false; otherwise one name per
+     *         column returned by getIndexes(), in the same order
+     */
+    public String[] getPartitionKeys(String location, Job job)
+    {
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
+    }
+
+    /**
+     * Get the list of columns with a defined index.
+     *
+     * @return every ColumnDef found in the column_metadata of
+     *         getCfDef(loadSignature) whose index_type is not null, in
+     *         iteration order; an empty list when there is none
+     */
+    protected List<ColumnDef> getIndexes()
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index ed445a2..add4395 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -54,8 +54,6 @@ public class CassandraStorage extends AbstractCassandraStorage
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
     public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
 
-    private final static String PARTITION_FILTER_SIGNATURE = 
"cassandra.partition.filter";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Logger logger = 
LoggerFactory.getLogger(CassandraStorage.class);
 
@@ -68,7 +66,6 @@ public class CassandraStorage extends AbstractCassandraStorage
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
 
     private boolean widerows = false;
-    private boolean usePartitionFilter = false;
     private int limit;
     
     // wide row hacks
@@ -455,20 +452,6 @@ public class CassandraStorage extends 
AbstractCassandraStorage
         return schema;
     }
 
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job)
-    {
-        if (!usePartitionFilter)
-            return null;
-        List<ColumnDef> indexes = getIndexes();
-        String[] partitionKeys = new String[indexes.size()];
-        for (int i = 0; i < indexes.size(); i++)
-        {
-            partitionKeys[i] = new String(indexes.get(i).getName());
-        }
-        return partitionKeys;
-    }
-
     /** set partition filter */
     public void setPartitionFilter(Expression partitionFilter)
     {
@@ -665,19 +648,6 @@ public class CassandraStorage extends 
AbstractCassandraStorage
         return indexExpressions;
     }
 
-    /** get a list of columns with defined index*/
-    private List<ColumnDef> getIndexes()
-    {
-        CfDef cfdef = getCfDef(loadSignature);
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
     /** convert a list of index expression to string */
     private static String indexExpressionsToString(List<IndexExpression> 
indexExpressions)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 004b319..7e22823 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -32,10 +32,12 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,9 +139,18 @@ public class CqlStorage extends AbstractCassandraStorage
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
         if (columns != null && !columns.trim().isEmpty())
-            CqlConfigHelper.setInputColumns(conf, columns);        
-        if (whereClause != null && !whereClause.trim().isEmpty())
-            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+            CqlConfigHelper.setInputColumns(conf, columns);
+
+        String whereClauseForPartitionFilter = 
getWhereClauseForPartitionFilter();
+        String wc = whereClause != null && !whereClause.trim().isEmpty() 
+                               ? whereClauseForPartitionFilter == null ? 
whereClause: String.format("%s AND %s", whereClause.trim(), 
whereClauseForPartitionFilter)
+                               : whereClauseForPartitionFilter;
+
+        if (wc != null)
+        {
+            logger.debug("where clause: {}", wc);
+            CqlConfigHelper.setInputWhereClauses(conf, wc);
+        } 
 
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
@@ -193,7 +204,7 @@ public class CqlStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
     
-    /** schema: ((name, value), (name, value), (name, value)) where keys are 
in the front. */
+    /** schema: (value, value, value) where keys are in the front. */
     public ResourceSchema getSchema(String location, Job job) throws 
IOException
     {
         setLocation(location, job);
@@ -209,28 +220,15 @@ public class CqlStorage extends AbstractCassandraStorage
         // will contain all fields for this schema
         List<ResourceFieldSchema> allSchemaFields = new 
ArrayList<ResourceFieldSchema>();
 
-        // defined validators/indexes
         for (ColumnDef cdef : cfDef.column_metadata)
         {
-            // make a new tuple for each col/val pair
-            ResourceSchema innerTupleSchema = new ResourceSchema();
-            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
-            innerTupleField.setType(DataType.TUPLE);
-            innerTupleField.setSchema(innerTupleSchema);
-            innerTupleField.setName(new String(cdef.getName()));            
-            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
-            idxColSchema.setName("name");
-            idxColSchema.setType(getPigType(UTF8Type.instance));
-
             ResourceFieldSchema valSchema = new ResourceFieldSchema();
             AbstractType validator = validators.get(cdef.name);
             if (validator == null)
                 validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-            valSchema.setName("value");
+            valSchema.setName(new String(cdef.getName()));
             valSchema.setType(getPigType(validator));
-
-            innerTupleSchema.setFields(new ResourceFieldSchema[] { 
idxColSchema, valSchema });
-            allSchemaFields.add(innerTupleField);
+            allSchemaFields.add(valSchema);
         }
 
         // top level schema contains everything
@@ -238,16 +236,19 @@ public class CqlStorage extends AbstractCassandraStorage
         return schema;
     }
 
-
-    /** We use CQL3 where clause to define the partition, so do nothing here*/
-    public String[] getPartitionKeys(String location, Job job)
+    public void setPartitionFilter(Expression partitionFilter)
     {
-        return null;
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = 
context.getUDFProperties(AbstractCassandraStorage.class);
+        property.setProperty(PARTITION_FILTER_SIGNATURE, 
partitionFilterToWhereClauseString(partitionFilter));
     }
 
-    /** We use CQL3 where clause to define the partition, so do nothing here*/
-    public void setPartitionFilter(Expression partitionFilter)
+    /** retrieve where clause for partition filter */
+    private String getWhereClauseForPartitionFilter()
     {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = 
context.getUDFProperties(AbstractCassandraStorage.class);
+        return property.getProperty(PARTITION_FILTER_SIGNATURE);
     }
     
     public void prepareToWrite(RecordWriter writer)
@@ -386,7 +387,7 @@ public class CqlStorage extends AbstractCassandraStorage
     
     /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
      * 
[&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
-     * [&split_size=<size>][&partitioner=<partitioner>]] */
+     * 
[&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -420,6 +421,8 @@ public class CqlStorage extends AbstractCassandraStorage
                     splitSize = Integer.parseInt(urlQuery.get("split_size"));
                 if (urlQuery.containsKey("partitioner"))
                     partitionerClass = urlQuery.get("partitioner");
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = 
Boolean.parseBoolean(urlQuery.get("use_secondary")); 
             }
             String[] parts = urlParts[0].split("/+");
             String[] credentialsAndKeyspace = parts[1].split("@");
@@ -440,7 +443,34 @@ public class CqlStorage extends AbstractCassandraStorage
         {
             throw new IOException("Expected 
'cql://[username:password@]<keyspace>/<columnfamily>" +
                                                 
"[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]"
 +
-                                                
"[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]': " 
+ e.getMessage());
+                                                
"[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]]':
 " + e.getMessage());
+        }
+    }
+
+    /**
+     * Return the CQL where clause for the corresponding partition filter.
+     * Make sure the data format matches. Only the following Pig data types
+     * are supported: int, long, float, double, boolean and chararray.
+     *
+     * Comparisons (==, >=, >, <=, <) become "name op value" using the CQL
+     * spelling of the operator; AND is translated recursively on both of
+     * its operands.
+     *
+     * @param expression partition filter expression to translate
+     * @return the equivalent CQL where clause fragment
+     * @throws RuntimeException if the expression is not one of the binary
+     *         operators listed above
+     */
+    private String partitionFilterToWhereClauseString(Expression expression)
+    {
+        OpType op = expression.getOpType();
+        // fail with the descriptive error rather than a ClassCastException
+        if (!(expression instanceof Expression.BinaryExpression))
+            throw new RuntimeException("Unsupported expression type: " + op.name());
+
+        Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
+        String name = be.getLhs().toString();
+        String value = be.getRhs().toString();
+        // spell each operator out in CQL: op.name() is the enum constant
+        // (e.g. "OP_GE"), which is not valid inside a where clause
+        switch (op)
+        {
+            case OP_EQ:
+                return String.format("%s = %s", name, value);
+            case OP_GE:
+                return String.format("%s >= %s", name, value);
+            case OP_GT:
+                return String.format("%s > %s", name, value);
+            case OP_LE:
+                return String.format("%s <= %s", name, value);
+            case OP_LT:
+                return String.format("%s < %s", name, value);
+            case OP_AND:
+                return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
+            default:
+                throw new RuntimeException("Unsupported expression type: " + op.name());
         }
     }
 }

Reply via email to