Author: jbellis
Date: Tue Sep 7 22:27:06 2010
New Revision: 993550
URL: http://svn.apache.org/viewvc?rev=993550&view=rev
Log:
add hadoop streaming files from CASSANDRA-1368
Added:
cassandra/trunk/contrib/hadoop_streaming_output/
cassandra/trunk/contrib/hadoop_streaming_output/README.txt
cassandra/trunk/contrib/hadoop_streaming_output/bin/
cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py
cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py
cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
Added: cassandra/trunk/contrib/hadoop_streaming_output/README.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/README.txt?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/README.txt (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/README.txt Tue Sep 7
22:27:06 2010
@@ -0,0 +1,24 @@
+Hadoop Streaming WordCount example: runs a simple Python wordcount mapper script, and a slightly
+more complex Cassandra-specific reducer that outputs the results of the wordcount as Avro
+records to Cassandra.
+
+Dependencies for the example:
+* Hadoop 0.20.x installed, such that the 'bin/hadoop' launcher is available on your PATH as 'hadoop'
+* Avro's Python library installed
+
+Unfortunately, due to the way Hadoop builds its CLASSPATH, it is also necessary to
+upgrade libraries that conflict between Hadoop and Avro. Within your Hadoop distribution,
+you'll need to ensure that the following jars are of a sufficiently high version, or else
+you will see a classloader error at runtime:
+* jackson-core-asl >= 1.4.0
+* jackson-mapper-asl >= 1.4.0
+
+To run the example, edit bin/streaming to point to a valid Cassandra cluster (the
+cassandra.thrift.address and cassandra.thrift.port settings), and then execute it over a text
+input located either on the local filesystem or in Hadoop's filesystem (e.g. HDFS):
+$ bin/streaming -input <mytxtfile.txt>
+
+Hadoop Streaming will execute a simple wordcount over your input files and write the generated
+counts to the Standard1 column family of Keyspace1. bin/reducer.py gives an example of how to
+format the output of a script as Avro records which can be consumed by Cassandra's
+AvroOutputReader, and bin/streaming shows the necessary incantations to execute a Hadoop
+Streaming job with Cassandra as output.
+
Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py Tue Sep 7
22:27:06 2010
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Simple reducer for writing to Cassandra using Hadoop streaming, Python and
Avro,
+# based on
http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+
+# Wordcount mapper
+# based on
http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+import sys
+
+# input comes from STDIN (standard input)
+for line in sys.stdin:
+ # remove leading and trailing whitespace
+ line = line.strip()
+ # split the line into words
+ words = line.split()
+ # increase counters
+ for word in words:
+ # write the results to STDOUT (standard output);
+ # what we output here will be the input for the
+ # Reduce step, i.e. the input for reducer.py
+ #
+ # tab-delimited; the trivial word count is 1
+ print '%s\t%s' % (word, 1)
+
Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py Tue Sep 7
22:27:06 2010
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Wordcount reducer for writing to Cassandra using Hadoop streaming and Avro,
+# based on
http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+from avro.io import BinaryEncoder, DatumWriter
+import avro.protocol
+import sys,time
+
+# input comes from STDIN (standard input)
+word2count = {}
+for line in sys.stdin:
+ # remove leading and trailing whitespace
+ line = line.strip()
+
+ # parse the input we got from mapper.py
+ word, count = line.split('\t', 1)
+ # convert count (currently a string) to int
+ try:
+ count = int(count)
+ word2count[word] = word2count.get(word, 0) + count
+ except ValueError:
+ # count was not a number, so silently
+ # ignore/discard this line
+ pass
+
+#
+# NB: the AvroOutputReader specific portion begins here
+#
+
+def new_column(name, value):
+ column = dict()
+ column['name'] = '%s' % name
+ column['value'] = '%s' % value
+ column['clock'] = {'timestamp': long(time.time() * 1e6)}
+ column['ttl'] = 0
+ return column
+
+# parse the current avro schema
+proto = avro.protocol.parse(open('cassandra.avpr').read())
+schema = proto.types_dict['StreamingMutation']
+# open an avro encoder and writer for stdout
+enc = BinaryEncoder(sys.stdout)
+writer = DatumWriter(schema)
+
+# output a series of objects matching 'StreamingMutation' in the Avro interface
+smutation = dict()
+try:
+ for word, count in word2count.iteritems():
+ smutation['key'] = word
+ smutation['mutation'] = {'column_or_supercolumn': {'column':
new_column('count', count)}}
+ writer.write(smutation, enc)
+finally:
+ sys.stdout.flush()
+
Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming Tue Sep 7
22:27:06 2010
@@ -0,0 +1,40 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+ARCHIVES=`ls -1 $cwd/../../../build/apache-cassandra*.jar`
+for jar in `ls -1 $cwd/../../../build/lib/jars/*.jar $cwd/../../../lib/*.jar`;
do
+ ARCHIVES=$ARCHIVES,$jar
+done
+
+hadoop jar $cwd/../../../build/lib/jars/hadoop-streaming*.jar \
+ -D stream.reduce.output=cassandra_avro_output \
+ -D
stream.io.identifier.resolver.class=org.apache.cassandra.hadoop.streaming.AvroResolver
\
+ -D cassandra.output.keyspace=Keyspace1 \
+ -D cassandra.output.columnfamily=Standard1 \
+ -D cassandra.partitioner.class=org.apache.cassandra.dht.RandomPartitioner \
+ -D cassandra.thrift.address=127.0.0.1 \
+ -D cassandra.thrift.port=9160 \
+ -libjars $ARCHIVES \
+ -file $cwd/../../../interface/avro/cassandra.avpr \
+ -outputformat org.apache.cassandra.hadoop.ColumnFamilyOutputFormat \
+ -output /tmp/ignored \
+ -mapper $cwd/mapper.py -reducer $cwd/reducer.py \
+ $*
+
Added:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java?rev=993550&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
Tue Sep 7 22:27:06 2010
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.hadoop.streaming;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.avro.StreamingMutation;
+import org.apache.cassandra.hadoop.ConfigHelper;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * An OutputReader that reads sequential StreamingMutations (from Cassandra's Avro client API) and converts them to
+ * the key/value pairs ({@code ByteBuffer} key, {@code List<Mutation>} value) consumed by ColumnFamilyOutputFormat.
+ * This allows Hadoop Streaming to output efficiently to Cassandra via a familiar API.
+ *
+ * Avro resolves data using both the reader's and the writer's schema; since only the reader's schema
+ * ({@code StreamingMutation.SCHEMA$}) is supplied here, the writer is assumed to use the same one.
+ * If the canonical schema that the Cassandra side uses changes, and somebody packaged the {@code .avpr}
+ * up in their application somehow, or generated code, they'd see a runtime failure.
+ * We could allow specifying an alternate Avro schema using a Configuration property to work around this.
+ */
+public class AvroOutputReader extends OutputReader<ByteBuffer, List<Mutation>>
+{
+    private BinaryDecoder decoder;
+    private SpecificDatumReader<StreamingMutation> reader;
+
+    // reusable values: the current record (offered to the Avro reader for reuse) and the
+    // single-element value list handed back from getCurrentValue()
+    private StreamingMutation entry = new StreamingMutation();
+    private final ArrayList<Mutation> mutations = new ArrayList<Mutation>(1);
+
+    /**
+     * Sets up Avro binary decoding over the output of the streaming client process.
+     */
+    @Override
+    public void initialize(PipeMapRed pmr) throws IOException
+    {
+        super.initialize(pmr);
+
+        // set up decoding around the DataInput (hmm) provided by streaming
+        DataInput clientInput = pmr.getClientInput();
+        InputStream in;
+        if (clientInput instanceof InputStream)
+            // let's hope this is the case
+            in = (InputStream) clientInput;
+        else
+            // ...because this is relatively slow
+            in = new FromDataInputStream(clientInput);
+        decoder = DecoderFactory.defaultFactory().createBinaryDecoder(in, null);
+        reader = new SpecificDatumReader<StreamingMutation>(StreamingMutation.SCHEMA$);
+    }
+
+    /**
+     * Decodes the next StreamingMutation from the client output.
+     *
+     * @return true if a record was read, false once the client output is exhausted
+     */
+    @Override
+    public boolean readKeyValue() throws IOException
+    {
+        try
+        {
+            // DatumReader.read() may reuse the datum passed in, but is not required to:
+            // always keep the instance it returns
+            entry = reader.read(entry, decoder);
+        }
+        catch (EOFException e)
+        {
+            // no more records from the client
+            return false;
+        }
+        mutations.clear();
+        mutations.add(entry.mutation);
+        return true;
+    }
+
+    /**
+     * @return the key of the most recently read StreamingMutation
+     */
+    @Override
+    public ByteBuffer getCurrentKey() throws IOException
+    {
+        return entry.key;
+    }
+
+    /**
+     * @return a single-element list holding the mutation of the most recently read record; the same
+     *         list instance is reused (cleared and refilled) by every successful readKeyValue()
+     */
+    @Override
+    public List<Mutation> getCurrentValue() throws IOException
+    {
+        return mutations;
+    }
+
+    /**
+     * @return the string form of the current StreamingMutation
+     */
+    @Override
+    public String getLastOutput()
+    {
+        return entry.toString();
+    }
+
+    /**
+     * Wraps a DataInput to extend InputStream. Bytes are read one at a time and end-of-stream is detected by
+     * catching EOFException, so reading through this wrapper is likely to be ridiculously slow.
+     */
+    private static final class FromDataInputStream extends InputStream
+    {
+        private final DataInput in;
+
+        public FromDataInputStream(DataInput in)
+        {
+            this.in = in;
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return false;
+        }
+
+        @Override
+        public int read() throws IOException
+        {
+            try
+            {
+                return in.readUnsignedByte();
+            }
+            catch (EOFException e)
+            {
+                // InputStream signals end-of-stream with -1 rather than an exception
+                return -1;
+            }
+        }
+
+        /**
+         * Skips up to {@code n} bytes, in chunks of at most Integer.MAX_VALUE (DataInput.skipBytes takes an int).
+         * Stops as soon as the underlying input skips fewer bytes than requested, e.g. at end of stream.
+         *
+         * @return the number of bytes actually skipped; 0 if {@code n} is not positive
+         */
+        @Override
+        public long skip(long n) throws IOException
+        {
+            long skipped = 0;
+            while (n > 0)
+            {
+                int chunk = (int) Math.min(Integer.MAX_VALUE, n);
+                int actual = in.skipBytes(chunk);
+                skipped += actual;
+                if (actual < chunk)
+                    break; // short skip: no point asking the input for more
+                n -= chunk;
+            }
+            return skipped;
+        }
+    }
+}
Added:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java?rev=993550&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
Tue Sep 7 22:27:06 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.hadoop.streaming;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.streaming.io.IdentifierResolver;
+
+/**
+ * Maps the {@link #AVRO_ID} identifier to the classes needed to write Hadoop Streaming output to Cassandra:
+ * {@link AvroOutputReader} as the OutputReader, a {@code ByteBuffer} output key and a {@code List} output value.
+ * Every other identifier is delegated to the superclass, Hadoop's IdentifierResolver.
+ *
+ * Usage, as in contrib/hadoop_streaming_output/bin/streaming:
+ * <pre>
+ *   -D stream.reduce.output=cassandra_avro_output
+ *   -D stream.io.identifier.resolver.class=org.apache.cassandra.hadoop.streaming.AvroResolver
+ * </pre>
+ * No InputWriter class is set for this identifier (it is set to null), so it is meant for output
+ * settings such as {@code stream.reduce.output}.
+ */
+public class AvroResolver extends IdentifierResolver
+{
+    public static final String AVRO_ID = "cassandra_avro_output";
+
+    @Override
+    public void resolve(String identifier)
+    {
+        if (identifier.equalsIgnoreCase(AVRO_ID))
+        {
+            // Cassandra output: the client emits Avro-encoded StreamingMutations
+            setInputWriterClass(null);
+            setOutputReaderClass(AvroOutputReader.class);
+            setOutputKeyClass(ByteBuffer.class);
+            setOutputValueClass(List.class);
+        }
+        else
+        {
+            // not ours: fall back to Hadoop's built-in identifiers
+            super.resolve(identifier);
+        }
+    }
+}