Updated Branches: refs/heads/cassandra-1.2 422d2236b -> a3dbc3ddc refs/heads/trunk 59c6e500d -> 2130b2e2a
Predicate pushdown support for CqlStorage Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5790 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3dbc3dd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3dbc3dd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3dbc3dd Branch: refs/heads/cassandra-1.2 Commit: a3dbc3ddc720f9ca4a9075fa542b3a8af54187fc Parents: 422d223 Author: Brandon Williams <[email protected]> Authored: Thu Jul 25 10:52:26 2013 -0500 Committer: Brandon Williams <[email protected]> Committed: Thu Jul 25 10:52:26 2013 -0500 ---------------------------------------------------------------------- examples/pig/README.txt | 2 +- .../hadoop/pig/AbstractCassandraStorage.java | 48 +++++++++-- .../cassandra/hadoop/pig/CassandraStorage.java | 30 ------- .../apache/cassandra/hadoop/pig/CqlStorage.java | 84 +++++++++++++------- 4 files changed, 98 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/examples/pig/README.txt ---------------------------------------------------------------------- diff --git a/examples/pig/README.txt b/examples/pig/README.txt index 6dc0937..2ae9824 100644 --- a/examples/pig/README.txt +++ b/examples/pig/README.txt @@ -100,7 +100,7 @@ CqlStorage The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies. The main difference is in the URL format: -cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]] +cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]] Which in grunt, the simplest example would look like: http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index ff575b2..59d7817 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -18,7 +18,9 @@ package org.apache.cassandra.hadoop.pig; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.math.BigInteger; +import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; @@ -76,6 +78,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store protected String DEFAULT_INPUT_FORMAT; protected String DEFAULT_OUTPUT_FORMAT; + public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter"; + protected static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class); protected String username; @@ -90,6 +94,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store protected String outputFormatClass; protected int splitSize = 64 * 1024; protected String partitionerClass; + protected boolean usePartitionFilter = false; public AbstractCassandraStorage() { @@ -248,15 +253,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } } - /** decompose the query to store the parameters in a map*/ - public static Map<String, String> getQueryMap(String query) + /** decompose the query to store the parameters in a map */ + public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException { String[] params = query.split("&"); Map<String, String> map = new HashMap<String, String>(); for (String param : params) { String[] keyValue = param.split("="); - map.put(keyValue[0], keyValue[1]); + map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8")); } return map; } @@ -674,7 +679,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store logger.debug("Found ksDef name: {}", name); String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue())); - logger.debug("partition keys: " + keyString); + logger.debug("partition keys: {}", keyString); List<String> keyNames = FBUtilities.fromJsonList(keyString); Iterator<String> iterator = keyNames.iterator(); @@ -687,7 +692,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue())); - logger.debug("cluster keys: " + keyString); + logger.debug("cluster keys: {}", keyString); keyNames = FBUtilities.fromJsonList(keyString); iterator = keyNames.iterator(); @@ -699,7 +704,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store } String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue())); - logger.debug("row key validator: " + validator); + logger.debug("row key validator: {}", validator); AbstractType<?> keyValidator = parseType(validator); Iterator<ColumnDef> keyItera = keys.iterator(); @@ -713,7 +718,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store keyItera.next().validation_class = keyValidator.toString(); validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue())); - logger.debug("cluster key validator: " + validator); + logger.debug("cluster key validator: {}", validator); if (keyItera.hasNext() && validator != null && !validator.isEmpty()) { @@ -735,7 +740,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store try { String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue())); - logger.debug("default validator: " + compactValidator); + logger.debug("default validator: {}", compactValidator); AbstractType<?> defaultValidator = parseType(compactValidator); ColumnDef cDef = new ColumnDef(); @@ -766,5 +771,32 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store else return null; } + + /** return partition keys */ + public String[] getPartitionKeys(String location, Job job) + { + if (!usePartitionFilter) + return null; + List<ColumnDef> indexes = getIndexes(); + String[] partitionKeys = new String[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) + { + partitionKeys[i] = new String(indexes.get(i).getName()); + } + return partitionKeys; + } + + /** get a list of columns with defined index*/ + protected List<ColumnDef> getIndexes() + { + CfDef cfdef = getCfDef(loadSignature); + List<ColumnDef> indexes = new ArrayList<ColumnDef>(); + for (ColumnDef cdef : cfdef.column_metadata) + { + if (cdef.index_type != null) + indexes.add(cdef); + } + return indexes; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java index ed445a2..add4395 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java @@ -54,8 +54,6 @@ public class CassandraStorage extends AbstractCassandraStorage public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT"; public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY"; - private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter"; - private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class); @@ -68,7 +66,6 @@ public class CassandraStorage extends AbstractCassandraStorage private RecordWriter<ByteBuffer, List<Mutation>> writer; private boolean widerows = false; - private boolean usePartitionFilter = false; private int limit; // wide row hacks @@ -455,20 +452,6 @@ public class CassandraStorage extends AbstractCassandraStorage return schema; } - /** return partition keys */ - public String[] getPartitionKeys(String location, Job job) - { - if (!usePartitionFilter) - return null; - List<ColumnDef> indexes = getIndexes(); - String[] partitionKeys = new String[indexes.size()]; - for (int i = 0; i < indexes.size(); i++) - { - partitionKeys[i] = new String(indexes.get(i).getName()); - } - return partitionKeys; - } - /** set partition filter */ public void setPartitionFilter(Expression partitionFilter) { @@ -665,19 +648,6 @@ public class CassandraStorage extends AbstractCassandraStorage return indexExpressions; } - /** get a list of columns with defined index*/ - private List<ColumnDef> getIndexes() - { - CfDef cfdef = getCfDef(loadSignature); - List<ColumnDef> indexes = new ArrayList<ColumnDef>(); - for (ColumnDef cdef : cfdef.column_metadata) - { - if (cdef.index_type != null) - indexes.add(cdef); - } - return indexes; - } - /** convert a list of index expression to string */ private static String indexExpressionsToString(List<IndexExpression> indexExpressions) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 004b319..7e22823 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -32,10 +32,12 @@ import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.hadoop.mapreduce.*; import org.apache.pig.Expression; +import org.apache.pig.Expression.OpType; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.*; +import org.apache.pig.impl.util.UDFContext; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,9 +139,18 @@ public class CqlStorage extends AbstractCassandraStorage CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize)); if (columns != null && !columns.trim().isEmpty()) - CqlConfigHelper.setInputColumns(conf, columns); - if (whereClause != null && !whereClause.trim().isEmpty()) - CqlConfigHelper.setInputWhereClauses(conf, whereClause); + CqlConfigHelper.setInputColumns(conf, columns); + + String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter(); + String wc = whereClause != null && !whereClause.trim().isEmpty() + ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter) + : whereClauseForPartitionFilter; + + if (wc != null) + { + logger.debug("where clause: {}", wc); + CqlConfigHelper.setInputWhereClauses(conf, wc); + } if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null) { @@ -193,7 +204,7 @@ public class CqlStorage extends AbstractCassandraStorage initSchema(storeSignature); } - /** schema: ((name, value), (name, value), (name, value)) where keys are in the front. */ + /** schema: (value, value, value) where keys are in the front. */ public ResourceSchema getSchema(String location, Job job) throws IOException { setLocation(location, job); @@ -209,28 +220,15 @@ public class CqlStorage extends AbstractCassandraStorage // will contain all fields for this schema List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>(); - // defined validators/indexes for (ColumnDef cdef : cfDef.column_metadata) { - // make a new tuple for each col/val pair - ResourceSchema innerTupleSchema = new ResourceSchema(); - ResourceFieldSchema innerTupleField = new ResourceFieldSchema(); - innerTupleField.setType(DataType.TUPLE); - innerTupleField.setSchema(innerTupleSchema); - innerTupleField.setName(new String(cdef.getName())); - ResourceFieldSchema idxColSchema = new ResourceFieldSchema(); - idxColSchema.setName("name"); - idxColSchema.setType(getPigType(UTF8Type.instance)); - ResourceFieldSchema valSchema = new ResourceFieldSchema(); AbstractType validator = validators.get(cdef.name); if (validator == null) validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR); - valSchema.setName("value"); + valSchema.setName(new String(cdef.getName())); valSchema.setType(getPigType(validator)); - - innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema }); - allSchemaFields.add(innerTupleField); + allSchemaFields.add(valSchema); } // top level schema contains everything @@ -238,16 +236,19 @@ public class CqlStorage extends AbstractCassandraStorage return schema; } - - /** We use CQL3 where clause to define the partition, so do nothing here*/ - public String[] getPartitionKeys(String location, Job job) + public void setPartitionFilter(Expression partitionFilter) { - return null; + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(AbstractCassandraStorage.class); + property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter)); } - /** We use CQL3 where clause to define the partition, so do nothing here*/ - public void setPartitionFilter(Expression partitionFilter) + /** retrieve where clause for partition filter */ + private String getWhereClauseForPartitionFilter() { + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(AbstractCassandraStorage.class); + return property.getProperty(PARTITION_FILTER_SIGNATURE); } public void prepareToWrite(RecordWriter writer) @@ -386,7 +387,7 @@ public class CqlStorage extends AbstractCassandraStorage /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>] * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>] - * [&split_size=<size>][&partitioner=<partitioner>]] */ + * [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */ private void setLocationFromUri(String location) throws IOException { try @@ -420,6 +421,8 @@ public class CqlStorage extends AbstractCassandraStorage splitSize = Integer.parseInt(urlQuery.get("split_size")); if (urlQuery.containsKey("partitioner")) partitionerClass = urlQuery.get("partitioner"); + if (urlQuery.containsKey("use_secondary")) + usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary")); } String[] parts = urlParts[0].split("/+"); String[] credentialsAndKeyspace = parts[1].split("@"); @@ -440,7 +443,34 @@ public class CqlStorage extends AbstractCassandraStorage { throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" + "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" + - "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage()); + "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]]': " + e.getMessage()); + } + } + + /** + * Return cql where clauses for the corresponding partition filter. Make sure the data format matches + * Only support the following Pig data types: int, long, float, double, boolean and chararray + * */ + private String partitionFilterToWhereClauseString(Expression expression) + { + Expression.BinaryExpression be = (Expression.BinaryExpression) expression; + String name = be.getLhs().toString(); + String value = be.getRhs().toString(); + OpType op = expression.getOpType(); + String opString = op.name(); + switch (op) + { + case OP_EQ: + opString = " = "; + case OP_GE: + case OP_GT: + case OP_LE: + case OP_LT: + return String.format("%s %s %s", name, opString, value); + case OP_AND: + return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs())); + default: + throw new RuntimeException("Unsupported expression type: " + opString); } } }
