Author: johan
Date: Tue Feb 23 14:57:27 2010
New Revision: 915364
URL: http://svn.apache.org/viewvc?rev=915364&view=rev
Log:
Expose the Hadoop input split size to the user. Patch by johan; reviewed by
jbellis. CASSANDRA-823
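
For context, here is a minimal driver sketch of how a job could pick up the new
setter once this patch is applied. The setColumnFamily signature and the Thrift
package locations are assumed from the 0.6 branch; the job name, keyspace,
column family, slice range, and the 4096 value are purely illustrative and not
part of the patch.

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.thrift.SliceRange;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitSizeExample
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "cassandra-input-example");

            // Existing hooks: keyspace/column family and a catch-all slice predicate.
            ColumnFamilyInputFormat.setColumnFamily(job, "Keyspace1", "Standard1");
            SlicePredicate predicate = new SlicePredicate();
            predicate.setSlice_range(new SliceRange(new byte[0], new byte[0], false, 100));
            ColumnFamilyInputFormat.setSlicePredicate(job, predicate);

            // New in this patch: request roughly 4096 keys per input split
            // instead of the 16384 default used by getSplits().
            ColumnFamilyInputFormat.setInputSplitSize(job, 4096);

            job.setInputFormatClass(ColumnFamilyInputFormat.class);
        }
    }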
Modified:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=915364&r1=915363&r2=915364&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Tue Feb 23 14:57:27 2010
@@ -32,6 +32,7 @@
     private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
     private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
     private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
+    private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";

     private static final Logger logger = Logger.getLogger(StorageService.class);
@@ -62,6 +63,19 @@
         conf.set(COLUMNFAMILY_CONFIG, columnFamily);
     }

+    /**
+     * Set the size of the input split.
+     * This affects the number of maps created; if the number is too small,
+     * the overhead of each map will take up the bulk of the job time.
+     *
+     * @param job Job you are about to run.
+     * @param splitsize Size of the input split
+     */
+    public static void setInputSplitSize(Job job, int splitsize)
+    {
+        job.getConfiguration().setInt(INPUT_SPLIT_SIZE_CONFIG, splitsize);
+    }
+
     public static void setSlicePredicate(Job job, SlicePredicate predicate)
     {
         Configuration conf = job.getConfiguration();
@@ -122,13 +136,15 @@
         // cannonical ranges and nodes holding replicas
         List<TokenRange> masterRangeNodes = getRangeMap();

+        int splitsize = context.getConfiguration().getInt(INPUT_SPLIT_SIZE_CONFIG, 16384);
+
         // cannonical ranges, split into pieces:
         // for each range, pick a live owner and ask it to compute bite-sized splits
         // TODO parallelize this thread-per-range
         Map<TokenRange, List<String>> splitRanges = new HashMap<TokenRange, List<String>>();
         for (TokenRange range : masterRangeNodes)
         {
-            splitRanges.put(range, getSubSplits(range));
+            splitRanges.put(range, getSubSplits(range, splitsize));
         }

         // turn the sub-ranges into InputSplits
@@ -143,7 +159,7 @@
             for ( ; i < tokens.size(); i++)
             {
                 ColumnFamilySplit split = new ColumnFamilySplit(keyspace, columnFamily, predicate, tokens.get(i - 1), tokens.get(i), endpoints);
-                logger.info("adding " + split);
+                logger.debug("adding " + split);
                 splits.add(split);
             }
         }
@@ -152,7 +168,7 @@
         return splits;
     }

-    private List<String> getSubSplits(TokenRange range) throws IOException
+    private List<String> getSubSplits(TokenRange range, int splitsize) throws IOException
     {
         // TODO handle failure of range replicas & retry
         TSocket socket = new TSocket(range.endpoints.get(0),
@@ -170,7 +186,7 @@
         List<String> splits;
         try
         {
-            splits = client.describe_splits(range.start_token, range.end_token, 128); // TODO make split size configurable
+            splits = client.describe_splits(range.start_token, range.end_token, splitsize);
        }
         catch (TException e)
         {
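
As a follow-up note, the new setter simply writes an int into the job
configuration, so the same knob can be set or read through the raw key
introduced above. A small sketch, reusing the job object from the earlier
example (the 4096 value is again illustrative; 16384 is the fallback used in
getSplits()):

    Configuration conf = job.getConfiguration();

    // Equivalent to ColumnFamilyInputFormat.setInputSplitSize(job, 4096).
    conf.setInt("cassandra.input.split.size", 4096);

    // Read it back the same way getSplits() does, with the 16384 default.
    int splitsize = conf.getInt("cassandra.input.split.size", 16384);

Smaller values produce more, shorter map tasks; larger values produce fewer
tasks that each scan more rows, which is the trade-off the new javadoc warns
about.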