Added: avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java?rev=954998&view=auto ============================================================================== --- avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java (added) +++ avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java Tue Jun 15 18:21:18 2010 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.avro.mapred.tether; + +import java.io.IOException; +import java.util.StringTokenizer; + +import org.apache.avro.util.Utf8; +import org.apache.avro.mapred.WordCount; + +/** Example Java tethered mapreduce executable. Implements map and reduce + * functions for word count. 
*/ +public class WordCountTask extends TetherTask<Utf8,WordCount,WordCount> { /* NOTE(review): type arguments appear to be (input, map output, reduce output) given the map/reduce signatures below; confirm against TetherTask. */ + + /** Emits one WordCount with count 1 for every token of {@code text}, split on StringTokenizer's default (whitespace) delimiters. */ @Override public void map(Utf8 text, Collector<WordCount> collector) + throws IOException { + StringTokenizer tokens = new StringTokenizer(text.toString()); + while (tokens.hasMoreTokens()) { + WordCount wc = new WordCount(); + wc.word = new Utf8(tokens.nextToken()); + wc.count = 1; /* each occurrence counts once; reduce() sums these counts */ + collector.collect(wc); + } + } + + /* Running total accumulated by reduce() since the last reduceFlush(). */ private int sum; + + /** Adds this record's count to the running total; emits nothing itself. */ @Override public void reduce(WordCount wc, Collector<WordCount> c) { + sum += wc.count; + } + + /** Emits {@code wc} with its count overwritten by the running total, then resets the total to 0. NOTE(review): assumes the framework calls this once per key group, after that group's reduce() calls; confirm against TetherTask. */ @Override public void reduceFlush(WordCount wc, Collector<WordCount> c) + throws IOException { + wc.count = sum; + c.collect(wc); + sum = 0; /* so the next reduceFlush() reports only newly accumulated counts */ + } + + /** Entry point: wraps a new WordCountTask in a TetherTaskRunner and waits on the runner's join(). */ public static void main(String... args) throws Exception { + new TetherTaskRunner(new WordCountTask()).join(); + } + +}
Added: avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr?rev=954998&view=auto ============================================================================== --- avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr (added) +++ avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr Tue Jun 15 18:21:18 2010 @@ -0,0 +1,64 @@ +{"namespace":"org.apache.avro.mapred.tether", + "protocol": "InputProtocol", + "doc": "Transmit inputs to a map or reduce task sub-process.", + + "types": [ + {"name": "TaskType", "type": "enum", "symbols": ["MAP","REDUCE"]} + ], + + "messages": { + + "configure": { + "doc": "Configure the task. Sent before any other message.", + "request": [ + {"name": "taskType", "type": "TaskType", + "doc": "Whether this is a map or reduce task."}, + {"name": "inSchema", "type": "string", + "doc": "The Avro schema for task input data."}, + {"name": "outSchema", "type": "string", + "doc": "The Avro schema for task output data."} + ], + "response": "null", + "one-way": true + }, + + "partitions": { + "doc": "Set the number of map output partitions.", + "request": [ + {"name": "partitions", "type": "int", + "doc": "The number of map output partitions."} + ], + "response": "null", + "one-way": true + }, + + "input": { + "doc": "Send a block of input data to a task.", + "request": [ + {"name": "data", "type": "bytes", + "doc": "A sequence of instances of the declared schema."}, + {"name": "count", "type": "long", + "default": 1, + "doc": "The number of instances in this block."} + ], + "response": "null", + "one-way": true + }, + + "abort": { + "doc": "Called to abort the task.", + "request": [], + "response": "null", + "one-way": true + }, + + "complete": { + "doc": "Called when a task's input is complete.", + "request": [], + "response": "null", + "one-way": true + } + + } + +} Added: 
avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr URL: http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr?rev=954998&view=auto ============================================================================== --- avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr (added) +++ avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr Tue Jun 15 18:21:18 2010 @@ -0,0 +1,82 @@ +{"namespace":"org.apache.avro.mapred.tether", + "protocol": "OutputProtocol", + "doc": "Transmit outputs from a map or reduce task to its parent.", + + "messages": { + + "configure": { + "doc": "Configure the task. Sent before any other message.", + "request": [ + {"name": "port", "type": "int", + "doc": "The port on which to transmit inputs to this task."} + ], + "response": "null", + "one-way": true + }, + + "output": { + "doc": "Send an output datum.", + "request": [ + {"name": "datum", "type": "bytes", + "doc": "A binary-encoded instance of the declared schema."} + ], + "response": "null", + "one-way": true + }, + + "outputPartitioned": { + "doc": "Send a map output datum, explicitly naming its partition.", + "request": [ + {"name": "partition", "type": "int", + "doc": "The map output partition for this datum."}, + {"name": "datum", "type": "bytes", + "doc": "A binary-encoded instance of the declared schema."} + ], + "response": "null", + "one-way": true + }, + + "status": { + "doc": "Update the task's status message.
Also acts as a keepalive.", + "request": [ + {"name": "message", "type": "string", + "doc": "The new status message for the task."} + ], + "response": "null", + "one-way": true + }, + + "count": { + "doc": "Increment a task/job counter.", + "request": [ + {"name": "group", "type": "string", + "doc": "The name of the counter group."}, + {"name": "name", "type": "string", + "doc": "The name of the counter to increment."}, + {"name": "amount", "type": "long", + "doc": "The amount by which to increment the counter."} + ], + "response": "null", + "one-way": true + }, + + "fail": { + "doc": "Called by a failing task to abort.", + "request": [ + {"name": "message", "type": "string", + "doc": "The reason for failure."} + ], + "response": "null", + "one-way": true + }, + + "complete": { + "doc": "Called when a task's output has completed without error.", + "request": [], + "response": "null", + "one-way": true + } + + } + +}
