Author: jbellis
Date: Fri Feb 12 21:58:16 2010
New Revision: 909628
URL: http://svn.apache.org/viewvc?rev=909628&view=rev
Log:
make predicate configurable
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342
Modified:
incubator/cassandra/trunk/contrib/word_count/src/WordCount.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
Modified: incubator/cassandra/trunk/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ incubator/cassandra/trunk/contrib/word_count/src/WordCount.java Fri Feb 12 21:58:16 2010
@@ -1,4 +1,5 @@
import java.io.IOException;
+import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;
@@ -6,6 +7,7 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
@@ -48,9 +50,9 @@
public void map(String key, SortedMap<byte[], IColumn> columns,
Context context) throws IOException, InterruptedException
{
- if (columns == null)
- return;
IColumn column = columns.get(columnName.getBytes());
+ if (column == null)
+ return;
String value = new String(column.value());
logger.debug("read " + key + ":" + value + " from " +
context.getInputSplit());
@@ -99,6 +101,8 @@
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX +
i));
ColumnFamilyInputFormat.setColumnFamily(job, KEYSPACE,
COLUMN_FAMILY);
+ SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
+ ColumnFamilyInputFormat.setSlicePredicate(job, predicate);
job.waitForCompletion(true);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Feb 12 21:58:16 2010
@@ -3,20 +3,21 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.SortedMap;
import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -24,15 +25,24 @@
{
private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
private static final String COLUMNFAMILY_CONFIG =
"cassandra.input.columnfamily";
+ private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
private static final Logger logger =
Logger.getLogger(StorageService.class);
private String keyspace;
private String columnFamily;
+ private SlicePredicate predicate;
public static void setColumnFamily(Job job, String keyspace, String
columnFamily)
{
- validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+ if (keyspace == null)
+ {
+ throw new UnsupportedOperationException("keyspace may not be null");
+ }
+ if (columnFamily == null)
+ {
+ throw new UnsupportedOperationException("columnfamily may not be null");
+ }
try
{
ThriftValidation.validateColumnFamily(keyspace, columnFamily);
@@ -46,15 +56,52 @@
conf.set(COLUMNFAMILY_CONFIG, columnFamily);
}
- private static void validateNotNullKeyspaceAndColumnFamily(String keyspace, String columnFamily)
+ public static void setSlicePredicate(Job job, SlicePredicate predicate)
{
- if (keyspace == null)
+ Configuration conf = job.getConfiguration();
+ conf.set(PREDICATE_CONFIG, predicateToString(predicate));
+ }
+
+ private static String predicateToString(SlicePredicate predicate)
+ {
+ assert predicate != null;
+ // this is so awful it's kind of cool!
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try
{
- throw new RuntimeException("you forgot to set the keyspace with setKeyspace()");
+ return serializer.toString(predicate, "UTF-8");
}
- if (columnFamily == null)
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private SlicePredicate predicateFromString(String st)
+ {
+ assert st != null;
+ TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+ SlicePredicate predicate = new SlicePredicate();
+ try
+ {
+ deserializer.deserialize(predicate, st, "UTF-8");
+ }
+ catch (TException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return predicate;
+ }
+
+ private void validateConfiguration()
+ {
+ if (keyspace == null || columnFamily == null)
+ {
+ throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
+ }
+ if (predicate == null)
{
- throw new RuntimeException("you forgot to set the column family with setColumnFamily()");
+ throw new UnsupportedOperationException("you must set the predicate with setPredicate");
}
}
@@ -63,7 +110,8 @@
Configuration conf = context.getConfiguration();
keyspace = conf.get(KEYSPACE_CONFIG);
columnFamily = conf.get(COLUMNFAMILY_CONFIG);
- validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+ predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
+ validateConfiguration();
List<TokenRange> map = getRangeMap();
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
@@ -72,7 +120,7 @@
if (logger.isDebugEnabled())
logger.debug("split range is [" + entry.start_token + ", " +
entry.end_token + "]");
String[] endpoints = entry.endpoints.toArray(new String[0]);
- splits.add(new ColumnFamilySplit(keyspace, columnFamily, entry.start_token, entry.end_token, endpoints));
+ splits.add(new ColumnFamilySplit(keyspace, columnFamily, predicate, entry.start_token, entry.end_token, endpoints));
}
return splits;
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Feb 12 21:58:16 2010
@@ -86,19 +86,14 @@
{
throw new RuntimeException(e);
}
- SliceRange sliceRange = new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY,
- ArrayUtils.EMPTY_BYTE_ARRAY,
- false,
- Integer.MAX_VALUE);
KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
.setStart_token(split.getStartToken())
.setEnd_token(split.getEndToken());
- // TODO "paging" large rows would be good
try
{
rows = client.get_range_slices(split.getTable(),
new
ColumnParent(split.getColumnFamily()),
- new SlicePredicate().setSlice_range(sliceRange),
+ split.getPredicate(),
keyRange,
ConsistencyLevel.ONE);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909628&r1=909627&r2=909628&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Fri Feb 12 21:58:16 2010
@@ -4,8 +4,14 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
public class ColumnFamilySplit extends InputSplit implements Writable
{
@@ -14,14 +20,16 @@
private String table;
private String columnFamily;
private String[] dataNodes;
+ private SlicePredicate predicate;
- public ColumnFamilySplit(String table, String columnFamily, String startToken, String endToken, String[] dataNodes)
+ public ColumnFamilySplit(String table, String columnFamily, SlicePredicate predicate, String startToken, String endToken, String[] dataNodes)
{
assert startToken != null;
assert endToken != null;
this.startToken = startToken;
this.endToken = endToken;
this.columnFamily = columnFamily;
+ this.predicate = predicate;
this.table = table;
this.dataNodes = dataNodes;
}
@@ -46,6 +54,11 @@
return columnFamily;
}
+ public SlicePredicate getPredicate()
+ {
+ return predicate;
+ }
+
// getLength and getLocations satisfy the InputSplit abstraction
public long getLength()
@@ -62,6 +75,9 @@
// This should only be used by KeyspaceSplit.read();
protected ColumnFamilySplit() {}
+ private static final TSerializer tSerializer = new TSerializer(new TBinaryProtocol.Factory());
+ private static final TDeserializer tDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
+
// These three methods are for serializing and deserializing
// KeyspaceSplits as needed by the Writable interface.
public void write(DataOutput out) throws IOException
@@ -70,6 +86,7 @@
out.writeUTF(columnFamily);
out.writeUTF(startToken);
out.writeUTF(endToken);
+ FBUtilities.serialize(tSerializer, predicate, out);
out.writeInt(dataNodes.length);
for (String endPoint : dataNodes)
@@ -77,13 +94,15 @@
out.writeUTF(endPoint);
}
}
-
+
public void readFields(DataInput in) throws IOException
{
table = in.readUTF();
columnFamily = in.readUTF();
startToken = in.readUTF();
endToken = in.readUTF();
+ predicate = new SlicePredicate();
+ FBUtilities.deserialize(tDeserializer, predicate, in);
int numOfEndPoints = in.readInt();
dataNodes = new String[numOfEndPoints];