Author: jbellis
Date: Tue Sep  7 22:27:06 2010
New Revision: 993550

URL: http://svn.apache.org/viewvc?rev=993550&view=rev
Log:
add hadoop streaming files from CASSANDRA-1368

Added:
    cassandra/trunk/contrib/hadoop_streaming_output/
    cassandra/trunk/contrib/hadoop_streaming_output/README.txt
    cassandra/trunk/contrib/hadoop_streaming_output/bin/
    cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py
    cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py
    cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/
    
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
    
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java

Added: cassandra/trunk/contrib/hadoop_streaming_output/README.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/README.txt?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/README.txt (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/README.txt Tue Sep  7 
22:27:06 2010
@@ -0,0 +1,24 @@
+Hadoop Streaming WordCount example: runs a simple Python wordcount mapper script, and a slightly
+more complex Cassandra-specific reducer that outputs the results of the wordcount as Avro
+records to Cassandra.
+
+Dependencies for the example:
+* Hadoop 0.20.x installed, such that the 'bin/hadoop' launcher is available on your path
+* Avro's Python library installed
+
+Unfortunately, due to the way Hadoop builds its CLASSPATH, it is also necessary to
+upgrade a library that conflicts between Hadoop and Avro. Within your Hadoop distribution,
+you'll need to ensure that the following jars are of a sufficiently high version, or else
+you will see a classloader error at runtime:
+* jackson-core-asl >= 1.4.0
+* jackson-mapper-asl >= 1.4.0
+
+To run the example, edit bin/streaming to point to a valid Cassandra cluster and
+then execute it over a text input, located on the local filesystem or in Hadoop:
+$ bin/streaming -input <mytxtfile.txt>
+
+Hadoop streaming will execute a simple wordcount over your input files, and write the generated
+counts to the Standard1 column family of Keyspace1. bin/reducer.py gives an example of how to format the output of
+a script as Avro records which can be consumed by Cassandra's AvroOutputReader, and bin/streaming
+shows the necessary incantations to execute a Hadoop Streaming job with Cassandra as output.
+

Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/mapper.py Tue Sep  7 
22:27:06 2010
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Simple mapper for the example that writes to Cassandra using Hadoop streaming, Python and Avro,
+# based on http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+
+# Wordcount mapper
+# based on http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+import sys
+ 
+# input comes from STDIN (standard input)
+for line in sys.stdin:
+    # remove leading and trailing whitespace
+    line = line.strip()
+    # split the line into words
+    words = line.split()
+    # increase counters
+    for word in words:
+        # write the results to STDOUT (standard output);
+        # what we output here will be the input for the
+        # Reduce step, i.e. the input for reducer.py
+        #
+        # tab-delimited; the trivial word count is 1
+        print '%s\t%s' % (word, 1)
+

Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/reducer.py Tue Sep  7 
22:27:06 2010
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Wordcount reducer for writing to Cassandra using Hadoop streaming and Avro,
+# based on http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python
+
+from avro.io import BinaryEncoder, DatumWriter
+import avro.protocol
+import sys,time
+
+# input comes from STDIN (standard input)
+word2count = {}
+for line in sys.stdin:
+    # remove leading and trailing whitespace
+    line = line.strip()
+
+    # parse the input we got from mapper.py
+    word, count = line.split('\t', 1)
+    # convert count (currently a string) to int
+    try:
+        count = int(count)
+        word2count[word] = word2count.get(word, 0) + count
+    except ValueError:
+        # count was not a number, so silently
+        # ignore/discard this line
+        pass
+
+#
+# NB: the AvroOutputReader specific portion begins here
+#
+
+def new_column(name, value):
+    column = dict()
+    column['name'] = '%s' % name
+    column['value'] = '%s' % value
+    column['clock'] = {'timestamp': long(time.time() * 1e6)}
+    column['ttl'] = 0
+    return column
+
+# parse the current avro schema
+proto = avro.protocol.parse(open('cassandra.avpr').read())
+schema = proto.types_dict['StreamingMutation']
+# open an avro encoder and writer for stdout
+enc = BinaryEncoder(sys.stdout)
+writer = DatumWriter(schema)
+
+# output a series of objects matching 'StreamingMutation' in the Avro interface
+smutation = dict()
+try:
+    for word, count in word2count.iteritems():
+        smutation['key'] = word
+        smutation['mutation'] = {'column_or_supercolumn': {'column': 
new_column('count', count)}}
+        writer.write(smutation, enc)
+finally:
+    sys.stdout.flush()
+

Added: cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming?rev=993550&view=auto
==============================================================================
--- cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming (added)
+++ cassandra/trunk/contrib/hadoop_streaming_output/bin/streaming Tue Sep  7 
22:27:06 2010
@@ -0,0 +1,40 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+ARCHIVES=`ls -1 $cwd/../../../build/apache-cassandra*.jar`
+for jar in `ls -1 $cwd/../../../build/lib/jars/*.jar $cwd/../../../lib/*.jar`; 
do
+    ARCHIVES=$ARCHIVES,$jar
+done
+
+hadoop jar $cwd/../../../build/lib/jars/hadoop-streaming*.jar \
+    -D stream.reduce.output=cassandra_avro_output \
+    -D 
stream.io.identifier.resolver.class=org.apache.cassandra.hadoop.streaming.AvroResolver
 \
+    -D cassandra.output.keyspace=Keyspace1 \
+    -D cassandra.output.columnfamily=Standard1 \
+    -D cassandra.partitioner.class=org.apache.cassandra.dht.RandomPartitioner \
+    -D cassandra.thrift.address=127.0.0.1 \
+    -D cassandra.thrift.port=9160 \
+    -libjars $ARCHIVES \
+    -file $cwd/../../../interface/avro/cassandra.avpr \
+    -outputformat org.apache.cassandra.hadoop.ColumnFamilyOutputFormat \
+    -output /tmp/ignored \
+    -mapper $cwd/mapper.py -reducer $cwd/reducer.py \
+    $*
+

Added: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java?rev=993550&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroOutputReader.java
 Tue Sep  7 22:27:06 2010
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.    See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.    The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.    You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.    See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.hadoop.streaming;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.avro.Mutation;
+import org.apache.cassandra.avro.StreamingMutation;
+import org.apache.cassandra.hadoop.ConfigHelper;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.streaming.io.OutputReader;
+import org.apache.hadoop.streaming.PipeMapRed;
+
+/**
+ * An OutputReader that reads sequential StreamingMutations (from Cassandra's Avro client API), and converts them to
+ * the objects used by CassandraOutputFormat. This allows Hadoop Streaming to output efficiently to Cassandra via
+ * a familiar API.
+ *
+ * Avro requires the reader's and writer's schema: otherwise, it assumes they are the same.
+ * If the canonical schema that the Cassandra side uses changes, and somebody packaged the {@code avpr}
+ * up in their application somehow, or generated code, they'd see a runtime failure.
+ * We could allow specifying an alternate Avro schema using a Configuration property to work around this.
+ */
+public class AvroOutputReader extends OutputReader<ByteBuffer, List<Mutation>>
+{
+    private BinaryDecoder decoder;
+    private SpecificDatumReader<StreamingMutation> reader;
+
+    // Most recently decoded record. It is offered back to Avro as the reuse candidate on every read; it is not
+    // final because DatumReader.read() is permitted, but not required, to return the instance it was handed.
+    private StreamingMutation entry = new StreamingMutation();
+    // reusable single-element list handed out by getCurrentValue()
+    private final ArrayList<Mutation> mutations = new ArrayList<Mutation>(1);
+
+    /**
+     * Sets up a binary Avro decoder over the streaming client's output and a datum reader
+     * for {@code StreamingMutation.SCHEMA$}.
+     *
+     * @param pmr the streaming pipe whose client input is decoded
+     * @throws IOException if the superclass initialization fails
+     */
+    @Override
+    public void initialize(PipeMapRed pmr) throws IOException
+    {
+        super.initialize(pmr);
+
+        // set up decoding around the DataInput (hmm) provided by streaming
+        InputStream in;
+        if (pmr.getClientInput() instanceof InputStream)
+            // let's hope this is the case
+            in = (InputStream)pmr.getClientInput();
+        else
+            // ...because this is relatively slow
+            in = new FromDataInputStream(pmr.getClientInput());
+        decoder = DecoderFactory.defaultFactory().createBinaryDecoder(in, null);
+        reader = new SpecificDatumReader<StreamingMutation>(StreamingMutation.SCHEMA$);
+    }
+    
+    /**
+     * Decodes the next StreamingMutation from the client's output.
+     *
+     * @return true if a record was read, false once the decoder signals end of input
+     * @throws IOException on any read failure other than end of input
+     */
+    @Override
+    public boolean readKeyValue() throws IOException
+    {
+        try
+        {
+            // keep whatever instance Avro returns: reuse of the argument is optional
+            entry = reader.read(entry, decoder);
+        }
+        catch (EOFException e)
+        {
+            // end of the client's output
+            return false;
+        }
+        mutations.clear();
+        mutations.add(entry.mutation);
+        return true;
+    }
+    
+    /**
+     * @return the key of the most recently read record
+     */
+    @Override
+    public ByteBuffer getCurrentKey() throws IOException
+    {
+        return entry.key;
+    }
+    
+    /**
+     * @return the reusable list holding the mutation of the most recently read record; its contents are
+     *         replaced by the next successful {@link #readKeyValue()}
+     */
+    @Override
+    public List<Mutation> getCurrentValue() throws IOException
+    {
+        return mutations;
+    }
+
+    /**
+     * @return the string form of the most recently read record
+     */
+    @Override
+    public String getLastOutput()
+    {
+        return entry.toString();
+    }
+    
+    /**
+     * Wraps a DataInput to extend InputStream. The exception handling in read() is likely to be ridiculously slow.
+     */
+    private static final class FromDataInputStream extends InputStream
+    {
+        private final DataInput in;
+
+        public FromDataInputStream(DataInput in)
+        {
+            this.in = in;
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return false;
+        }
+
+        /**
+         * Reads one byte, translating the DataInput's EOFException into the -1 that InputStream callers expect.
+         */
+        @Override
+        public int read() throws IOException
+        {
+            try
+            {
+                return in.readUnsignedByte();
+            }
+            catch (EOFException e)
+            {
+                return -1;
+            }
+        }
+
+        /**
+         * Makes one skipBytes attempt per batch of at most Integer.MAX_VALUE bytes.
+         *
+         * @return the number of bytes actually skipped, which may be fewer than n
+         */
+        @Override
+        public long skip(long n) throws IOException
+        {
+            long skipped = 0;
+            while (n > 0)
+            {
+                // skip in batches up to max_int in size
+                int skip = (int)Math.min(Integer.MAX_VALUE, n);
+                skipped += in.skipBytes(skip);
+                n -= skip;
+            }
+            return skipped;
+        }
+    }
+}

Added: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java?rev=993550&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/streaming/AvroResolver.java
 Tue Sep  7 22:27:06 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.        See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.        The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.        You may obtain a copy of the License at
+ * 
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.        See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.hadoop.streaming;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.streaming.io.IdentifierResolver;
+
+/**
+ * Resolves AVRO_ID to the appropriate OutputReader and K/V classes for Cassandra output.
+ *
+ * Usage (as in contrib/hadoop_streaming_output/bin/streaming): name this class in the
+ * {@code stream.io.identifier.resolver.class} property and set {@code stream.reduce.output}
+ * to the value of {@link #AVRO_ID}. Any other identifier is handed to the superclass.
+ */
+public class AvroResolver extends IdentifierResolver
+{
+    public static final String AVRO_ID = "cassandra_avro_output";
+
+    /**
+     * Configures the reader and key/value classes when {@code identifier} matches
+     * {@link #AVRO_ID} (compared case-insensitively); otherwise delegates to the superclass.
+     */
+    @Override
+    public void resolve(String identifier)
+    {
+        if (identifier.equalsIgnoreCase(AVRO_ID))
+        {
+            // the input writer class is explicitly set to null for this identifier
+            setInputWriterClass(null);
+            setOutputReaderClass(AvroOutputReader.class);
+            setOutputKeyClass(ByteBuffer.class);
+            setOutputValueClass(List.class);
+        }
+        else
+        {
+            super.resolve(identifier);
+        }
+    }
+}


Reply via email to