Author: szetszwo
Date: Wed Nov 12 17:41:39 2008
New Revision: 713612
URL: http://svn.apache.org/viewvc?rev=713612&view=rev
Log:
HADOOP-4589. Correct PiEstimator output messages and improve the code
readability. (szetszwo)
Removed:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Nov 12 17:41:39 2008
@@ -142,6 +142,9 @@
HADOOP-4571. Add chukwa conf files to svn:ignore list. (Eric Yang via
szetszwo)
+ HADOOP-4589. Correct PiEstimator output messages and improve the code
+ readability. (szetszwo)
+
Release 0.19.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Wed Nov 12 17:41:39 2008
@@ -22,10 +22,10 @@
import java.math.BigDecimal;
import java.util.Iterator;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -48,11 +48,29 @@
/**
* A Map-reduce program to estimate the value of Pi
* using a quasi-Monte Carlo method.
+ *
+ * Mapper:
+ * Generate points in a unit square
+ * and then count points inside/outside of the inscribed circle of the square.
+ *
+ * Reducer:
+ * Accumulate points inside/outside results from the mappers.
+ *
+ * Let numTotal = numInside + numOutside.
+ * The fraction numInside/numTotal is a rational approximation of
+ * the value (Area of the circle)/(Area of the square),
+ * where the area of the inscribed circle is Pi/4
+ * and the area of the unit square is 1.
+ * Then, the estimated value of Pi is 4(numInside/numTotal).
*/
public class PiEstimator extends Configured implements Tool {
+ /** tmp directory for input/output */
+ static private final Path TMP_DIR = new Path(
+ PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
/** 2-dimensional Halton sequence {H(i)},
- * where H(i) is a 2-dimensional point and i >= 1 is the index.
+ * where H(i) is a 2-dimensional point and i >= 1 is the index.
+ * Halton sequence is used to generate sample points for Pi estimation.
*/
private static class HaltonSequence {
/** Bases */
@@ -94,6 +112,8 @@
/** Compute next point.
* Assume the current point is H(index).
* Compute H(index+1).
+ *
+ * @return a 2-dimensional point with coordinates in [0,1)^2
*/
double[] nextPoint() {
index++;
@@ -113,37 +133,33 @@
}
/**
- * Mappper class for Pi estimation.
+ * Mapper class for Pi estimation.
+ * Generate points in a unit square
+ * and then count points inside/outside of the inscribed circle of the square.
*/
-
public static class PiMapper extends MapReduceBase
- implements Mapper<LongWritable, LongWritable, LongWritable, LongWritable> {
-
- /** Mapper configuration.
- *
- */
- @Override
- public void configure(JobConf job) {
- }
-
- long numInside = 0L;
- long numOutside = 0L;
-
+ implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
+
/** Map method.
- * @param key
- * @param val not-used
- * @param out
+ * @param offset samples starting from the (offset+1)th sample.
+ * @param size the number of samples for this map
+ * @param out output {true->numInside, false->numOutside}
* @param reporter
*/
- public void map(LongWritable key,
- LongWritable val,
- OutputCollector<LongWritable, LongWritable> out,
+ public void map(LongWritable offset,
+ LongWritable size,
+ OutputCollector<BooleanWritable, LongWritable> out,
Reporter reporter) throws IOException {
- final HaltonSequence haltonsequence = new HaltonSequence(key.get());
- final long nSamples = val.get();
- for(long idx = 0; idx < nSamples; idx++) {
+ final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
+ long numInside = 0L;
+ long numOutside = 0L;
+
+ for(long i = 0; i < size.get(); ) {
+ //generate points in a unit square
final double[] point = haltonsequence.nextPoint();
+
+ //count points inside/outside of the inscribed circle of the square
final double x = point[0] - 0.5;
final double y = point[1] - 0.5;
if (x*x + y*y > 0.25) {
@@ -151,170 +167,187 @@
} else {
numInside++;
}
- if (idx%1000 == 1) {
- reporter.setStatus("Generated "+idx+" samples.");
+
+ //report status
+ i++;
+ if (i % 1000 == 0) {
+ reporter.setStatus("Generated " + i + " samples.");
}
}
- out.collect(new LongWritable(0), new LongWritable(numOutside));
- out.collect(new LongWritable(1), new LongWritable(numInside));
- }
-
- @Override
- public void close() {
- // nothing
+
+ //output map results
+ out.collect(new BooleanWritable(true), new LongWritable(numInside));
+ out.collect(new BooleanWritable(false), new LongWritable(numOutside));
}
}
-
+
+ /**
+ * Reducer class for Pi estimation.
+ * Accumulate points inside/outside results from the mappers.
+ */
public static class PiReducer extends MapReduceBase
- implements Reducer<LongWritable, LongWritable, WritableComparable<?>, Writable> {
+ implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
- long numInside = 0;
- long numOutside = 0;
- JobConf conf;
+ private long numInside = 0;
+ private long numOutside = 0;
+ private JobConf conf; //configuration for accessing the file system
- /** Reducer configuration.
- *
- */
+ /** Store job configuration. */
@Override
public void configure(JobConf job) {
conf = job;
}
- /** Reduce method.
- * @param key
- * @param values
- * @param output
+
+ /**
+ * Accumulate number of points inside/outside results from the mappers.
+ * @param isInside Are the points inside?
+ * @param values An iterator to a list of point counts
+ * @param output dummy, not used here.
* @param reporter
*/
- public void reduce(LongWritable key,
+ public void reduce(BooleanWritable isInside,
Iterator<LongWritable> values,
OutputCollector<WritableComparable<?>, Writable> output,
Reporter reporter) throws IOException {
- if (key.get() == 1) {
- while (values.hasNext()) {
- long num = values.next().get();
- numInside += num;
- }
+ if (isInside.get()) {
+ for(; values.hasNext(); numInside += values.next().get());
} else {
- while (values.hasNext()) {
- long num = values.next().get();
- numOutside += num;
- }
+ for(; values.hasNext(); numOutside += values.next().get());
}
}
-
+
+ /**
+ * Reduce task done, write output to a file.
+ */
@Override
public void close() throws IOException {
- Path tmpDir = new Path("test-mini-mr");
- Path outDir = new Path(tmpDir, "out");
+ //write output to a file
+ Path outDir = new Path(TMP_DIR, "out");
Path outFile = new Path(outDir, "reduce-out");
FileSystem fileSys = FileSystem.get(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- outFile,
- LongWritable.class, LongWritable.class,
- CompressionType.NONE);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, LongWritable.class, LongWritable.class,
+ CompressionType.NONE);
writer.append(new LongWritable(numInside), new LongWritable(numOutside));
writer.close();
}
}
/**
- * This is the main driver for computing the value of Pi using
- * monte-carlo method.
+ * Run a map/reduce job for estimating Pi.
+ *
+ * @return the estimated value of Pi
*/
- BigDecimal launch(int numMaps, long numPoints, String jt, String dfs)
- throws IOException {
+ public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
+ ) throws IOException {
+ //setup job conf
+ jobConf.setJobName(PiEstimator.class.getSimpleName());
- JobConf jobConf = new JobConf(getConf(), PiEstimator.class);
- if (jt != null) { jobConf.set("mapred.job.tracker", jt); }
- if (dfs != null) { FileSystem.setDefaultUri(jobConf, dfs); }
- jobConf.setJobName("test-mini-mr");
-
- // turn off speculative execution, because DFS doesn't handle
- // multiple writers to the same file.
- jobConf.setSpeculativeExecution(false);
jobConf.setInputFormat(SequenceFileInputFormat.class);
-
- jobConf.setOutputKeyClass(LongWritable.class);
+
+ jobConf.setOutputKeyClass(BooleanWritable.class);
jobConf.setOutputValueClass(LongWritable.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-
+
jobConf.setMapperClass(PiMapper.class);
+ jobConf.setNumMapTasks(numMaps);
+
jobConf.setReducerClass(PiReducer.class);
-
jobConf.setNumReduceTasks(1);
- Path tmpDir = new Path("test-mini-mr");
- Path inDir = new Path(tmpDir, "in");
- Path outDir = new Path(tmpDir, "out");
- FileSystem fileSys = FileSystem.get(jobConf);
- fileSys.delete(tmpDir, true);
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
-
+ // turn off speculative execution, because DFS doesn't handle
+ // multiple writers to the same file.
+ jobConf.setSpeculativeExecution(false);
+
+ //setup input/output directories
+ final Path inDir = new Path(TMP_DIR, "in");
+ final Path outDir = new Path(TMP_DIR, "out");
FileInputFormat.setInputPaths(jobConf, inDir);
FileOutputFormat.setOutputPath(jobConf, outDir);
-
- jobConf.setNumMapTasks(numMaps);
-
- for(int idx=0; idx < numMaps; ++idx) {
- Path file = new Path(inDir, "part"+idx);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf,
- file,
- LongWritable.class, LongWritable.class, CompressionType.NONE);
- writer.append(new LongWritable(idx * numPoints), new LongWritable(numPoints));
- writer.close();
- System.out.println("Wrote input for Map #"+idx);
+
+ final FileSystem fs = FileSystem.get(jobConf);
+ if (fs.exists(TMP_DIR)) {
+ throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
+ + " already exists. Please remove it first.");
}
-
- BigDecimal estimate = BigDecimal.ZERO;
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Cannot create input directory " + inDir);
+ }
+
try {
+ //generate an input file for each map task
+ for(int i=0; i < numMaps; ++i) {
+ final Path file = new Path(inDir, "part"+i);
+ final LongWritable offset = new LongWritable(i * numPoints);
+ final LongWritable size = new LongWritable(numPoints);
+ final SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, jobConf, file,
+ LongWritable.class, LongWritable.class, CompressionType.NONE);
+ try {
+ writer.append(offset, size);
+ } finally {
+ writer.close();
+ }
+ System.out.println("Wrote input for Map #"+i);
+ }
+
+ //start a map/reduce job
System.out.println("Starting Job");
- long startTime = System.currentTimeMillis();
+ final long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
- System.out.println("Job Finished in "+
- (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ final double duration = (System.currentTimeMillis() - startTime)/1000.0;
+ System.out.println("Job Finished in " + duration + " seconds");
+
+ //read outputs
Path inFile = new Path(outDir, "reduce-out");
- SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
- jobConf);
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
- reader.next(numInside, numOutside);
- reader.close();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+ try {
+ reader.next(numInside, numOutside);
+ } finally {
+ reader.close();
+ }
- estimate = BigDecimal.valueOf(4).setScale(20)
+ //compute estimated value
+ return BigDecimal.valueOf(4).setScale(20)
.multiply(BigDecimal.valueOf(numInside.get()))
.divide(BigDecimal.valueOf(numMaps))
.divide(BigDecimal.valueOf(numPoints));
} finally {
- fileSys.delete(tmpDir, true);
+ fs.delete(TMP_DIR, true);
}
-
- return estimate;
}
-
+
/**
- * Launches all the tasks in order.
+ * Parse arguments and then run a map/reduce job.
+ * Print the result to standard output.
+ *
+ * @return a non-zero value if there is an error; otherwise, 0.
*/
public int run(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
+ if (args.length != 2) {
+ System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
- int nMaps = Integer.parseInt(args[0]);
- long nSamples = Long.parseLong(args[1]);
+ final int nMaps = Integer.parseInt(args[0]);
+ final long nSamples = Long.parseLong(args[1]);
- System.out.println("Number of Maps = "+nMaps+" Samples per Map =
"+nSamples);
+ System.out.println("Number of Maps = " + nMaps);
+ System.out.println("Samples per Map = " + nSamples);
- System.out.println("Estimated value of PI is "+
- launch(nMaps, nSamples, null, null));
-
+ final JobConf jobConf = new JobConf(getConf(), getClass());
+ System.out.println("Estimated value of Pi is "
+ + estimate(nMaps, nSamples, jobConf));
return 0;
}
-
+
+ /**
+ * Main method for running it as a stand-alone command.
+ */
public static void main(String[] argv) throws Exception {
- int res = ToolRunner.run(new Configuration(), new PiEstimator(), argv);
- System.exit(res);
+ System.exit(ToolRunner.run(null, new PiEstimator(), argv));
}
-
}
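For reference, the estimation method the new javadoc describes can be sketched as a standalone program. The sketch below is illustrative only and not part of this patch: the class and method names are invented, the Halton bases 2 and 3 match those used in PiEstimator, and the final scaling follows the 4(numInside/numTotal) formula above (e.g. 4 * 785398 / 1000000 = 3.141592).

  import java.math.BigDecimal;
  import java.math.RoundingMode;

  /** Minimal standalone sketch of the quasi-Monte Carlo Pi estimation. */
  public class PiSketch {
    /** The i-th element of the van der Corput sequence in the given base, in [0,1). */
    static double vanDerCorput(long i, int base) {
      double value = 0;
      for (double factor = 1.0 / base; i > 0; i /= base, factor /= base) {
        value += (i % base) * factor;
      }
      return value;
    }

    public static void main(String[] args) {
      final long numTotal = 1000000;
      long numInside = 0;
      for (long i = 1; i <= numTotal; i++) {
        // 2-dimensional Halton point; bases 2 and 3 as in PiEstimator.
        final double x = vanDerCorput(i, 2) - 0.5;
        final double y = vanDerCorput(i, 3) - 0.5;
        // The point is inside the inscribed circle iff x^2 + y^2 <= 0.25.
        if (x * x + y * y <= 0.25) {
          numInside++;
        }
      }
      // Pi is approximately 4 * numInside / numTotal.
      final BigDecimal pi = BigDecimal.valueOf(4 * numInside)
          .divide(BigDecimal.valueOf(numTotal), 20, RoundingMode.HALF_UP);
      System.out.println("Estimated Pi = " + pi);
    }
  }

Using a low-discrepancy Halton sequence instead of a pseudo-random generator is what makes this quasi-Monte Carlo: the points cover the square more evenly, so the estimate converges faster for the same sample count.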
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Nov 12 17:41:39 2008
@@ -41,9 +41,6 @@
* A JUnit test to test mini map-reduce cluster with local file system.
*/
public class TestMiniMRLocalFS extends TestCase {
-
- static final int NUM_MAPS = 10;
- static final int NUM_SAMPLES = 100000;
private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data","/tmp"))
.toURI().toString().replace(' ', '+');
@@ -52,10 +49,8 @@
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(2, "file:///", 3);
- double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES,
- mr.createJobConf());
- double error = Math.abs(Math.PI - estimate);
- assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+ TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
+
// run the wordcount example with caching
JobConf job = mr.createJobConf();
TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Wed Nov 12 17:41:39 2008
@@ -171,7 +171,8 @@
static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
LOG.info("runPI");
- double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+ double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
+ NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
checkTaskDirectories(mr, new String[]{}, new String[]{});
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java?rev=713612&r1=713611&r2=713612&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTaskLimits.java Wed Nov 12 17:41:39 2008
@@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.examples.PiEstimator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -56,7 +57,7 @@
static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
LOG.info("runPI");
- double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+ double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
double error = Math.abs(Math.PI - estimate);
System.out.println("PI estimation " + error);
}
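The test changes above all converge on the same call pattern: run the job through the new static PiEstimator.estimate(...) and compare the result against Math.PI. Below is a hedged sketch of that pattern, assuming a running MiniMRCluster as in these tests; the class and helper names are invented, and 10 maps with 100000 samples are the values TestMiniMRLocalFS previously hard-coded as NUM_MAPS and NUM_SAMPLES.

  import java.io.IOException;
  import org.apache.hadoop.examples.PiEstimator;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.MiniMRCluster;

  public class PiEstimateCheck {
    /** Run the Pi job on the given mini cluster and verify the estimate. */
    static void checkPi(MiniMRCluster mr) throws IOException {
      final int numMaps = 10;         // illustrative; the old NUM_MAPS value
      final long numSamples = 100000; // illustrative; the old NUM_SAMPLES value
      final JobConf jobConf = mr.createJobConf();
      final double estimate =
          PiEstimator.estimate(numMaps, numSamples, jobConf).doubleValue();
      final double error = Math.abs(Math.PI - estimate);
      if (error >= 0.01) {
        throw new AssertionError("Error in PI estimation " + error + " exceeds 0.01");
      }
    }
  }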