Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=679845&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Fri Jul 25 08:57:41 2008 @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; + +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.*; + +/** An [EMAIL PROTECTED] OutputFormat} that writes plain text files. */ +public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { + + protected static class LineRecordWriter<K, V> + implements RecordWriter<K, V> { + private static final String utf8 = "UTF-8"; + private static final byte[] newline; + static { + try { + newline = "\n".getBytes(utf8); + } catch (UnsupportedEncodingException uee) { + throw new IllegalArgumentException("can't find " + utf8 + " encoding"); + } + } + + protected DataOutputStream out; + private final byte[] keyValueSeparator; + + public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { + this.out = out; + try { + this.keyValueSeparator = keyValueSeparator.getBytes(utf8); + } catch (UnsupportedEncodingException uee) { + throw new IllegalArgumentException("can't find " + utf8 + " encoding"); + } + } + + public LineRecordWriter(DataOutputStream out) { + this(out, "\t"); + } + + /** + * Write the object to the byte stream, handling Text as a special + * case. + * @param o the object to print + * @throws IOException if the write throws, we pass it on + */ + private void writeObject(Object o) throws IOException { + if (o instanceof Text) { + Text to = (Text) o; + out.write(to.getBytes(), 0, to.getLength()); + } else { + out.write(o.toString().getBytes(utf8)); + } + } + + public synchronized void write(K key, V value) + throws IOException { + + boolean nullKey = key == null || key instanceof NullWritable; + boolean nullValue = value == null || value instanceof NullWritable; + if (nullKey && nullValue) { + return; + } + if (!nullKey) { + writeObject(key); + } + if (!(nullKey || nullValue)) { + out.write(keyValueSeparator); + } + if (!nullValue) { + writeObject(value); + } + out.write(newline); + } + + public synchronized + void close(TaskAttemptContext context) throws IOException { + out.close(); + } + } + + public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) + throws IOException { + Configuration job = context.getConfiguration(); + boolean isCompressed = getCompressOutput(job); + String keyValueSeparator = job.get("mapred.textoutputformat.separator", + "\t"); + Path file = FileOutputFormat.getTaskOutputPath(context); + if (!isCompressed) { + FileSystem fs = file.getFileSystem(job); + FSDataOutputStream fileOut = fs.create(file, context); + return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); + } else { + Class<? extends CompressionCodec> codecClass = + getOutputCompressorClass(job, GzipCodec.class); + // create the named codec + CompressionCodec codec = (CompressionCodec) + ReflectionUtils.newInstance(codecClass, job); + // build the filename including the extension + file = new Path(file + codec.getDefaultExtension()); + FileSystem fs = file.getFileSystem(job); + FSDataOutputStream fileOut = fs.create(file, context); + return new LineRecordWriter<K, V>(new DataOutputStream + (codec.createOutputStream(fileOut)), + keyValueSeparator); + } + } +} +
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java?rev=679845&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java Fri Jul 25 08:57:41 2008 @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.partition; + +import org.apache.hadoop.mapreduce.Partitioner; + +/** Partition keys by their [EMAIL PROTECTED] Object#hashCode()}. */ +public class HashPartitioner<K, V> implements Partitioner<K, V> { + + /** Use [EMAIL PROTECTED] Object#hashCode()} to partition. */ + public int getPartition(K key, V value, + int numReduceTasks) { + return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; + } + +} Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java?rev=679845&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/IntSumReducer.java Fri Jul 25 08:57:41 2008 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.reduce; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Reducer; + +public class IntSumReducer<Key> extends Reducer<Key,IntWritable, + Key,IntWritable> { + private IntWritable result = new IntWritable(); + + public void reduce(Key key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.collect(key, result); + } + +} \ No newline at end of file Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java?rev=679845&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/reduce/LongSumReducer.java Fri Jul 25 08:57:41 2008 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.reduce; + +import java.io.IOException; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Reducer; + +public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable, + KEY,LongWritable> { + + private LongWritable result = new LongWritable(); + + public void reduce(KEY key, Iterable<LongWritable> values, + Context context) throws IOException, InterruptedException { + long sum = 0; + for (LongWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.collect(key, result); + } + +} \ No newline at end of file
