Author: cutting
Date: Mon Jun 19 14:48:07 2006
New Revision: 415438

URL: http://svn.apache.org/viewvc?rev=415438&view=rev
Log:
HADOOP-123. Add MapReduce unit tests that run a jobtracker and tasktracker, greatly increasing code coverage. Contributed by Milind.
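For orientation, the new MiniMRCluster added below is driven the way TestMiniMRBringup (also in this change) drives it. A minimal sketch, assuming only what this revision adds — the constructor arguments are (jobtracker port, first tasktracker port, number of tasktrackers, namenode), and "local" selects the local filesystem; the job-submission comment is illustrative:

    // Bring up an in-process jobtracker and one tasktracker, then shut them down.
    MiniMRCluster mr = null;
    try {
      mr = new MiniMRCluster(50000, 50010, 1, "local");
      // ... run jobs against "localhost:50000" here ...
    } finally {
      if (mr != null) { mr.shutdown(); }
    }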
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/build.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=415438&r1=415437&r2=415438&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 19 14:48:07 2006 @@ -12,6 +12,10 @@ 3. HADOOP-250. Add an HTTP user interface to the namenode, running on port 50070. (Devaraj Das via cutting) + 4. HADOOP-123. Add MapReduce unit tests that run a jobtracker and + tasktracker, greatly increasing code coverage. + (Milind Bhandarkar via cutting) + Release 0.3.2 - 2006-06-09 Modified: lucene/hadoop/trunk/build.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=415438&r1=415437&r2=415438&view=diff ============================================================================== --- lucene/hadoop/trunk/build.xml (original) +++ lucene/hadoop/trunk/build.xml Mon Jun 19 14:48:07 2006 @@ -28,12 +28,14 @@ <property name="build.examples" value="${build.dir}/examples"/> <property name="build.libhdfs" value="${build.dir}/libhdfs"/> <property name="build.docs" value="${build.dir}/docs"/> + <property name="build.minimr" value="${build.dir}/minimr"/> <property name="build.javadoc" value="${build.docs}/api"/> <property name="build.encoding" value="ISO-8859-1"/> <property name="test.src.dir" value="${basedir}/src/test"/> <property name="test.build.dir" value="${build.dir}/test"/> <property name="test.build.data" value="${test.build.dir}/data"/> + <property name="hadoop.log.dir" value="${test.build.dir}/logs"/> <property name="test.build.classes" value="${test.build.dir}/classes"/> <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/> <property name="test.include" value="Test*"/> @@ -65,6 +67,7 @@ <!-- the unit test classpath: uses test.src.dir for configuration --> <path id="test.classpath"> <pathelement location="${test.build.classes}" /> + <pathelement location="${build.minimr}" /> <pathelement location="${test.src.dir}"/> <pathelement location="${build.dir}"/> <path refid="classpath"/> @@ -89,6 +92,7 @@ <mkdir dir="${build.webapps}/job/WEB-INF"/> <mkdir dir="${build.webapps}/dfs/WEB-INF"/> <mkdir dir="${build.examples}"/> + <mkdir dir="${build.minimr}"/> <mkdir dir="${test.build.dir}"/> <mkdir dir="${test.build.classes}"/> @@ -258,10 +262,13 @@ <delete dir="${test.build.data}"/> <mkdir dir="${test.build.data}"/> + <delete dir="${hadoop.log.dir}"/> + <mkdir dir="${hadoop.log.dir}"/> <junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}" errorProperty="tests.failed" failureProperty="tests.failed"> <sysproperty key="test.build.data" value="${test.build.data}"/> + <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> <sysproperty key="test.src.dir" value="${test.src.dir}"/> <sysproperty key="hadoop.log.dir" value="."/> <classpath 
refid="${test.classpath.id}"/> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=415438&r1=415437&r2=415438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jun 19 14:48:07 2006 @@ -58,10 +58,12 @@ public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker"); private static JobTracker tracker = null; + private static boolean runTracker = true; public static void startTracker(Configuration conf) throws IOException { if (tracker != null) throw new IOException("JobTracker already running."); - while (true) { + runTracker = true; + while (runTracker) { try { tracker = new JobTracker(conf); break; @@ -73,13 +75,21 @@ } catch (InterruptedException e) { } } - tracker.offerService(); + if (runTracker) { tracker.offerService(); } } public static JobTracker getTracker() { return tracker; } + public static void stopTracker() throws IOException { + if (tracker == null) + throw new IOException("Trying to stop JobTracker that is not running."); + runTracker = false; + tracker.close(); + tracker = null; + } + /** * A thread to timeout tasks that have been assigned to task trackers, * but that haven't reported back yet. @@ -353,8 +363,11 @@ private TreeMap taskTrackers = new TreeMap(); Vector jobInitQueue = new Vector(); ExpireTrackers expireTrackers = new ExpireTrackers(); + Thread expireTrackersThread = null; RetireJobs retireJobs = new RetireJobs(); + Thread retireJobsThread = null; JobInitThread initJobs = new JobInitThread(); + Thread initJobsThread = null; ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks); @@ -439,9 +452,12 @@ this.startTime = System.currentTimeMillis(); - new Thread(this.expireTrackers).start(); - new Thread(this.retireJobs).start(); - new Thread(this.initJobs).start(); + this.expireTrackersThread = new Thread(this.expireTrackers); + this.expireTrackersThread.start(); + this.retireJobsThread = new Thread(this.retireJobs); + this.retireJobsThread.start(); + this.initJobsThread = new Thread(this.initJobs); + this.initJobsThread.start(); expireLaunchingTaskThread.start(); } @@ -466,8 +482,66 @@ this.interTrackerServer.join(); } catch (InterruptedException ie) { } + LOG.info("Stopped interTrackerServer"); } + void close() throws IOException { + if (this.infoServer != null) { + LOG.info("Stopping infoServer"); + try { + this.infoServer.stop(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + if (this.interTrackerServer != null) { + LOG.info("Stopping interTrackerServer"); + this.interTrackerServer.stop(); + } + if (this.expireTrackers != null) { + LOG.info("Stopping expireTrackers"); + this.expireTrackers.stopTracker(); + try { + this.expireTrackersThread.interrupt(); + this.expireTrackersThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + if (this.retireJobs != null) { + LOG.info("Stopping retirer"); + this.retireJobs.stopRetirer(); + try { + this.retireJobsThread.interrupt(); + this.retireJobsThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + if (this.initJobs != null) { + LOG.info("Stopping initer"); + this.initJobs.stopIniter(); + try { + 
this.initJobsThread.interrupt(); + this.initJobsThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + if (this.expireLaunchingTaskThread != null) { + LOG.info("Stopping expireLaunchingTasks"); + this.expireLaunchingTasks.stop(); + try { + this.expireLaunchingTaskThread.interrupt(); + this.expireLaunchingTaskThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + LOG.info("stopped all jobtracker services"); + return; + } + /////////////////////////////////////////////////////// // Maintain lookup tables; called by JobInProgress // and TaskInProgress Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=415438&r1=415437&r2=415438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jun 19 14:48:07 2006 @@ -60,6 +60,10 @@ Server mapOutputServer = null; InterTrackerProtocol jobClient; + StatusHttpServer server = null; + + boolean shuttingDown = false; + TreeMap tasks = null; TreeMap runningTasks = null; int mapTotal = 0; @@ -145,8 +149,22 @@ this.justStarted = true; this.jobClient = (InterTrackerProtocol) RPC.getProxy(InterTrackerProtocol.class, jobTrackAddr, this.fConf); + + this.running = true; } + public synchronized void shutdown() throws IOException { + shuttingDown = true; + close(); + if (this.server != null) { + try { + LOG.info("Shutting down StatusHttpServer"); + this.server.stop(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + } /** * Close down the TaskTracker and all its components. We must also shutdown * any running tasks or threads, and cleanup disk space. A new TaskTracker @@ -191,6 +209,8 @@ mapOutputServer = null; } + this.running = false; + // Clear local storage this.mapOutputFile.cleanupStorage(); } @@ -206,7 +226,7 @@ this.mapOutputFile = new MapOutputFile(); this.mapOutputFile.setConf(conf); int httpPort = conf.getInt("tasktracker.http.port", 50060); - StatusHttpServer server = new StatusHttpServer("task", httpPort, true); + this.server = new StatusHttpServer("task", httpPort, true); int workerThreads = conf.getInt("tasktracker.http.threads", 40); server.setThreads(1, workerThreads); server.start(); @@ -236,7 +256,7 @@ long lastHeartbeat = 0; this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf); - while (running) { + while (running && !shuttingDown) { long now = System.currentTimeMillis(); long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat); @@ -407,26 +427,30 @@ */ public void run() { try { - while (running) { + while (running && !shuttingDown) { boolean staleState = false; try { // This while-loop attempts reconnects if we get network errors - while (running && ! staleState) { + while (running && ! staleState && !shuttingDown ) { try { if (offerService() == STALE_STATE) { staleState = true; } } catch (Exception ex) { - LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying...", ex); - try { - Thread.sleep(5000); - } catch (InterruptedException ie) { + if (!shuttingDown) { + LOG.info("Lost connection to JobTracker [" + + jobTrackAddr + "]. 
Retrying...", ex); + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + } } } } } finally { close(); } + if (shuttingDown) { return; } LOG.info("Reinitializing local state"); initialize(); } @@ -529,18 +553,20 @@ localJobConf = new JobConf(localJobFile); localJobConf.set("mapred.task.id", task.getTaskId()); + localJobConf.set("mapred.local.dir", + this.defaultJobConf.get("mapred.local.dir")); String jarFile = localJobConf.getJar(); if (jarFile != null) { fs.copyToLocalFile(new Path(jarFile), localJarFile); localJobConf.setJar(localJarFile.toString()); + } - FileSystem localFs = FileSystem.getNamed("local", fConf); - OutputStream out = localFs.create(localJobFile); - try { - localJobConf.write(out); - } finally { - out.close(); - } + FileSystem localFs = FileSystem.getNamed("local", fConf); + OutputStream out = localFs.create(localJobFile); + try { + localJobConf.write(out); + } finally { + out.close(); } // set the task's configuration to the local job conf // rather than the default. @@ -836,7 +862,7 @@ Task task = umbilical.getTask(taskid); JobConf job = new JobConf(task.getJobFile()); - + defaultConf.addFinalResource(new Path(task.getJobFile())); startPinging(umbilical, taskid); // start pinging parent Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=415438&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Mon Jun 19 14:48:07 2006 @@ -0,0 +1,210 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.*; +import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; + +/** + * This class creates a single-process Map-Reduce cluster for junit testing. + * One thread is created for each server. + * @author Milind Bhandarkar + */ +public class MiniMRCluster { + + private Thread jobTrackerThread; + private JobTrackerRunner jobTracker; + private TaskTrackerRunner taskTracker; + + private int jobTrackerPort = 0; + private int taskTrackerPort = 0; + + private int numTaskTrackers; + + private ArrayList taskTrackerList = new ArrayList(); + private ArrayList taskTrackerThreadList = new ArrayList(); + + private String namenode; + + /** + * An inner class that runs a job tracker. + */ + class JobTrackerRunner implements Runnable { + /** + * Create the job tracker and run it. + */ + public void run() { + try { + JobConf jc = new JobConf(); + jc.set("fs.name.node", namenode); + jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort); + // this timeout seems to control the minimum time for the test, so + // set it down at 2 seconds. 
+ jc.setInt("ipc.client.timeout", 1000); + jc.set("mapred.local.dir","build/test/mapred/local"); + JobTracker.startTracker(jc); + } catch (Throwable e) { + System.err.println("Job tracker crashed:"); + e.printStackTrace(); + } + } + + /** + * Shutdown the job tracker and wait for it to finish. + */ + public void shutdown() { + try { + JobTracker.stopTracker(); + } catch (Throwable e) { + System.err.println("Unable to shut down job tracker:"); + e.printStackTrace(); + } + } + } + + /** + * An inner class to run the task tracker. + */ + class TaskTrackerRunner implements Runnable { + TaskTracker tt; + + /** + * Create and run the task tracker. + */ + public void run() { + try { + JobConf jc = new JobConf(); + jc.set("fs.name.node", namenode); + jc.set("mapred.job.tracker", "localhost:"+jobTrackerPort); + // this timeout seems to control the minimum time for the test, so + // set it down at 2 seconds. + jc.setInt("ipc.client.timeout", 1000); + jc.setInt("mapred.task.tracker.info.port", taskTrackerPort++); + jc.setInt("mapred.task.tracker.output.port", taskTrackerPort++); + jc.setInt("mapred.task.tracker.report.port", taskTrackerPort++); + File localDir = new File(jc.get("mapred.local.dir")); + File ttDir = new File(localDir, Integer.toString(taskTrackerPort)); + ttDir.mkdirs(); + jc.set("mapred.local.dir", ttDir.getAbsolutePath()); + tt = new TaskTracker(jc); + tt.run(); + } catch (Throwable e) { + tt = null; + System.err.println("Task tracker crashed:"); + e.printStackTrace(); + } + } + + /** + * Shut down the server and wait for it to finish. + */ + public void shutdown() { + if (tt != null) { + try { + tt.shutdown(); + } catch (Throwable e) { + System.err.println("Unable to shut down task tracker:"); + e.printStackTrace(); + } + } + } + } + + /** + * Create the config and start up the servers. + */ + public MiniMRCluster(int jobTrackerPort, + int taskTrackerPort, + int numTaskTrackers, + String namenode) throws IOException { + this.jobTrackerPort = jobTrackerPort; + this.taskTrackerPort = taskTrackerPort; + this.numTaskTrackers = numTaskTrackers; + this.namenode = namenode; + + File configDir = new File("build", "minimr"); + configDir.mkdirs(); + File siteFile = new File(configDir, "hadoop-site.xml"); + PrintWriter pw = new PrintWriter(siteFile); + pw.print("<?xml version=\"1.0\"?>\n"+ + "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n"+ + "<configuration>\n"+ + " <property>\n"+ + " <name>mapred.system.dir</name>\n"+ + " <value>build/test/mapred/system</value>\n"+ + " </property>\n"+ + "</configuration>\n"); + pw.close(); + jobTracker = new JobTrackerRunner(); + jobTrackerThread = new Thread(jobTracker); + jobTrackerThread.start(); + try { // let jobTracker get started + Thread.sleep(2000); + } catch(InterruptedException e) { + } + for (int idx = 0; idx < numTaskTrackers; idx++) { + TaskTrackerRunner taskTracker = new TaskTrackerRunner(); + Thread taskTrackerThread = new Thread(taskTracker); + taskTrackerThread.start(); + taskTrackerList.add(taskTracker); + taskTrackerThreadList.add(taskTrackerThread); + } + try { // let taskTrackers get started + Thread.sleep(2000); + } catch(InterruptedException e) { + } + } + + /** + * Shut down the servers. 
+ */ + public void shutdown() { + try { + for (int idx = 0; idx < numTaskTrackers; idx++) { + TaskTrackerRunner taskTracker = (TaskTrackerRunner) taskTrackerList.get(idx); + Thread taskTrackerThread = (Thread) taskTrackerThreadList.get(idx); + taskTracker.shutdown(); + taskTrackerThread.interrupt(); + try { + taskTrackerThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } + jobTracker.shutdown(); + jobTrackerThread.interrupt(); + try { + jobTrackerThread.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + } finally { + File configDir = new File("build", "minimr"); + File siteFile = new File(configDir, "hadoop-site.xml"); + siteFile.delete(); + } + } + + public static void main(String[] args) throws IOException { + System.out.println("Bringing up Jobtracker and tasktrackers."); + MiniMRCluster mr = new MiniMRCluster(50000, 50002, 4, "local"); + System.out.println("JobTracker and TaskTrackers are up."); + mr.shutdown(); + System.out.println("JobTracker and TaskTrackers brought down."); + } +} + Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java?rev=415438&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/PiEstimator.java Mon Jun 19 14:48:07 2006 @@ -0,0 +1,212 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Random; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * A Map-reduce program to estimate the value of Pi using the Monte-Carlo + * method. + * + * @author Milind Bhandarkar + */ +public class PiEstimator { + + /** + * Mapper class for Pi estimation. + */ + + public static class PiMapper extends MapReduceBase implements Mapper { + + /** Mapper configuration. + * + */ + public void configure(JobConf job) { + } + + static Random r = new Random(); + + /** Map method. + * @param key + * @param value not-used. 
+ * @param out + * @param reporter + */ + public void map(WritableComparable key, + Writable val, + OutputCollector out, + Reporter reporter) throws IOException { + int nSamples = ((IntWritable) key).get(); + for(int idx = 0; idx < nSamples; idx++) { + double x = r.nextDouble(); + double y = r.nextDouble(); + double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5); + if (d > 0.25) { + out.collect(new IntWritable(0), new IntWritable(1)); + } else { + out.collect(new IntWritable(1), new IntWritable(1)); + } + if (idx%100 == 1) { + reporter.setStatus("Generated "+idx+" samples."); + } + } + } + + public void close() { + // nothing + } + } + + public static class PiReducer extends MapReduceBase implements Reducer { + int numInside = 0; + int numOutside = 0; + JobConf conf; + + /** Reducer configuration. + * + */ + public void configure(JobConf job) { + conf = job; + } + /** Reduce method. + * @param key + * @param values + * @param output + * @param reporter + */ + public void reduce(WritableComparable key, + Iterator values, + OutputCollector output, + Reporter reporter) throws IOException { + if (((IntWritable)key).get() == 1) { + while (values.hasNext()) { + int num = ((IntWritable)values.next()).get(); + numInside += num; + } + } else { + while (values.hasNext()) { + int num = ((IntWritable)values.next()).get(); + numOutside += num; + } + } + } + + public void close() throws IOException { + Path tmpDir = new Path("test-mini-mr"); + Path outDir = new Path(tmpDir, "out"); + Path outFile = new Path(outDir, "reduce-out"); + FileSystem fileSys = FileSystem.get(conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, outFile, + IntWritable.class, IntWritable.class); + writer.append(new IntWritable(numInside), new IntWritable(numOutside)); + writer.close(); + } + } + + /** + * This is the main driver for computing the value of Pi using the + * Monte-Carlo method. + */ + static double launch(int numMaps, int numPoints, String jt, String dfs) + throws IOException { + + Configuration conf = new Configuration(); + JobConf jobConf = new JobConf(conf, PiEstimator.class); + if (jt != null) { jobConf.set("mapred.job.tracker", jt); } + if (dfs != null) { jobConf.set("fs.default.name", dfs); } + jobConf.setJobName("test-mini-mr"); + + // turn off speculative execution, because DFS doesn't handle + // multiple writers to the same file. 
+ jobConf.setSpeculativeExecution(false); + jobConf.setInputKeyClass(IntWritable.class); + jobConf.setInputValueClass(IntWritable.class); + jobConf.setInputFormat(SequenceFileInputFormat.class); + + jobConf.setOutputKeyClass(IntWritable.class); + jobConf.setOutputValueClass(IntWritable.class); + jobConf.setOutputFormat(SequenceFileOutputFormat.class); + + jobConf.setMapperClass(PiMapper.class); + jobConf.setReducerClass(PiReducer.class); + + jobConf.setNumReduceTasks(1); + + Path tmpDir = new Path("test-mini-mr"); + Path inDir = new Path(tmpDir, "in"); + Path outDir = new Path(tmpDir, "out"); + FileSystem fileSys = FileSystem.get(jobConf); + fileSys.delete(tmpDir); + fileSys.mkdirs(inDir); + + jobConf.setInputPath(inDir); + jobConf.setOutputPath(outDir); + + jobConf.setNumMapTasks(numMaps); + + for(int idx=0; idx < numMaps; ++idx) { + Path file = new Path(inDir, "part"+idx); + SequenceFile.Writer writer = new SequenceFile.Writer(fileSys, file, + IntWritable.class, IntWritable.class); + writer.append(new IntWritable(numPoints), new IntWritable(0)); + writer.close(); + } + + double estimate = 0.0; + + try { + JobClient.runJob(jobConf); + Path inFile = new Path(outDir, "reduce-out"); + SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile, + jobConf); + IntWritable numInside = new IntWritable(); + IntWritable numOutside = new IntWritable(); + reader.next(numInside, numOutside); + reader.close(); + estimate = (double) (numInside.get()*4.0)/(numMaps*numPoints); + } finally { + fileSys.delete(tmpDir); + } + + return estimate; + } + + /** + * Launches all the tasks in order. + */ + public static void main(String[] argv) throws Exception { + if (argv.length < 2) { + System.err.println("Usage: PiEstimator <nMaps> <nSamples>"); + return; + } + + int nMaps = Integer.parseInt(argv[0]); + int nSamples = Integer.parseInt(argv[1]); + + System.out.println("Estimated value of PI is "+ + launch(nMaps, nSamples, null, null)); + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java?rev=415438&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRBringup.java Mon Jun 19 14:48:07 2006 @@ -0,0 +1,38 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import junit.framework.TestCase; + +/** + * A unit test for bringup and shutdown of the Mini Map-Reduce Cluster. 
+ * + * @author Milind Bhandarkar + */ +public class TestMiniMRBringup extends TestCase { + + public void testBringUp() throws IOException { + MiniMRCluster mr = null; + try { + mr = new MiniMRCluster(50000, 50010, 1, "local"); + } finally { + if (mr != null) { mr.shutdown(); } + } + } + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=415438&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Mon Jun 19 14:48:07 2006 @@ -0,0 +1,43 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import junit.framework.TestCase; + +/** + * A JUnit test to test the mini map-reduce cluster with the local file system. + * + * @author Milind Bhandarkar + */ +public class TestMiniMRLocalFS extends TestCase { + + static final int NUM_MAPS = 10; + static final int NUM_SAMPLES = 100000; + + public void testWithLocal() throws IOException { + MiniMRCluster mr = null; + try { + mr = new MiniMRCluster(60030, 60040, 2, "local"); + double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local"); + double error = Math.abs(Math.PI - estimate); + assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); + } finally { + if (mr != null) { mr.shutdown(); } + } + } +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=415438&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Jun 19 14:48:07 2006 @@ -0,0 +1,57 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; + +/** + * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS. + * + * @author Milind Bhandarkar + */ +public class TestMiniMRWithDFS extends TestCase { + + static final int NUM_MAPS = 10; + static final int NUM_SAMPLES = 100000; + + public void testWithDFS() throws IOException { + String namenode = null; + MiniDFSCluster dfs = null; + MiniMRCluster mr = null; + FileSystem fileSys = null; + try { + Configuration conf = new Configuration(); + dfs = new MiniDFSCluster(65314, conf); + fileSys = dfs.getFileSystem(); + namenode = fileSys.getName(); + mr = new MiniMRCluster(50050, 50060, 4, namenode); + double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:50050", namenode); + double error = Math.abs(Math.PI - estimate); + assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); + } finally { + if (fileSys != null) { fileSys.close(); } + if (dfs != null) { dfs.shutdown(); } + if (mr != null) { mr.shutdown(); + } + } + } + +} Modified: lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp?rev=415438&r1=415437&r2=415438&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp (original) +++ lucene/hadoop/trunk/src/webapps/task/getMapOutput.jsp Mon Jun 19 14:48:07 2006 @@ -6,10 +6,10 @@ import="javax.servlet.http.*" import="java.io.*" import="java.util.*" - import="java.util.logging.Logger" import="org.apache.hadoop.fs.*" import="org.apache.hadoop.mapred.*" import="org.apache.hadoop.util.*" + import="org.apache.commons.logging.*" %><% String mapId = request.getParameter("map"); String reduceId = request.getParameter("reduce"); @@ -35,8 +35,8 @@ } catch (IOException ie) { TaskTracker tracker = (TaskTracker) application.getAttribute("task.tracker"); - Logger log = (Logger) application.getAttribute("log"); - log.warning("Http server (getMapOutput.jsp): " + + Log log = (Log) application.getAttribute("log"); + log.warn("Http server (getMapOutput.jsp): " + StringUtils.stringifyException(ie)); tracker.mapOutputLost(mapId); throw ie;
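A note on the math in the PiEstimator test above: each map task draws points uniformly from the unit square and keys them by whether they fall inside the inscribed circle of radius 0.5, i.e. whether (x-0.5)^2 + (y-0.5)^2 <= 0.25. Since that circle covers pi/4 of the square, the reducer's counts give pi ~= 4 * numInside / (numMaps * numPoints), which is exactly the estimate line in launch(). The same computation without the map-reduce machinery, as a minimal standalone sketch (the class name and sample count are illustrative, not part of this commit):

    import java.util.Random;

    public class PiSketch {
      public static void main(String[] args) {
        Random r = new Random();
        long inside = 0;
        long n = 1000000;                  // total samples; illustrative
        for (long i = 0; i < n; i++) {
          double x = r.nextDouble() - 0.5; // center the unit square on the origin
          double y = r.nextDouble() - 0.5;
          if (x*x + y*y <= 0.25) {         // inside the radius-0.5 circle
            inside++;
          }
        }
        // 4 * (area of circle / area of square) converges to pi
        System.out.println("Estimated value of PI is " + (4.0 * inside / n));
      }
    }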