Author: omalley
Date: Wed Jul 2 14:56:48 2008
New Revision: 673517
URL: http://svn.apache.org/viewvc?rev=673517&view=rev
Log:
HADOOP-3402. Add terasort example program.
Added:
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/package.html
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=673517&r1=673516&r2=673517&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jul 2 14:56:48 2008
@@ -48,6 +48,8 @@
HADOOP-3587. Add a unit test for the contrib/data_join framework.
(cdouglas)
+ HADOOP-3402. Add terasort example program (omalley)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=673517&r1=673516&r2=673517&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Jul 2 14:56:48 2008
@@ -19,6 +19,9 @@
package org.apache.hadoop.examples;
import org.apache.hadoop.examples.dancing.DistributedPentomino;
import org.apache.hadoop.examples.dancing.Sudoku;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraValidate;
import org.apache.hadoop.util.ProgramDriver;
/**
@@ -50,6 +53,9 @@
pgd.addClass("sleep", SleepJob.class, "A job that sleeps at each map and
reduce task.");
pgd.addClass("join", Join.class, "A job that effects a join over sorted,
equally partitioned datasets");
pgd.addClass("multifilewc", MultiFileWordCount.class, "A job that counts
words from several files.");
+ pgd.addClass("teragen", TeraGen.class, "Generate data for the terasort");
+ pgd.addClass("terasort", TeraSort.class, "Run the terasort");
+ pgd.addClass("teravalidate", TeraValidate.class, "Checking results of
terasort");
pgd.driver(argv);
}
catch(Throwable e){
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraGen.java Wed Jul 2 14:56:48 2008
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate the official terasort input data set.
+ * The user specifies the number of rows and the output directory and this
+ * class runs a map/reduce program to generate the data.
+ * The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
+ * <li>The keys are random characters from the set ' ' .. '~'.
+ * <li>The rowid is the right justified row id as an int.
+ * <li>The filler consists of 7 runs of 10 characters and one run of 8
+ * characters, using the letters 'A' to 'Z'.
+ * </ul>
+ *
+ * <p>
+ * To run the program:
+ * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
+ */
+public class TeraGen extends Configured implements Tool {
+
+ /**
+ * An input format that assigns ranges of longs to each mapper.
+ */
+ static class RangeInputFormat
+ implements InputFormat<LongWritable, NullWritable> {
+
+ /**
+ * An input split consisting of a range of numbers.
+ */
+ static class RangeInputSplit implements InputSplit {
+ long firstRow;
+ long rowCount;
+
+ public RangeInputSplit() { }
+
+ public RangeInputSplit(long offset, long length) {
+ firstRow = offset;
+ rowCount = length;
+ }
+
+ public long getLength() throws IOException {
+ return 0;
+ }
+
+ public String[] getLocations() throws IOException {
+ return new String[]{};
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ firstRow = WritableUtils.readVLong(in);
+ rowCount = WritableUtils.readVLong(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, firstRow);
+ WritableUtils.writeVLong(out, rowCount);
+ }
+ }
+
+ /**
+ * A record reader that will generate a range of numbers.
+ */
+ static class RangeRecordReader
+ implements RecordReader<LongWritable, NullWritable> {
+ long startRow;
+ long finishedRows;
+ long totalRows;
+
+ public RangeRecordReader(RangeInputSplit split) {
+ startRow = split.firstRow;
+ finishedRows = 0;
+ totalRows = split.rowCount;
+ }
+
+ public void close() throws IOException {
+ // NOTHING
+ }
+
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ public NullWritable createValue() {
+ return NullWritable.get();
+ }
+
+ public long getPos() throws IOException {
+ return finishedRows;
+ }
+
+ public float getProgress() throws IOException {
+ return finishedRows / (float) totalRows;
+ }
+
+ public boolean next(LongWritable key,
+ NullWritable value) {
+ if (finishedRows < totalRows) {
+ key.set(startRow + finishedRows);
+ finishedRows += 1;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ }
+
+ public RecordReader<LongWritable, NullWritable>
+ getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+ return new RangeRecordReader((RangeInputSplit) split);
+ }
+
+ /**
+ * Create the desired number of splits, dividing the number of rows
+ * between the mappers.
+ */
+ public InputSplit[] getSplits(JobConf job,
+ int numSplits) {
+ long totalRows = getNumberOfRows(job);
+ long rowsPerSplit = totalRows / numSplits;
+ System.out.println("Generating " + totalRows + " using " + numSplits +
+ " maps with step of " + rowsPerSplit);
+ InputSplit[] splits = new InputSplit[numSplits];
+ long currentRow = 0;
+ for(int split=0; split < numSplits-1; ++split) {
+ splits[split] = new RangeInputSplit(currentRow, rowsPerSplit);
+ currentRow += rowsPerSplit;
+ }
+ splits[numSplits-1] = new RangeInputSplit(currentRow,
+ totalRows - currentRow);
+ return splits;
+ }
+
+ public void validateInput(JobConf job) throws IOException {
+ // NOTHING
+ }
+ }
+
+ static long getNumberOfRows(JobConf job) {
+ return job.getLong("terasort.num-rows", 0);
+ }
+
+ static void setNumberOfRows(JobConf job, long numRows) {
+ job.setLong("terasort.num-rows", numRows);
+ }
+
+ static class RandomGenerator {
+ private long seed = 0;
+ private static final long mask32 = (1l<<32) - 1;
+ /**
+ * The number of iterations separating the precomputed seeds.
+ */
+ private static final int seedSkip = 128 * 1024 * 1024;
+ /**
+ * The precomputed seed values after every seedSkip iterations.
+ * There are enough values to cover all 2**32 iterations.
+ */
+ private static final long[] seeds = new long[]{0L,
+ 4160749568L,
+ 4026531840L,
+ 3892314112L,
+ 3758096384L,
+ 3623878656L,
+ 3489660928L,
+ 3355443200L,
+ 3221225472L,
+ 3087007744L,
+ 2952790016L,
+ 2818572288L,
+ 2684354560L,
+ 2550136832L,
+ 2415919104L,
+ 2281701376L,
+ 2147483648L,
+ 2013265920L,
+ 1879048192L,
+ 1744830464L,
+ 1610612736L,
+ 1476395008L,
+ 1342177280L,
+ 1207959552L,
+ 1073741824L,
+ 939524096L,
+ 805306368L,
+ 671088640L,
+ 536870912L,
+ 402653184L,
+ 268435456L,
+ 134217728L,
+ };
+
+ /**
+ * Start the random number generator on the given iteration.
+ * @param initialIteration the iteration number to start on
+ */
+ RandomGenerator(long initialIteration) {
+ int baseIndex = (int) ((initialIteration & mask32) / seedSkip);
+ seed = seeds[baseIndex];
+ for(int i=0; i < initialIteration % seedSkip; ++i) {
+ next();
+ }
+ }
+
+ RandomGenerator() {
+ this(0);
+ }
+
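+ /**
+ * Advance the 32 bit linear congruential generator one step and
+ * return the new seed.
+ */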
+ long next() {
+ seed = (seed * 3141592621l + 663896637) & mask32;
+ return seed;
+ }
+ }
+
+ /**
+ * The Mapper class that given a row number, will generate the appropriate
+ * output line.
+ */
+ public static class SortGenMapper extends MapReduceBase
+ implements Mapper<LongWritable, NullWritable, Text, Text> {
+
+ private Text key = new Text();
+ private Text value = new Text();
+ private RandomGenerator rand;
+ private byte[] keyBytes = new byte[12];
+ private byte[] spaces = " ".getBytes();
+ private byte[][] filler = new byte[26][];
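+ // precompute a 10 byte run of each letter 'A'..'Z' for the filler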
+ {
+ for(int i=0; i < 26; ++i) {
+ filler[i] = new byte[10];
+ for(int j=0; j<10; ++j) {
+ filler[i][j] = (byte) ('A' + i);
+ }
+ }
+ }
+
+ /**
+ * Write a random 10 byte key into the text. Each of the 3 random values
+ * yields 4 printable characters from ' ' to '~'; the first 10 of the
+ * 12 bytes form the key.
+ */
+ private void addKey() {
+ for(int i=0; i<3; i++) {
+ long temp = rand.next() / 52;
+ keyBytes[3 + 4*i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[2 + 4*i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[1 + 4*i] = (byte) (' ' + (temp % 95));
+ temp /= 95;
+ keyBytes[4*i] = (byte) (' ' + (temp % 95));
+ }
+ key.set(keyBytes, 0, 10);
+ }
+
+ /**
+ * Add the rowid to the row.
+ * @param rowId
+ */
+ private void addRowId(long rowId) {
+ byte[] rowid = Integer.toString((int) rowId).getBytes();
+ int padSpace = 10 - rowid.length;
+ if (padSpace > 0) {
+ value.append(spaces, 0, 10 - rowid.length);
+ }
+ value.append(rowid, 0, Math.min(rowid.length, 10));
+ }
+
+ /**
+ * Add the required filler bytes. Each row consists of 7 blocks of
+ * 10 characters and 1 block of 8 characters.
+ * @param rowId the current row number
+ */
+ private void addFiller(long rowId) {
+ int base = (int) ((rowId * 8) % 26);
+ for(int i=0; i<7; ++i) {
+ value.append(filler[(base+i) % 26], 0, 10);
+ }
+ value.append(filler[(base+7) % 26], 0, 8);
+ }
+
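+ /**
+ * Emit one row for the given row number: a 10 byte random key, and a
+ * value holding the 10 byte row id plus 78 bytes of filler (the output
+ * format appends the trailing \r\n).
+ */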
+ public void map(LongWritable row, NullWritable ignored,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ long rowId = row.get();
+ if (rand == null) {
+ // we use 3 random numbers per row
+ rand = new RandomGenerator(rowId*3);
+ }
+ addKey();
+ value.clear();
+ addRowId(rowId);
+ addFiller(rowId);
+ output.collect(key, value);
+ }
+
+ }
+
+ /**
+ * @param args the cli arguments
+ */
+ public int run(String[] args) throws IOException {
+ JobConf job = (JobConf) getConf();
+ setNumberOfRows(job, Long.parseLong(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setJobName("TeraGen");
+ job.setJarByClass(TeraGen.class);
+ job.setMapperClass(SortGenMapper.class);
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setInputFormat(RangeInputFormat.class);
+ job.setOutputFormat(TeraOutputFormat.class);
+ JobClient.runJob(job);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new JobConf(), new TeraGen(), args);
+ System.exit(res);
+ }
+
+}
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraInputFormat.java Wed Jul 2 14:56:48 2008
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.QuickSort;
+
+/**
+ * An input format that reads the first 10 characters of each line as the key
+ * and the rest of the line as the value. Both key and value are represented
+ * as Text.
+ */
+public class TeraInputFormat extends FileInputFormat<Text,Text> {
+
+ static final String PARTITION_FILENAME = "_partition.lst";
+ static final String SAMPLE_SIZE = "terasort.partitions.sample";
+ private static JobConf lastConf = null;
+ private static InputSplit[] lastResult = null;
+
+ static class TextSampler implements IndexedSortable {
+ private ArrayList<Text> records = new ArrayList<Text>();
+
+ public int compare(int i, int j) {
+ Text left = records.get(i);
+ Text right = records.get(j);
+ return left.compareTo(right);
+ }
+
+ public void swap(int i, int j) {
+ Text left = records.get(i);
+ Text right = records.get(j);
+ records.set(j, left);
+ records.set(i, right);
+ }
+
+ public void addKey(Text key) {
+ records.add(new Text(key));
+ }
+
+ /**
+ * Find the split points for a given sample. The sample keys are sorted
+ * and down sampled to find even split points for the partitions. The
+ * returned keys should be the start of their respective partitions.
+ * @param numPartitions the desired number of partitions
+ * @return an array of size numPartitions - 1 that holds the split points
+ */
+ Text[] createPartitions(int numPartitions) {
+ int numRecords = records.size();
+ System.out.println("Making " + numPartitions + " from " + numRecords +
+ " records");
+ if (numPartitions > numRecords) {
+ throw new IllegalArgumentException
+ ("Requested more partitions than input keys (" + numPartitions +
+ " > " + numRecords + ")");
+ }
+ new QuickSort().sort(this, 0, records.size());
+ float stepSize = numRecords / (float) numPartitions;
+ System.out.println("Step size is " + stepSize);
+ Text[] result = new Text[numPartitions-1];
+ for(int i=1; i < numPartitions; ++i) {
+ result[i-1] = records.get(Math.round(stepSize * i));
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Use the input splits to take samples of the input and generate sample
+ * keys. By default reads 100,000 keys from 10 locations in the input, sorts
+ * them and picks N-1 keys to generate N equally sized partitions.
+ * @param conf the job to sample
+ * @param partFile where to write the output file to
+ * @throws IOException if something goes wrong
+ */
+ public static void writePartitionFile(JobConf conf,
+ Path partFile) throws IOException {
+ TeraInputFormat inFormat = new TeraInputFormat();
+ TextSampler sampler = new TextSampler();
+ Text key = new Text();
+ Text value = new Text();
+ int partitions = conf.getNumReduceTasks();
+ long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
+ InputSplit[] splits = inFormat.getSplits(conf, conf.getNumMapTasks());
+ int samples = Math.min(10, splits.length);
+ long recordsPerSample = sampleSize / samples;
+ int sampleStep = splits.length / samples;
+ long records = 0;
+ // take N samples from different parts of the input
+ for(int i=0; i < samples; ++i) {
+ RecordReader<Text,Text> reader =
+ inFormat.getRecordReader(splits[sampleStep * i], conf, null);
+ while (reader.next(key, value)) {
+ sampler.addKey(key);
+ records += 1;
+ if ((i+1) * recordsPerSample <= records) {
+ break;
+ }
+ }
+ }
+ FileSystem outFs = partFile.getFileSystem(conf);
+ if (outFs.exists(partFile)) {
+ outFs.delete(partFile, false);
+ }
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(outFs, conf, partFile, Text.class,
+ NullWritable.class);
+ NullWritable nullValue = NullWritable.get();
+ for(Text split : sampler.createPartitions(partitions)) {
+ writer.append(split, nullValue);
+ }
+ writer.close();
+ }
+
+ static class TeraRecordReader implements RecordReader<Text,Text> {
+ private LineRecordReader in;
+ private LongWritable junk = new LongWritable();
+ private Text line = new Text();
+ private static int KEY_LENGTH = 10;
+
+ public TeraRecordReader(Configuration job,
+ FileSplit split) throws IOException {
+ in = new LineRecordReader(job, split);
+ }
+
+ public void close() throws IOException {
+ in.close();
+ }
+
+ public Text createKey() {
+ return new Text();
+ }
+
+ public Text createValue() {
+ return new Text();
+ }
+
+ public long getPos() throws IOException {
+ return in.getPos();
+ }
+
+ public float getProgress() throws IOException {
+ return in.getProgress();
+ }
+
+ public boolean next(Text key, Text value) throws IOException {
+ if (in.next(junk, line)) {
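+ // the first 10 bytes of the line form the key; the rest is the value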
+ if (line.getLength() < KEY_LENGTH) {
+ key.set(line);
+ value.clear();
+ } else {
+ byte[] bytes = line.getBytes();
+ key.set(bytes, 0, KEY_LENGTH);
+ value.set(bytes, KEY_LENGTH, line.getLength() - KEY_LENGTH);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public RecordReader<Text, Text>
+ getRecordReader(InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException {
+ return new TeraRecordReader(job, (FileSplit) split);
+ }
+
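+ /**
+ * Cache the most recent split calculation: writePartitionFile samples the
+ * input through getSplits before the job is submitted, so the framework's
+ * later call with the same JobConf can reuse the answer.
+ */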
+ @Override
+ public InputSplit[] getSplits(JobConf conf, int splits) throws IOException {
+ if (conf == lastConf) {
+ return lastResult;
+ }
+ lastConf = conf;
+ lastResult = super.getSplits(conf, splits);
+ return lastResult;
+ }
+}
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraOutputFormat.java Wed Jul 2 14:56:48 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A streamlined text output format that writes key, value, and "\r\n".
+ */
+public class TeraOutputFormat extends TextOutputFormat<Text,Text> {
+ static final String FINAL_SYNC_ATTRIBUTE = "terasort.final.sync";
+
+ /**
+ * Set the requirement for a final sync before the stream is closed.
+ */
+ public static void setFinalSync(JobConf conf, boolean newValue) {
+ conf.setBoolean(FINAL_SYNC_ATTRIBUTE, newValue);
+ }
+
+ /**
+ * Does the user want a final sync at close?
+ */
+ public static boolean getFinalSync(JobConf conf) {
+ return conf.getBoolean(FINAL_SYNC_ATTRIBUTE, false);
+ }
+
+ static class TeraRecordWriter extends LineRecordWriter<Text,Text> {
+ private static final byte[] newLine = "\r\n".getBytes();
+ private boolean finalSync = false;
+
+ public TeraRecordWriter(DataOutputStream out,
+ JobConf conf) {
+ super(out);
+ finalSync = getFinalSync(conf);
+ }
+
+ public synchronized void write(Text key,
+ Text value) throws IOException {
+ out.write(key.getBytes(), 0, key.getLength());
+ out.write(value.getBytes(), 0, value.getLength());
+ out.write(newLine, 0, newLine.length);
+ }
+
+ public void close() throws IOException {
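+ // if a final sync was requested, flush the written bytes to HDFS
+ // before closing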
+ if (finalSync) {
+ ((FSDataOutputStream) out).sync();
+ }
+ super.close(null);
+ }
+ }
+
+ public RecordWriter<Text,Text> getRecordWriter(FileSystem ignored,
+ JobConf job,
+ String name,
+ Progressable progress
+ ) throws IOException {
+ Path dir = getWorkOutputPath(job);
+ FileSystem fs = dir.getFileSystem(job);
+ FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+ return new TeraRecordWriter(fileOut, job);
+ }
+}
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Wed Jul 2 14:56:48 2008
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generates the sampled split points, launches the job, and waits for it to
+ * finish.
+ * <p>
+ * To run the program:
+ * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
+ */
+public class TeraSort extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(TeraSort.class);
+
+ /**
+ * A partitioner that splits text keys into roughly equal partitions
+ * in a global sorted order.
+ */
+ static class TotalOrderPartitioner implements Partitioner<Text,Text>{
+ private TrieNode trie;
+ private Text[] splitPoints;
+
+ /**
+ * A generic trie node
+ */
+ static abstract class TrieNode {
+ private int level;
+ TrieNode(int level) {
+ this.level = level;
+ }
+ abstract int findPartition(Text key);
+ abstract void print(PrintStream strm) throws IOException;
+ int getLevel() {
+ return level;
+ }
+ }
+
+ /**
+ * An inner trie node that contains 256 children based on the next
+ * character.
+ */
+ static class InnerTrieNode extends TrieNode {
+ private TrieNode[] child = new TrieNode[256];
+
+ InnerTrieNode(int level) {
+ super(level);
+ }
+ int findPartition(Text key) {
+ int level = getLevel();
+ if (key.getLength() <= level) {
+ return child[0].findPartition(key);
+ }
+ return child[key.getBytes()[level]].findPartition(key);
+ }
+ void setChild(int idx, TrieNode child) {
+ this.child[idx] = child;
+ }
+ void print(PrintStream strm) throws IOException {
+ for(int ch=0; ch < 255; ++ch) {
+ for(int i = 0; i < 2*getLevel(); ++i) {
+ strm.print(' ');
+ }
+ strm.print(ch);
+ strm.println(" ->");
+ if (child[ch] != null) {
+ child[ch].print(strm);
+ }
+ }
+ }
+ }
+
+ /**
+ * A leaf trie node that does string compares to figure out where the given
+ * key belongs between lower..upper.
+ */
+ static class LeafTrieNode extends TrieNode {
+ int lower;
+ int upper;
+ Text[] splitPoints;
+ LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
+ super(level);
+ this.splitPoints = splitPoints;
+ this.lower = lower;
+ this.upper = upper;
+ }
+ int findPartition(Text key) {
+ for(int i=lower; i<upper; ++i) {
+ if (splitPoints[i].compareTo(key) >= 0) {
+ return i;
+ }
+ }
+ return upper;
+ }
+ void print(PrintStream strm) throws IOException {
+ for(int i = 0; i < 2*getLevel(); ++i) {
+ strm.print(' ');
+ }
+ strm.print(lower);
+ strm.print(", ");
+ strm.println(upper);
+ }
+ }
+
+
+ /**
+ * Read the cut points from the given sequence file.
+ * @param fs the file system
+ * @param p the path to read
+ * @param job the job config
+ * @return the strings to split the partitions on
+ * @throws IOException
+ */
+ private static Text[] readPartitions(FileSystem fs, Path p,
+ JobConf job) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
+ List<Text> parts = new ArrayList<Text>();
+ Text key = new Text();
+ NullWritable value = NullWritable.get();
+ while (reader.next(key, value)) {
+ parts.add(key);
+ key = new Text();
+ }
+ reader.close();
+ return parts.toArray(new Text[parts.size()]);
+ }
+
+ /**
+ * Given a sorted set of cut points, build a trie that will find the correct
+ * partition quickly.
+ * @param splits the list of cut points
+ * @param lower the lower bound of partitions 0..numPartitions-1
+ * @param upper the upper bound of partitions 0..numPartitions-1
+ * @param prefix the prefix that we have already checked against
+ * @param maxDepth the maximum depth we will build a trie for
+ * @return the trie node that will divide the splits correctly
+ */
+ private static TrieNode buildTrie(Text[] splits, int lower, int upper,
+ Text prefix, int maxDepth) {
+ int depth = prefix.getLength();
+ if (depth >= maxDepth || lower == upper) {
+ return new LeafTrieNode(depth, splits, lower, upper);
+ }
+ InnerTrieNode result = new InnerTrieNode(depth);
+ Text trial = new Text(prefix);
+ // append an extra byte on to the prefix
+ trial.append(new byte[1], 0, 1);
+ int currentBound = lower;
+ for(int ch = 0; ch < 255; ++ch) {
+ trial.getBytes()[depth] = (byte) (ch + 1);
+ lower = currentBound;
+ while (currentBound < upper) {
+ if (splits[currentBound].compareTo(trial) >= 0) {
+ break;
+ }
+ currentBound += 1;
+ }
+ trial.getBytes()[depth] = (byte) ch;
+ result.child[ch] = buildTrie(splits, lower, currentBound, trial,
+ maxDepth);
+ }
+ // pick up the rest
+ trial.getBytes()[depth] = 127;
+ result.child[255] = buildTrie(splits, currentBound, upper, trial,
+ maxDepth);
+ return result;
+ }
+
+ public void configure(JobConf job) {
+ try {
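+ // the partition file is read from the task's working directory, where
+ // the DistributedCache symlink created in TeraSort.run makes it available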
+ FileSystem fs = FileSystem.getLocal(job);
+ Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
+ splitPoints = readPartitions(fs, partFile, job);
+ trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
+ } catch (IOException ie) {
+ throw new IllegalArgumentException("can't read partitions file", ie);
+ }
+ }
+
+ public TotalOrderPartitioner() {
+ }
+
+ public int getPartition(Text key, Text value, int numPartitions) {
+ return trie.findPartition(key);
+ }
+
+ }
+
+ public int run(String[] args) throws Exception {
+ LOG.info("starting");
+ JobConf job = (JobConf) getConf();
+ Path inputDir = new Path(args[0]);
+ inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
+ Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
+ URI partitionUri = new URI(partitionFile.toString() +
+ "#" + TeraInputFormat.PARTITION_FILENAME);
+ TeraInputFormat.setInputPaths(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setJobName("TeraSort");
+ job.setJarByClass(TeraSort.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setInputFormat(TeraInputFormat.class);
+ job.setOutputFormat(TeraOutputFormat.class);
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ TeraInputFormat.writePartitionFile(job, partitionFile);
+ DistributedCache.addCacheFile(partitionUri, job);
+ DistributedCache.createSymlink(job);
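+ // a single replica is enough here; the package docs note the contest
+ // does not require replicated output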
+ job.setInt("dfs.replication", 1);
+ TeraOutputFormat.setFinalSync(job, true);
+ JobClient.runJob(job);
+ LOG.info("done");
+ return 0;
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
+ System.exit(res);
+ }
+
+}
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraValidate.java Wed Jul 2 14:56:48 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.examples.terasort;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Generate 1 mapper per file that checks to make sure the keys
+ * are sorted within each file. The mapper also generates
+ * "$file:begin" with the first key and "$file:end" with the last key. The
+ * reduce verifies that all of the start/end items are in order.
+ * Any output from the reduce is a problem report.
+ * <p>
+ * To run the program:
+ * <b>bin/hadoop jar hadoop-*-examples.jar teravalidate out-dir report-dir</b>
+ * <p>
+ * If there is any output, something is wrong and the output of the reduce
+ * will have the problem report.
+ */
+public class TeraValidate extends Configured implements Tool {
+ private static final Text error = new Text("error");
+
+ static class ValidateMapper extends MapReduceBase
+ implements Mapper<Text,Text,Text,Text> {
+ private Text lastKey;
+ private OutputCollector<Text,Text> output;
+ private String filename;
+
+ /**
+ * Get the final part of the input name
+ * @param split the input split
+ * @return the "part-00000" for the input
+ */
+ private String getFilename(FileSplit split) {
+ return split.getPath().getName();
+ }
+
+ public void map(Text key, Text value, OutputCollector<Text,Text> output,
+ Reporter reporter) throws IOException {
+ if (lastKey == null) {
+ filename = getFilename((FileSplit) reporter.getInputSplit());
+ output.collect(new Text(filename + ":begin"), key);
+ lastKey = new Text();
+ this.output = output;
+ } else {
+ if (key.compareTo(lastKey) < 0) {
+ output.collect(error, new Text("misorder in " + filename +
+ " last: '" + lastKey +
+ "' current: '" + key + "'"));
+ }
+ }
+ lastKey.set(key);
+ }
+
+ public void close() throws IOException {
+ if (lastKey != null) {
+ output.collect(new Text(filename + ":end"), lastKey);
+ }
+ }
+ }
+
+ /**
+ * Check the boundaries between the output files by making sure that the
+ * boundary keys are always increasing.
+ * Also passes any error reports along intact.
+ */
+ static class ValidateReducer extends MapReduceBase
+ implements Reducer<Text,Text,Text,Text> {
+ private boolean firstKey = true;
+ private Text lastKey = new Text();
+ private Text lastValue = new Text();
+ public void reduce(Text key, Iterator<Text> values,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ if (error.equals(key)) {
+ while(values.hasNext()) {
+ output.collect(key, values.next());
+ }
+ } else {
+ Text value = values.next();
+ if (firstKey) {
+ firstKey = false;
+ } else {
+ if (value.compareTo(lastValue) < 0) {
+ output.collect(error,
+ new Text("misordered keys last: " +
+ lastKey + " '" + lastValue +
+ "' current: " + key + " '" + value + "'"));
+ }
+ }
+ lastKey.set(key);
+ lastValue.set(value);
+ }
+ }
+
+ }
+
+ public int run(String[] args) throws Exception {
+ JobConf job = (JobConf) getConf();
+ TeraInputFormat.setInputPaths(job, new Path(args[0]));
+ FileOutputFormat.setOutputPath(job, new Path(args[1]));
+ job.setJobName("TeraValidate");
+ job.setJarByClass(TeraValidate.class);
+ job.setMapperClass(ValidateMapper.class);
+ job.setReducerClass(ValidateReducer.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ // force a single reducer
+ job.setNumReduceTasks(1);
+ // force a single split
+ job.setLong("mapred.min.split.size", Long.MAX_VALUE);
+ job.setInputFormat(TeraInputFormat.class);
+ JobClient.runJob(job);
+ return 0;
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new JobConf(), new TeraValidate(), args);
+ System.exit(res);
+ }
+
+}
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/job_history_summary.py Wed Jul 2 14:56:48 2008
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+
+pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
+counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')
+
+def parse(tail):
+ result = {}
+ for n,v in re.findall(pat, tail):
+ result[n] = v
+ return result
+
+mapStartTime = {}
+mapEndTime = {}
+reduceStartTime = {}
+reduceShuffleTime = {}
+reduceSortTime = {}
+reduceEndTime = {}
+reduceBytes = {}
+
+for line in sys.stdin:
+ words = line.split(" ",1)
+ event = words[0]
+ attrs = parse(words[1])
+ if event == 'MapAttempt':
+ if attrs.has_key("START_TIME"):
+ mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
+ elif attrs.has_key("FINISH_TIME"):
+ mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+ elif event == 'ReduceAttempt':
+ if attrs.has_key("START_TIME"):
+ reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
+ elif attrs.has_key("FINISH_TIME"):
+ reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
+ reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
+ reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
+ elif event == 'Task':
+ if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
+ for n,v in re.findall(counterPat, attrs["COUNTERS"]):
+ if n == "File Systems.HDFS bytes written":
+ reduceBytes[attrs["TASKID"]] = int(v)
+
+runningMaps = {}
+shufflingReduces = {}
+sortingReduces = {}
+runningReduces = {}
+startTime = min(reduce(min, mapStartTime.values()),
+ reduce(min, reduceStartTime.values()))
+endTime = max(reduce(max, mapEndTime.values()),
+ reduce(max, reduceEndTime.values()))
+
+reduces = reduceBytes.keys()
+reduces.sort()
+
+print "Name reduce-output-bytes shuffle-finish reduce-finish"
+for r in reduces:
+ print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
+ print reduceEndTime[r] - startTime
+
+print
+
+for t in range(startTime, endTime):
+ runningMaps[t] = 0
+ shufflingReduces[t] = 0
+ sortingReduces[t] = 0
+ runningReduces[t] = 0
+
+for map in mapStartTime.keys():
+ for t in range(mapStartTime[map], mapEndTime[map]):
+ runningMaps[t] += 1
+for reduce in reduceStartTime.keys():
+ for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
+ shufflingReduces[t] += 1
+ for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
+ sortingReduces[t] += 1
+ for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
+ runningReduces[t] += 1
+
+print "time maps shuffle merge reduce"
+for t in range(startTime, endTime):
+ print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t],
+ print runningReduces[t]
Added: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/package.html?rev=673517&view=auto
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/package.html (added)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/terasort/package.html Wed Jul 2 14:56:48 2008
@@ -0,0 +1,113 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+This package consists of 3 map/reduce applications for Hadoop to
+compete in the annual <a
+href="http://www.hpl.hp.com/hosted/sortbenchmark" target="_top">terabyte sort</a>
+competition.
+
+<ul>
+<li><b>TeraGen</b> is a map/reduce program to generate the data.
+<li><b>TeraSort</b> samples the input data and uses map/reduce to
+ sort the data into a total order.
+<li><b>TeraValidate</b> is a map/reduce program that validates the
+ output is sorted.
+</ul>
+
+<p>
+
+<b>TeraGen</b> generates output data that is byte for byte
+equivalent to the C version including the newlines and specific
+keys. It divides the desired number of rows by the desired number of
+tasks and assigns ranges of rows to each map. The map jumps the random
+number generator to the correct value for the first row and generates
+the following rows.
+
+<p>
+
+<b>TeraSort</b> is a standard map/reduce sort, except for a custom
+partitioner that uses a sorted list of <i>N-1</i> sampled keys that define
+the key range for each reduce. In particular, all keys such that
+<i>sample[i-1] <= key < sample[i]</i> are sent to reduce
+<i>i</i>. This guarantees that the keys output by reduce <i>i</i> are all
+less than the keys output by reduce <i>i+1</i>. To speed up the
+partitioning, the partitioner builds a two level trie that quickly
+indexes into the list of sample keys based on the first two bytes of
+the key. TeraSort generates the sample keys by sampling the input
+before the job is submitted and writing the list of keys into HDFS.
+The input and output formats, which are used by all 3 applications,
+read and write the text files in the right format. The output of the
+reduce has replication set to 1, instead of the default 3, because the
+contest does not require that the output data be replicated onto multiple
+nodes.
+
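+<p>
+
+As a rough illustration (hypothetical code, not part of this package),
+the lookup that the trie accelerates is equivalent to a linear scan of
+the sorted split points for the first one that is &gt;= the key, as in
+the partitioner's leaf nodes:
+
+<pre>
+  int findPartition(Text key) {
+    for(int i = 0; i &lt; splitPoints.length; ++i) {
+      if (splitPoints[i].compareTo(key) &gt;= 0) {
+        return i;
+      }
+    }
+    return splitPoints.length;
+  }
+</pre>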
+<p>
+
+<b>TeraValidate</b> ensures that the output is globally sorted. It
+creates one map per file in the output directory, and each map checks that
+each key is greater than or equal to the previous one. The map also emits
+records with the first and last keys of the file, and the reduce
+ensures that the first key of file <i>i</i> is greater than the last key of
+file <i>i-1</i>. Any problems are reported as output of the reduce, with the
+keys that are out of order.
+
+<p>
+
+In May 2008, Owen O'Malley ran this code on a 910 node cluster and
+sorted the 10 billion records (1 TB) in 209 seconds (3.48 minutes) to
+win the annual general purpose (daytona)
+<a href="http://www.hpl.hp.com/hosted/sortbenchmark/">terabyte sort
+benchmark</a>.
+
+<p>
+
+The cluster statistics were:
+<ul>
+<li> 910 nodes
+<li> 4 dual core Xeons @ 2.0ghz per node
+<li> 4 SATA disks per node
+<li> 8G RAM per node
+<li> 1 gigabit ethernet on each node
+<li> 40 nodes per rack
+<li> 8 gigabit ethernet uplinks from each rack to the core
+<li> Red Hat Enterprise Linux Server Release 5.1 (kernel 2.6.18)
+<li> Sun Java JDK 1.6.0_05-b13
+</ul>
+
+<p>
+
+The test was on Hadoop trunk (pre-0.18) patched with <a
+href="http://issues.apache.org/jira/browse/HADOOP-3443">HADOOP-3443</a>
+and <a
+href="http://issues.apache.org/jira/browse/HADOOP-3446">HADOOP-3446</a>,
+which were required to remove intermediate writes to disk.
+TeraGen used
+1800 tasks to generate a total of 10 billion rows in HDFS, with a
+block size of 1024 MB.
+TeraSort was configured with 1800 maps and 1800 reduces, and
+<i>io.sort.mb</i>,
+<i>io.sort.factor</i>, <i>fs.inmemory.size.mb</i>, and task heap size
+sufficient that transient data was never spilled to disk, other than at the
+end of the map. The sampler looked at 100,000 keys to determine the
+reduce boundaries, which led to imperfect balancing, with reduce
+outputs ranging from 337 MB to 872 MB.
+
+</body>
+</html>
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java?rev=673517&r1=673516&r2=673517&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java Wed Jul 2 14:56:48 2008
@@ -47,7 +47,7 @@
}
}
- private DataOutputStream out;
+ protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {