Author: jbellis
Date: Wed Dec 8 00:55:29 2010
New Revision: 1043256
URL: http://svn.apache.org/viewvc?rev=1043256&view=rev
Log:
clean up and comment reducer code
patch by jbellis
Modified:
cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
Modified: cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java?rev=1043256&r1=1043255&r2=1043256&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/word_count/src/WordCount.java Wed Dec 8 00:55:29 2010
@@ -49,6 +49,9 @@ import org.apache.hadoop.util.ToolRunner
* "text" containing a sequence of words.
*
* For each word, we output the total number of occurrences across all texts.
+ *
+ * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair,
+ * with a row key equal to the name of the source column we read the words from.
*/
public class WordCount extends Configured implements Tool
{
@@ -74,11 +77,17 @@ public class WordCount extends Configure
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
- private ByteBuffer columnName;
+ private ByteBuffer sourceColumn;
+
+ protected void setup(org.apache.hadoop.mapreduce.Mapper.Context
context)
+ throws IOException, InterruptedException
+ {
+ sourceColumn =
ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
+ }
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn>
columns, Context context) throws IOException, InterruptedException
{
- IColumn column = columns.get(columnName);
+ IColumn column = columns.get(sourceColumn);
if (column == null)
return;
String value = ByteBufferUtil.string(column.value());
@@ -91,78 +100,48 @@ public class WordCount extends Configure
context.write(word, one);
}
}
-
- protected void setup(org.apache.hadoop.mapreduce.Mapper.Context
context)
- throws IOException, InterruptedException
- {
- this.columnName =
ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
- }
-
}
public static class ReducerToFilesystem extends Reducer<Text, IntWritable,
Text, IntWritable>
{
- private IntWritable result = new IntWritable();
-
public void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
- {
sum += val.get();
- }
-
- result.set(sum);
- context.write(key, result);
+ context.write(key, new IntWritable(sum));
}
}
public static class ReducerToCassandra extends Reducer<Text, IntWritable,
ByteBuffer, List<Mutation>>
{
- private List<Mutation> results = new ArrayList<Mutation>();
- private String columnName;
-
- public void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException
- {
- int sum = 0;
- for (IntWritable val : values)
- {
- sum += val.get();
- }
-
- results.add(getMutation(key, sum));
- context.write(ByteBuffer.wrap(columnName.getBytes()), results);
- results.clear();
- }
+ private ByteBuffer outputKey;
protected void setup(org.apache.hadoop.mapreduce.Reducer.Context
context)
- throws IOException, InterruptedException
+ throws IOException, InterruptedException
{
- this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+ outputKey =
ByteBuffer.wrap(context.getConfiguration().get(CONF_COLUMN_NAME).getBytes());
}
- private static Mutation getMutation(Text key, int sum)
+ public void reduce(Text word, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException
{
- Mutation m = new Mutation();
- m.column_or_supercolumn = getCoSC(key, sum);
- return m;
+ int sum = 0;
+ for (IntWritable val : values)
+ sum += val.get();
+ context.write(outputKey,
Collections.singletonList(getMutation(word, sum)));
}
- private static ColumnOrSuperColumn getCoSC(Text key, int sum)
+ private static Mutation getMutation(Text word, int sum)
{
- // Have to convert both the key and the sum to ByteBuffers
- // for the generalized output format
- ByteBuffer name = ByteBuffer.wrap(key.getBytes());
- ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
-
Column c = new Column();
- c.name = name;
- c.value = value;
+ c.name = ByteBuffer.wrap(word.getBytes());
+ c.value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
c.timestamp = System.currentTimeMillis() * 1000;
- c.ttl = 0;
- ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
- cosc.column = c;
- return cosc;
+
+ Mutation m = new Mutation();
+ m.column_or_supercolumn = new ColumnOrSuperColumn();
+ m.column_or_supercolumn.column = c;
+ return m;
}
}
@@ -204,7 +183,7 @@ public class WordCount extends Configure
job.setOutputValueClass(List.class);
job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
-
+
ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
KEYSPACE, OUTPUT_COLUMN_FAMILY);
}