Author: brandonwilliams Date: Mon Mar 21 21:58:45 2011 New Revision: 1083982
URL: http://svn.apache.org/viewvc?rev=1083982&view=rev Log: Allow specifying a slice predicate for Pig queries Patch by brandonwilliams, reviewed by jbellis for CASSANDRA-1618 Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Modified: cassandra/branches/cassandra-0.7/contrib/pig/README.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/README.txt?rev=1083982&r1=1083981&r2=1083982&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/README.txt (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/README.txt Mon Mar 21 21:58:45 2011 @@ -47,6 +47,11 @@ grunt> orderednames = ORDER namecounts B grunt> topnames = LIMIT orderednames 50; grunt> dump topnames; +Slices on columns can also be specified: +grunt> rows = LOAD 'cassandra://Keyspace1/Standard1?slice_start=C2&slice_end=C4&limit=1&reversed=true' USING CassandraStorage(); + +Binary values for slice_start and slice_end can be escaped such as '\u0255' + Outputting to Cassandra requires the same format from input, so the simplest example is: grunt> rows = LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage(); Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1083982&r1=1083981&r2=1083982&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ 
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Mon Mar 21 21:58:45 2011 @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.utils.FBUtilities; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,10 +61,16 @@ public class CassandraStorage extends Lo private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); + private ByteBuffer slice_start = BOUND; + private ByteBuffer slice_end = BOUND; + private boolean slice_reverse = false; + private String keyspace; + private String column_family; + private Configuration conf; private RecordReader reader; private RecordWriter writer; - private final int limit; + private int limit; public CassandraStorage() { @@ -149,7 +156,7 @@ public class CassandraStorage extends Lo this.reader = reader; } - private String[] parseLocation(String location) throws IOException + private void setLocationFromUri(String location) throws IOException { // parse uri into keyspace and columnfamily String names[]; @@ -157,14 +164,30 @@ public class CassandraStorage extends Lo { if (!location.startsWith("cassandra://")) throw new Exception("Bad scheme."); - String[] parts = location.split("/+"); - names = new String[]{ parts[1], parts[2] }; + String[] urlParts = location.split("\\?"); + if (urlParts.length > 1) + { + for (String param : urlParts[1].split("&")) + { + String[] pair = param.split("="); + if (pair[0].equals("slice_start")) + slice_start = ByteBufferUtil.bytes(pair[1]); + else if (pair[0].equals("slice_end")) + slice_end = ByteBufferUtil.bytes(pair[1]); + else if (pair[0].equals("reversed")) + slice_reverse = Boolean.parseBoolean(pair[1]); + else if (pair[0].equals("limit")) + limit = Integer.parseInt(pair[1]); + } + } + String[] parts = urlParts[0].split("/+"); + keyspace = 
parts[1]; + column_family = parts[2]; } catch (Exception e) { - throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>': " + e.getMessage()); + throw new IOException("Expected 'cassandra://<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>[&reversed=true][&limit=1]]': " + e.getMessage()); } - return names; } private void setConnectionInformation() throws IOException @@ -186,12 +209,15 @@ public class CassandraStorage extends Lo @Override public void setLocation(String location, Job job) throws IOException { - SliceRange range = new SliceRange(BOUND, BOUND, false, limit); - SlicePredicate predicate = new SlicePredicate().setSlice_range(range); conf = job.getConfiguration(); - ConfigHelper.setInputSlicePredicate(conf, predicate); - String[] names = parseLocation(location); - ConfigHelper.setInputColumnFamily(conf, names[0], names[1]); + setLocationFromUri(location); + if (ConfigHelper.getRawInputSlicePredicate(conf) == null) + { + SliceRange range = new SliceRange(slice_start, slice_end, slice_reverse, limit); + SlicePredicate predicate = new SlicePredicate().setSlice_range(range); + ConfigHelper.setInputSlicePredicate(conf, predicate); + } + ConfigHelper.setInputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); } @@ -214,8 +240,8 @@ public class CassandraStorage extends Lo public void setStoreLocation(String location, Job job) throws IOException { conf = job.getConfiguration(); - String[] names = parseLocation(location); - ConfigHelper.setOutputColumnFamily(conf, names[0], names[1]); + setLocationFromUri(location); + ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family); setConnectionInformation(); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1083982&r1=1083981&r2=1083982&view=diff 
============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Mon Mar 21 21:58:45 2011 @@ -159,6 +159,11 @@ public class ConfigHelper return predicateFromString(conf.get(INPUT_PREDICATE_CONFIG)); } + public static String getRawInputSlicePredicate(Configuration conf) + { + return conf.get(INPUT_PREDICATE_CONFIG); + } + private static String predicateToString(SlicePredicate predicate) { assert predicate != null;
