Author: jbellis
Date: Mon Sep 27 22:11:05 2010
New Revision: 1001930
URL: http://svn.apache.org/viewvc?rev=1001930&view=rev
Log:
Added option for filesystem/cassandra output.
patch by Jeremy Hanna; reviewed by Stu Hood for CASSANDRA-1342
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/word_count/README.txt
cassandra/trunk/contrib/word_count/bin/word_count (contents, props changed)
cassandra/trunk/contrib/word_count/bin/word_count_setup (contents, props changed)
cassandra/trunk/contrib/word_count/src/WordCount.java
cassandra/trunk/contrib/word_count/src/WordCountSetup.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Sep 27 22:11:05 2010
@@ -1,5 +1,7 @@
dev
* create EndpointSnitchInfo and MBean to expose rack and DC (CASSANDRA-1491)
+ * added option to contrib/word_count to output results back to Cassandra
+ (CASSANDRA-1342)
0.7-beta2
Modified: cassandra/trunk/contrib/word_count/README.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/README.txt?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/README.txt (original)
+++ cassandra/trunk/contrib/word_count/README.txt Mon Sep 27 22:11:05 2010
@@ -10,14 +10,20 @@ contrib/word_count$ ant
contrib/word_count$ bin/word_count_setup
contrib/word_count$ bin/word_count
-Output will be in /tmp/word_count*.
+The output of the word count can now be configured. In the bin/word_count
+file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
+and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
+directories. The cassandra option outputs to the 'Standard2' column family.
+
+In order to view the results in Cassandra, one can use python/pycassa and
+perform the following operations:
+$ python
+>>> import pycassa
+>>> con = pycassa.connect('Keyspace1')
+>>> cf = pycassa.ColumnFamily(con, 'Standard2')
+>>> list(cf.get_range())
Read the code in src/ for more details.
*If you want to point wordcount at a real cluster, modify the seed
-and listenaddress settings in storage-conf.xml accordingly.
-
-*For Mac users, the storage-conf.xml uses 127.0.0.2 for the
-word_count_setup. Mac OS X doesn't have that address available.
-To add it, run this before running bin/word_count_setup:
-sudo ifconfig lo0 alias 127.0.0.2 up
+and listenaddress settings accordingly.
Modified: cassandra/trunk/contrib/word_count/bin/word_count
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/bin/word_count (original)
+++ cassandra/trunk/contrib/word_count/bin/word_count Mon Sep 27 22:11:05 2010
@@ -53,5 +53,7 @@ if [ "x$JAVA" = "x" ]; then
exit 1
fi
+OUTPUT_REDUCER=filesystem
+
#echo $CLASSPATH
-$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER
Propchange: cassandra/trunk/contrib/word_count/bin/word_count
------------------------------------------------------------------------------
svn:executable = *
Modified: cassandra/trunk/contrib/word_count/bin/word_count_setup
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/bin/word_count_setup?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
(empty)
Propchange: cassandra/trunk/contrib/word_count/bin/word_count_setup
------------------------------------------------------------------------------
svn:executable = *
Modified: cassandra/trunk/contrib/word_count/src/WordCount.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCount.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCount.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCount.java Mon Sep 27 22:11:05 2010
@@ -17,10 +17,13 @@
*/
import java.io.IOException;
-import java.util.Arrays;
-import java.util.SortedMap;
-import java.util.StringTokenizer;
+import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.cassandra.avro.Column;
+import org.apache.cassandra.avro.ColumnOrSuperColumn;
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +55,13 @@ public class WordCount extends Configure
static final String KEYSPACE = "Keyspace1";
static final String COLUMN_FAMILY = "Standard1";
- private static final String CONF_COLUMN_NAME = "columnname";
+
+ static final String OUTPUT_REDUCER_VAR = "output_reducer";
+ static final String OUTPUT_COLUMN_FAMILY = "Standard2";
private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
+ private static final String CONF_COLUMN_NAME = "columnname";
+
public static void main(String[] args) throws Exception
{
// Let ToolRunner handle generic command-line options
@@ -92,7 +99,7 @@ public class WordCount extends Configure
}
- public static class IntSumReducer extends Reducer<Text, IntWritable, Text,
IntWritable>
+ public static class ReducerToFilesystem extends Reducer<Text, IntWritable,
Text, IntWritable>
{
private IntWritable result = new IntWritable();
@@ -109,30 +116,109 @@ public class WordCount extends Configure
}
}
+ public static class ReducerToCassandra extends Reducer<Text, IntWritable,
ByteBuffer, List<Mutation>>
+ {
+ private List<Mutation> results = new ArrayList<Mutation>();
+ private String columnName;
+
+ public void reduce(Text key, Iterable<IntWritable> values, Context
context) throws IOException, InterruptedException
+ {
+ int sum = 0;
+ for (IntWritable val : values)
+ {
+ sum += val.get();
+ }
+
+ results.add(getMutation(key, sum));
+ context.write(ByteBuffer.wrap(columnName.getBytes()), results);
+ results.clear();
+ }
+
+ protected void setup(org.apache.hadoop.mapreduce.Reducer.Context
context)
+ throws IOException, InterruptedException
+ {
+ this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
+ }
+
+ private static Mutation getMutation(Text key, int sum)
+ {
+ Mutation m = new Mutation();
+ m.column_or_supercolumn = getCoSC(key, sum);
+ return m;
+ }
+
+ private static ColumnOrSuperColumn getCoSC(Text key, int sum)
+ {
+ // Have to convert both the key and the sum to ByteBuffers
+ // for the generalized output format
+ ByteBuffer name = ByteBuffer.wrap(key.getBytes());
+ ByteBuffer value = ByteBuffer.wrap(String.valueOf(sum).getBytes());
+
+ Column c = new Column();
+ c.name = name;
+ c.value = value;
+ c.timestamp = System.currentTimeMillis() * 1000;
+ c.ttl = 0;
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.column = c;
+ return cosc;
+ }
+ }
+
public int run(String[] args) throws Exception
{
+ String outputReducerType = "filesystem";
+ if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
+ {
+ String[] s = args[0].split("=");
+ if (s != null && s.length == 2)
+ outputReducerType = s[1];
+ }
+ logger.info("output reducer type: " + outputReducerType);
for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
{
String columnName = "text" + i;
getConf().set(CONF_COLUMN_NAME, columnName);
+
Job job = new Job(getConf(), "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
- job.setCombinerClass(IntSumReducer.class);
- job.setReducerClass(IntSumReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
+
+ if (outputReducerType.equalsIgnoreCase("filesystem"))
+ {
+ job.setCombinerClass(ReducerToFilesystem.class);
+ job.setReducerClass(ReducerToFilesystem.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+ FileOutputFormat.setOutputPath(job, new
Path(OUTPUT_PATH_PREFIX + i));
+ }
+ else
+ {
+ job.setReducerClass(ReducerToCassandra.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(ByteBuffer.class);
+ job.setOutputValueClass(List.class);
+
+ job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+
+ ConfigHelper.setOutputColumnFamily(job.getConfiguration(),
KEYSPACE, OUTPUT_COLUMN_FAMILY);
+ }
job.setInputFormatClass(ColumnFamilyInputFormat.class);
- FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX +
i));
+
ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInitialAddress(job.getConfiguration(),
"localhost");
+ ConfigHelper.setPartitioner(job.getConfiguration(),
"org.apache.cassandra.dht.RandomPartitioner");
ConfigHelper.setInputColumnFamily(job.getConfiguration(),
KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new
SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(),
predicate);
+
+
job.waitForCompletion(true);
}
return 0;
Modified: cassandra/trunk/contrib/word_count/src/WordCountSetup.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/word_count/src/WordCountSetup.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
--- cassandra/trunk/contrib/word_count/src/WordCountSetup.java (original)
+++ cassandra/trunk/contrib/word_count/src/WordCountSetup.java Mon Sep 27 22:11:05 2010
@@ -97,8 +97,8 @@ public class WordCountSetup
private static void setupKeyspace(Cassandra.Iface client) throws
TException, InvalidRequestException
{
List<CfDef> cfDefList = new ArrayList<CfDef>();
- CfDef cfDef = new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY);
- cfDefList.add(cfDef);
+ cfDefList.add(new CfDef(WordCount.KEYSPACE, WordCount.COLUMN_FAMILY));
+ cfDefList.add(new CfDef(WordCount.KEYSPACE,
WordCount.OUTPUT_COLUMN_FAMILY));
client.system_add_keyspace(new KsDef(WordCount.KEYSPACE,
"org.apache.cassandra.locator.SimpleStrategy", 1, cfDefList));
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=1001930&r1=1001929&r2=1001930&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
Mon Sep 27 22:11:05 2010
@@ -51,7 +51,7 @@ import org.slf4j.LoggerFactory;
* The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
* OutputFormat that allows reduce tasks to store keys (and corresponding
* values) as Cassandra rows (and respective columns) in a given
- * {@link ColumnFamily}.
+ * ColumnFamily.
*
* <p>
* As is the case with the {@link ColumnFamilyInputFormat}, you need to set the