Author: jbellis
Date: Tue Jul 12 19:26:11 2011
New Revision: 1145731
URL: http://svn.apache.org/viewvc?rev=1145731&view=rev
Log:
add KeyRange option to Hadoop inputformat
patch by Mck SembWever; reviewed by jbellis for CASSANDRA-1125
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Tue Jul 12 19:26:11 2011
@@ -24,6 +24,7 @@
* add option to specify limit for get_slice in the CLI (CASSANDRA-2646)
* decrease HH page size (CASSANDRA-2832)
* reset cli keyspace after dropping the current one (CASSANDRA-2763)
+ * add KeyRange option to Hadoop inputformat (CASSANDRA-1125)
0.8.1
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Tue Jul 12 19:26:11 2011
@@ -35,8 +35,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.hadoop.conf.Configuration;
@@ -102,10 +105,44 @@ public class ColumnFamilyInputFormat ext
try
{
List<Future<List<InputSplit>>> splitfutures = new
ArrayList<Future<List<InputSplit>>>();
+ KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+ IPartitioner partitioner = null;
+ Range jobRange = null;
+ if (jobKeyRange != null)
+ {
+ partitioner =
ConfigHelper.getPartitioner(context.getConfiguration());
+ assert partitioner.preservesOrder() :
"ConfigHelper.setInputKeyRange(..) can only be used with a order preserving
paritioner";
+ assert jobKeyRange.start_key == null : "only start_token
supported";
+ assert jobKeyRange.end_key == null : "only end_token
supported";
+ jobRange = new
Range(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+
partitioner.getTokenFactory().fromString(jobKeyRange.end_token),
+ partitioner);
+ }
+
for (TokenRange range : masterRangeNodes)
{
+ if (jobRange == null)
+ {
// for each range, pick a live owner and ask it to compute
bite-sized splits
splitfutures.add(executor.submit(new SplitCallable(range,
conf)));
+ }
+ else
+ {
+ Range dhtRange = new
Range(partitioner.getTokenFactory().fromString(range.start_token),
+
partitioner.getTokenFactory().fromString(range.end_token),
+ partitioner);
+
+ if (dhtRange.intersects(jobRange))
+ {
+ Set<Range> intersections =
dhtRange.intersectionWith(jobRange);
+ assert intersections.size() == 1 : "wrapping ranges
not yet supported";
+ Range intersection = intersections.iterator().next();
+ range.start_token =
partitioner.getTokenFactory().toString(intersection.left);
+ range.end_token =
partitioner.getTokenFactory().toString(intersection.right);
+ // for each range, pick a live owner and ask it to
compute bite-sized splits
+ splitfutures.add(executor.submit(new
SplitCallable(range, conf)));
+ }
+ }
}
// wait until we have all the results back
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1145731&r1=1145730&r2=1145731&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Tue Jul 12 19:26:11 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.hadoop;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.utils.FBUtilities;
@@ -42,6 +43,7 @@ public class ConfigHelper
private static final String INPUT_COLUMNFAMILY_CONFIG =
"cassandra.input.columnfamily";
private static final String OUTPUT_COLUMNFAMILY_CONFIG =
"cassandra.output.columnfamily";
private static final String INPUT_PREDICATE_CONFIG =
"cassandra.input.predicate";
+ private static final String INPUT_KEYRANGE_CONFIG =
"cassandra.input.keyRange"; // value: hex-encoded Thrift binary serialization of a KeyRange
private static final String OUTPUT_PREDICATE_CONFIG =
"cassandra.output.predicate";
private static final String INPUT_SPLIT_SIZE_CONFIG =
"cassandra.input.split.size";
private static final int DEFAULT_SPLIT_SIZE = 64 * 1024;
@@ -195,6 +197,53 @@ public class ConfigHelper
return predicate;
}
+    /**
+     * Restrict the job's input rows to a single token range.
+     *
+     * The range is stored in the job configuration as a KeyRange carrying only a
+     * start and end token. When input splits are computed, ColumnFamilyInputFormat
+     * intersects it with each range of the ring and only emits splits for the
+     * intersections. That code asserts an order-preserving partitioner and asserts
+     * that no start/end keys are set, so only token bounds are supported.
+     *
+     * @param conf Job configuration you are about to run
+     * @param startToken start of the token range, in the partitioner's string form; must not be null
+     * @param endToken end of the token range, in the partitioner's string form; must not be null
+     * @throws IllegalArgumentException if either token is null
+     */
+    public static void setInputRange(Configuration conf, String startToken, String endToken)
+    {
+        // Fail fast: a null token would otherwise be serialized and stored silently,
+        // and only surface when the input format hands it to the partitioner's token factory.
+        if (startToken == null || endToken == null)
+            throw new IllegalArgumentException("startToken and endToken must both be non-null");
+        KeyRange range = new KeyRange().setStart_token(startToken).setEnd_token(endToken);
+        conf.set(INPUT_KEYRANGE_CONFIG, keyRangeToString(range));
+    }
+
+    /**
+     * Read the input KeyRange from the job configuration (as written by setInputRange).
+     *
+     * @param conf Job configuration
+     * @return the decoded KeyRange, or null when no input range has been configured
+     */
+    public static KeyRange getInputKeyRange(Configuration conf)
+    {
+        String encoded = conf.get(INPUT_KEYRANGE_CONFIG);
+        if (encoded == null)
+            return null;
+        return keyRangeFromString(encoded);
+    }
+
+    /**
+     * Encode a KeyRange as the hex form of its Thrift binary serialization so it can
+     * be kept as a plain string configuration value.
+     *
+     * @param keyRange the range to encode; must not be null
+     * @return hex string of the serialized bytes
+     * @throws RuntimeException wrapping any TException raised by serialization
+     */
+    private static String keyRangeToString(KeyRange keyRange)
+    {
+        assert keyRange != null;
+        TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
+        byte[] serialized;
+        try
+        {
+            serialized = binarySerializer.serialize(keyRange);
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return FBUtilities.bytesToHex(serialized);
+    }
+
+    /**
+     * Decode a KeyRange from the hex string produced by keyRangeToString.
+     *
+     * @param hex hex-encoded Thrift binary serialization of a KeyRange; must not be null
+     * @return the decoded KeyRange
+     * @throws RuntimeException wrapping any TException raised by deserialization
+     */
+    private static KeyRange keyRangeFromString(String hex)
+    {
+        assert hex != null;
+        TDeserializer binaryDeserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        KeyRange decoded = new KeyRange();
+        try
+        {
+            binaryDeserializer.deserialize(decoded, FBUtilities.hexToBytes(hex));
+            return decoded;
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
public static String getInputKeyspace(Configuration conf)
{
return conf.get(INPUT_KEYSPACE_CONFIG);