Author: jbellis
Date: Fri Feb 12 21:58:16 2010
New Revision: 909628

URL: http://svn.apache.org/viewvc?rev=909628&view=rev
Log:
make predicate configurable
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342

Modified:
    incubator/cassandra/trunk/contrib/word_count/src/WordCount.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java

Modified: incubator/cassandra/trunk/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ incubator/cassandra/trunk/contrib/word_count/src/WordCount.java Fri Feb 12 21:58:16 2010
@@ -1,4 +1,5 @@
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.SortedMap;
 import java.util.StringTokenizer;
 
@@ -6,6 +7,7 @@
 
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -48,9 +50,9 @@
 
         public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
         {
-            if (columns == null)
-                return;
             IColumn column = columns.get(columnName.getBytes());
+            if (column == null)
+                return;
             String value = new String(column.value());
             logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());
 
@@ -99,6 +101,8 @@
             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
 
             ColumnFamilyInputFormat.setColumnFamily(job, KEYSPACE, COLUMN_FAMILY);
+            SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
+            ColumnFamilyInputFormat.setSlicePredicate(job, predicate);
 
             job.waitForCompletion(true);
         }
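
The new setSlicePredicate() call above is not limited to a single column; SlicePredicate accepts a list of names. A minimal sketch of a multi-column configuration (the extra column names are hypothetical, not part of this patch):

    import java.util.Arrays;
    import org.apache.cassandra.thrift.SlicePredicate;

    // In the job driver, next to ColumnFamilyInputFormat.setColumnFamily(...).
    // "title" and "author" are made-up column names for illustration.
    SlicePredicate predicate = new SlicePredicate().setColumn_names(
            Arrays.asList(columnName.getBytes(), "title".getBytes(), "author".getBytes()));
    ColumnFamilyInputFormat.setSlicePredicate(job, predicate);

Because the mapper now sees only the requested columns, a row can legitimately come back without the column the job asked for, which is why the null check in map() moved from the columns map to the individual column.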

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Feb 12 21:58:16 2010
@@ -3,20 +3,21 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedMap;
 
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
@@ -24,15 +25,24 @@
 {
     private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
     private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
+    private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
 
     private static final Logger logger = Logger.getLogger(StorageService.class);
 
     private String keyspace;
     private String columnFamily;
+    private SlicePredicate predicate;
 
     public static void setColumnFamily(Job job, String keyspace, String columnFamily)
     {
-        validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+        if (keyspace == null)
+        {
+            throw new UnsupportedOperationException("keyspace may not be null");
+        }
+        if (columnFamily == null)
+        {
+            throw new UnsupportedOperationException("columnfamily may not be null");
+        }
         try
         {
             ThriftValidation.validateColumnFamily(keyspace, columnFamily);
@@ -46,15 +56,52 @@
         conf.set(COLUMNFAMILY_CONFIG, columnFamily);
     }
 
-    private static void validateNotNullKeyspaceAndColumnFamily(String keyspace, String columnFamily)
+    public static void setSlicePredicate(Job job, SlicePredicate predicate)
     {
-        if (keyspace == null)
+        Configuration conf = job.getConfiguration();
+        conf.set(PREDICATE_CONFIG, predicateToString(predicate));
+    }
+
+    private static String predicateToString(SlicePredicate predicate)
+    {
+        assert predicate != null;
+        // this is so awful it's kind of cool!
+        TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+        try
         {
-            throw new RuntimeException("you forgot to set the keyspace with setKeyspace()");
+            return serializer.toString(predicate, "UTF-8");
         }
-        if (columnFamily == null)
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SlicePredicate predicateFromString(String st)
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+        SlicePredicate predicate = new SlicePredicate();
+        try
+        {
+            deserializer.deserialize(predicate, st, "UTF-8");
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return predicate;
+    }
+
+    private void validateConfiguration()
+    {
+        if (keyspace == null || columnFamily == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+        }
+        if (predicate == null)
         {
-            throw new RuntimeException("you forgot to set the column family with setColumnFamily()");
+            throw new UnsupportedOperationException("you must set the predicate with setSlicePredicate()");
         }
     }
 
@@ -63,7 +110,8 @@
         Configuration conf = context.getConfiguration();
         keyspace = conf.get(KEYSPACE_CONFIG);
         columnFamily = conf.get(COLUMNFAMILY_CONFIG);
-        validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+        predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
+        validateConfiguration();
 
         List<TokenRange> map = getRangeMap();
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
@@ -72,7 +120,7 @@
             if (logger.isDebugEnabled())
                 logger.debug("split range is [" + entry.start_token + ", " + entry.end_token + "]");
             String[] endpoints = entry.endpoints.toArray(new String[0]);
-            splits.add(new ColumnFamilySplit(keyspace, columnFamily, entry.start_token, entry.end_token, endpoints));
+            splits.add(new ColumnFamilySplit(keyspace, columnFamily, predicate, entry.start_token, entry.end_token, endpoints));
         }
 
         return splits;
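
getSplits() rehydrates the predicate that setSlicePredicate() stored, using Thrift's JSON protocol to smuggle a binary struct through the string-typed Hadoop Configuration. A hedged sketch of that round-trip in isolation (the class name and sample slice range are illustrative only):

    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.thrift.SliceRange;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TJSONProtocol;

    public class PredicateRoundTrip
    {
        public static void main(String[] args) throws TException
        {
            SlicePredicate original = new SlicePredicate()
                .setSlice_range(new SliceRange(new byte[0], new byte[0], false, 100));

            // Job side: flatten the predicate to JSON text, as predicateToString() does.
            String json = new TSerializer(new TJSONProtocol.Factory()).toString(original, "UTF-8");

            // Task side: rebuild the struct from the string, as predicateFromString() does.
            SlicePredicate copy = new SlicePredicate();
            new TDeserializer(new TJSONProtocol.Factory()).deserialize(copy, json, "UTF-8");
            assert original.equals(copy);
        }
    }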

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Feb 12 21:58:16 2010
@@ -86,19 +86,14 @@
             {
                 throw new RuntimeException(e);
             }
-            SliceRange sliceRange = new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                   ArrayUtils.EMPTY_BYTE_ARRAY,
-                                                   false,
-                                                   Integer.MAX_VALUE);
             KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
                                 .setStart_token(split.getStartToken())
                                 .setEnd_token(split.getEndToken());
-            // TODO "paging" large rows would be good
             try
             {
                 rows = client.get_range_slices(split.getTable(),
                                                new ColumnParent(split.getColumnFamily()),
-                                               new SlicePredicate().setSlice_range(sliceRange),
+                                               split.getPredicate(),
                                                keyRange,
                                                ConsistencyLevel.ONE);
             }
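
With the hardcoded slice range gone, a split fetches only what the job's predicate asks for. A job that wants the old read-everything behavior can request it explicitly; a minimal sketch mirroring the values the record reader used to hardcode (assumes the usual job-driver context):

    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.thrift.SliceRange;

    // Empty start/finish names plus a maximal count: every column of every
    // row, exactly what ColumnFamilyRecordReader hardcoded before this patch.
    SliceRange all = new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE);
    ColumnFamilyInputFormat.setSlicePredicate(job, new SlicePredicate().setSlice_range(all));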

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Fri Feb 12 21:58:16 2010
@@ -4,8 +4,14 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
 
 public class ColumnFamilySplit extends InputSplit implements Writable
 {
@@ -14,14 +20,16 @@
     private String table;
     private String columnFamily;
     private String[] dataNodes;
+    private SlicePredicate predicate;
 
-    public ColumnFamilySplit(String table, String columnFamily, String startToken, String endToken, String[] dataNodes)
+    public ColumnFamilySplit(String table, String columnFamily, SlicePredicate predicate, String startToken, String endToken, String[] dataNodes)
     {
         assert startToken != null;
         assert endToken != null;
         this.startToken = startToken;
         this.endToken = endToken;
         this.columnFamily = columnFamily;
+        this.predicate = predicate;
         this.table = table;
         this.dataNodes = dataNodes;
     }
@@ -46,6 +54,11 @@
         return columnFamily;
     }
 
+    public SlicePredicate getPredicate()
+    {
+        return predicate;
+    }
+
     // getLength and getLocations satisfy the InputSplit abstraction
     
     public long getLength()
@@ -62,6 +75,9 @@
     // This should only be used by KeyspaceSplit.read();
     protected ColumnFamilySplit() {}
 
+    private static final TSerializer tSerializer = new TSerializer(new TBinaryProtocol.Factory());
+    private static final TDeserializer tDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
+
     // These three methods are for serializing and deserializing
     // KeyspaceSplits as needed by the Writable interface.
     public void write(DataOutput out) throws IOException
@@ -70,6 +86,7 @@
         out.writeUTF(columnFamily);
         out.writeUTF(startToken);
         out.writeUTF(endToken);
+        FBUtilities.serialize(tSerializer, predicate, out);
 
         out.writeInt(dataNodes.length);
         for (String endPoint : dataNodes)
@@ -77,13 +94,15 @@
             out.writeUTF(endPoint);
         }
     }
-    
+
     public void readFields(DataInput in) throws IOException
     {
         table = in.readUTF();
         columnFamily = in.readUTF();
         startToken = in.readUTF();
         endToken = in.readUTF();
+        predicate = new SlicePredicate();
+        FBUtilities.deserialize(tDeserializer, predicate, in);
 
         int numOfEndPoints = in.readInt();
         dataNodes = new String[numOfEndPoints];
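
Hadoop ships splits to the task side through exactly this write()/readFields() pair, so the predicate now rides along with the tokens and endpoints. A hedged round-trip sketch (keyspace, column family, tokens, and endpoint are made-up values; the class must live in this package because the no-arg constructor is protected):

    package org.apache.cassandra.hadoop;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.cassandra.thrift.SlicePredicate;

    public class SplitRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            SlicePredicate predicate = new SlicePredicate()
                .setColumn_names(Arrays.asList("text".getBytes()));
            ColumnFamilySplit split = new ColumnFamilySplit("Keyspace1", "Standard1", predicate,
                                                            "0", "42", new String[]{ "127.0.0.1" });

            // Serialize as Hadoop does when sending the split to a task...
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            split.write(new DataOutputStream(bytes));

            // ...then deserialize on the other side; the predicate survives the trip.
            ColumnFamilySplit copy = new ColumnFamilySplit();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            assert predicate.equals(copy.getPredicate());
        }
    }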

