Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Tue Jun 16 20:54:24 2009 @@ -23,17 +23,22 @@ import java.io.IOException; import java.util.Random; import java.util.Iterator; +import java.util.StringTokenizer; import junit.framework.TestCase; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.Shell.ExitCodeException; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.TestProcfsBasedProcessTree; import org.apache.commons.logging.Log; @@ -49,11 +54,11 @@ .getLog(TestKillSubProcesses.class); private static String TEST_ROOT_DIR = new File(System.getProperty( - "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); + "test.build.data", "/tmp"), "killjob").toURI().toString().replace(' ', '+'); private static JobClient jobClient = null; - private static MiniMRCluster mr = null; + static MiniMRCluster mr = null; private static Path scriptDir = null; private static String scriptDirName = null; private static String pid = null; @@ -70,7 +75,7 @@ conf.setJobName("testkilljobsubprocesses"); conf.setMapperClass(KillingMapperWithChildren.class); - scriptDir = new Path(TEST_ROOT_DIR + "/script"); + scriptDir = new Path(TEST_ROOT_DIR , "script"); RunningJob job = runJobAndSetProcessHandle(jt, conf); // kill the job now @@ -181,9 +186,8 @@ } } LOG.info("pid of map task is " + pid); - - // Checking if the map task is alive - assertTrue(ProcessTree.isAlive(pid)); + //Checking if the map task is alive + assertTrue("Map is no more alive", isAlive(pid)); LOG.info("The map task is alive before Job completion, as expected."); } } @@ -216,7 +220,7 @@ " is " + childPid); assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is not alive before Job completion", - ProcessTree.isAlive(childPid)); + isAlive(childPid)); } } return job; @@ -250,10 +254,10 @@ " is " + childPid); assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is alive after Job completion", - !ProcessTree.isAlive(childPid)); + !isAlive(childPid)); } } - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.getLocal(mr.createJobConf()); if(fs.exists(scriptDir)) { fs.delete(scriptDir, true); } @@ -261,10 +265,23 @@ private static RunningJob runJob(JobConf conf) throws IOException { - final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input"); - final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output"); + final Path inDir; + final Path outDir; + FileSystem fs = FileSystem.getLocal(conf); + FileSystem tempFs = FileSystem.get(conf); + //Check if test is run with hdfs or local file system. + //if local filesystem then prepend TEST_ROOT_DIR, otherwise + //killjob folder would be created in workspace root. + if (!tempFs.getUri().toASCIIString().equals( + fs.getUri().toASCIIString())) { + inDir = new Path("killjob/input"); + outDir = new Path("killjob/output"); + } else { + inDir = new Path(TEST_ROOT_DIR, "input"); + outDir = new Path(TEST_ROOT_DIR, "output"); + } - FileSystem fs = FileSystem.get(conf); + if(fs.exists(scriptDir)) { fs.delete(scriptDir, true); } @@ -290,9 +307,7 @@ // run the TCs conf = mr.createJobConf(); JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); - runKillingJobAndValidate(jt, conf); - runFailingJobAndValidate(jt, conf); - runSuccessfulJobAndValidate(jt, conf); + runTests(conf, jt); } finally { if (mr != null) { mr.shutdown(); @@ -300,12 +315,25 @@ } } + void runTests(JobConf conf, JobTracker jt) throws IOException { + FileSystem fs = FileSystem.getLocal(mr.createJobConf()); + Path rootDir = new Path(TEST_ROOT_DIR); + if(!fs.exists(rootDir)) { + fs.mkdirs(rootDir); + } + fs.setPermission(rootDir, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + runKillingJobAndValidate(jt, conf); + runFailingJobAndValidate(jt, conf); + runSuccessfulJobAndValidate(jt, conf); + } + /** * Creates signal file */ private static void signalTask(String signalFile, JobConf conf) { try { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.getLocal(conf); fs.createNewFile(new Path(signalFile)); } catch(IOException e) { LOG.warn("Unable to create signal file. " + e); @@ -317,10 +345,12 @@ */ private static void runChildren(JobConf conf) throws IOException { if (ProcessTree.isSetsidAvailable) { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.getLocal(conf); TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath(); - scriptDir = new Path(TEST_ROOT_DIR + "/script"); - + scriptDir = new Path(TEST_ROOT_DIR + "/script"); + if(fs.exists(scriptDir)){ + fs.delete(scriptDir, true); + } // create shell script Random rm = new Random(); Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() @@ -329,6 +359,7 @@ String script = "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" + "echo hello\n" + + "trap 'echo got SIGTERM' 15 \n" + "if [ $1 != 0 ]\nthen\n" + " sh " + shellScript + " $(($1-1))\n" + "else\n" + @@ -447,4 +478,46 @@ throw new RuntimeException("failing map"); } } + + /** + * Check for presence of the process with the pid passed is alive or not + * currently. + * + * @param pid pid of the process + * @return if a process is alive or not. + */ + private static boolean isAlive(String pid) throws IOException { + String commandString ="ps -o pid,command -e"; + String args[] = new String[] {"bash", "-c" , commandString}; + ShellCommandExecutor shExec = new ShellCommandExecutor(args); + try { + shExec.execute(); + }catch(ExitCodeException e) { + return false; + } catch (IOException e) { + LOG.warn("IOExecption thrown while checking if process is alive" + + StringUtils.stringifyException(e)); + throw e; + } + + String output = shExec.getOutput(); + + //Parse the command output and check for pid, ignore the commands + //which has ps or grep in it. + StringTokenizer strTok = new StringTokenizer(output, "\n"); + boolean found = false; + while(strTok.hasMoreTokens()) { + StringTokenizer pidToken = new StringTokenizer(strTok.nextToken(), + " "); + String pidStr = pidToken.nextToken(); + String commandStr = pidToken.nextToken(); + if(pid.equals(pidStr) && !(commandStr.contains("ps") + || commandStr.contains("grep"))) { + found = true; + break; + } + } + return found; + } + }
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Tue Jun 16 20:54:24 2009 @@ -30,7 +30,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ToolRunner; @@ -97,15 +96,17 @@ // Run Sort Sort sort = new Sort(); assertEquals(ToolRunner.run(job, sort, sortArgs), 0); - Counters counters = sort.getResult().getCounters(); - long mapInput = counters.findCounter(TaskCounter.MAP_INPUT_BYTES - ).getValue(); + org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters(); + long mapInput = counters.findCounter( + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP, + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ). + getValue(); long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, "HDFS_BYTES_READ").getValue(); // the hdfs read should be between 100% and 110% of the map input bytes assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead, (hdfsRead < (mapInput * 1.1)) && - (hdfsRead > mapInput)); + (hdfsRead >= mapInput)); } private static void runSortValidator(JobConf job, Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Tue Jun 16 20:54:24 2009 @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; @@ -375,4 +376,83 @@ stopCluster(); } -} \ No newline at end of file + + /** + * Check if excluded hosts are decommissioned across restart + */ + public void testMRExcludeHostsAcrossRestarts() throws IOException { + // start a cluster with 2 hosts and empty exclude-hosts file + Configuration conf = new Configuration(); + conf.setBoolean("mapred.jobtracker.restart.recover", true); + + File file = new File("hosts.exclude"); + file.delete(); + startCluster(2, 1, 0, conf); + String hostToDecommission = getHostname(1); + conf = mr.createJobConf(new JobConf(conf)); + + // submit a job + Path inDir = new Path("input"); + Path outDir = new Path("output"); + Path signalFilename = new Path("share"); + JobConf newConf = new JobConf(conf); + UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1, + "restart-decommission", signalFilename.toString(), + signalFilename.toString()); + + JobClient jobClient = new JobClient(newConf); + RunningJob job = jobClient.submitJob(newConf); + JobID id = job.getID(); + + // wait for 50% + while (job.mapProgress() < 0.5f) { + UtilsForTests.waitFor(100); + } + + // change the exclude-hosts file to include one host + FileOutputStream out = new FileOutputStream(file); + LOG.info("Writing excluded nodes to log file " + file.toString()); + BufferedWriter writer = null; + try { + writer = new BufferedWriter(new OutputStreamWriter(out)); + writer.write( hostToDecommission + "\n"); // decommission first host + } finally { + if (writer != null) { + writer.close(); + } + out.close(); + } + file.deleteOnExit(); + + // restart the jobtracker + mr.stopJobTracker(); + mr.startJobTracker(); + + // Wait for the JT to be ready + UtilsForTests.waitForJobTracker(jobClient); + + jt = mr.getJobTrackerRunner().getJobTracker(); + UtilsForTests.signalTasks(dfs, dfs.getFileSystem(), + signalFilename.toString(), signalFilename.toString(), 1); + + assertTrue("Decommissioning of tracker has no effect restarted job", + jt.getJob(job.getID()).failedMapTasks > 0); + + // check the cluster status and tracker size + assertEquals("Tracker is not lost upon host decommissioning", + 1, jt.getClusterStatus(false).getTaskTrackers()); + assertEquals("Excluded node count is incorrect", + 1, jt.getClusterStatus(false).getNumExcludedNodes()); + + // check if the host is disallowed + for (TaskTrackerStatus status : jt.taskTrackers()) { + assertFalse("Tracker from decommissioned host still exist", + status.getHost().equals(hostToDecommission)); + } + + // wait for the job + job.waitForCompletion(); + + stopCluster(); + } +} Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Tue Jun 16 20:54:24 2009 @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.JobTracker.RecoveryManager; import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner; import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler; @@ -310,7 +311,7 @@ fs.delete(rFile,false); // start the jobtracker - LOG.info("Stopping jobtracker with system files deleted"); + LOG.info("Starting jobtracker with system files deleted"); mr.startJobTracker(); UtilsForTests.waitForJobTracker(jc); @@ -394,8 +395,58 @@ LOG.info("Starting jobtracker with fs errors"); mr.startJobTracker(); JobTrackerRunner runner = mr.getJobTrackerRunner(); - assertFalse("Restart count for new job is incorrect", runner.isActive()); + assertFalse("JobTracker is still alive", runner.isActive()); mr.shutdown(); } + + /** + * Test if the jobtracker waits for the info file to be created before + * starting. + */ + public void testJobTrackerInfoCreation() throws Exception { + LOG.info("Testing jobtracker.info file"); + MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null); + String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + + (dfs.getFileSystem()).getUri().getPort(); + // shut down the data nodes + dfs.shutdownDataNodes(); + + // start the jobtracker + JobConf conf = new JobConf(); + FileSystem.setDefaultUri(conf, namenode); + conf.set("mapred.job.tracker", "localhost:0"); + conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); + + JobTracker jobtracker = new JobTracker(conf); + + // now check if the update restart count works fine or not + boolean failed = false; + try { + jobtracker.recoveryManager.updateRestartCount(); + } catch (IOException ioe) { + failed = true; + } + assertTrue("JobTracker created info files without datanodes!!!", failed); + + Path restartFile = jobtracker.recoveryManager.getRestartCountFile(); + Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile(); + FileSystem fs = dfs.getFileSystem(); + assertFalse("Info file exists after update failure", + fs.exists(restartFile)); + assertFalse("Temporary restart-file exists after update failure", + fs.exists(restartFile)); + + // start 1 data node + dfs.startDataNodes(conf, 1, true, null, null, null, null); + dfs.waitActive(); + + failed = false; + try { + jobtracker.recoveryManager.updateRestartCount(); + } catch (IOException ioe) { + failed = true; + } + assertFalse("JobTracker failed to create info files with datanodes!!!", failed); + } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Tue Jun 16 20:54:24 2009 @@ -91,7 +91,7 @@ RawKeyValueIterator rawItr = Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, false, conf.getInt("io.sort.factor", 100), tmpDir, - new Text.Comparator(), new NullProgress(),null,null); + new Text.Comparator(), new NullProgress(), null, null, null); @SuppressWarnings("unchecked") // WritableComparators are not generic ReduceTask.ValuesIterator valItr = new ReduceTask.ValuesIterator<Text,Text>(rawItr, Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTTMemoryReporting.java Tue Jun 16 20:54:24 2009 @@ -22,10 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin; import org.apache.hadoop.util.MemoryCalculatorPlugin; import org.apache.hadoop.util.ToolRunner; @@ -46,7 +43,6 @@ static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class); - private MiniDFSCluster miniDFSCluster; private MiniMRCluster miniMRCluster; /** @@ -77,41 +73,42 @@ getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); long totalPhysicalMemoryOnTT = getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); - long virtualMemoryReservedOnTT = - getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); - long physicalMemoryReservedOnTT = - getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT); + long mapSlotMemorySize = + getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); + long reduceSlotMemorySize = + getConf() + .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT); long reportedTotalVirtualMemoryOnTT = status.getResourceStatus().getTotalVirtualMemory(); long reportedTotalPhysicalMemoryOnTT = status.getResourceStatus().getTotalPhysicalMemory(); - long reportedVirtualMemoryReservedOnTT = - status.getResourceStatus().getReservedTotalMemory(); - long reportedPhysicalMemoryReservedOnTT = - status.getResourceStatus().getReservedPhysicalMemory(); + long reportedMapSlotMemorySize = + status.getResourceStatus().getMapSlotMemorySizeOnTT(); + long reportedReduceSlotMemorySize = + status.getResourceStatus().getReduceSlotMemorySizeOnTT(); message = "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " - + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = (" - + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", " - + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT - + ")"; + + "mapSlotMemSize, reduceSlotMemorySize) = (" + + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + "," + + mapSlotMemorySize + "," + reduceSlotMemorySize + ")"; message += "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, " - + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = (" + + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = (" + reportedTotalVirtualMemoryOnTT + ", " + reportedTotalPhysicalMemoryOnTT - + ", " - + reportedVirtualMemoryReservedOnTT - + ", " - + reportedPhysicalMemoryReservedOnTT + ")"; + + "," + + reportedMapSlotMemorySize + + "," + + reportedReduceSlotMemorySize + + ")"; LOG.info(message); if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT - || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT - || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) { + || mapSlotMemorySize != reportedMapSlotMemorySize + || reduceSlotMemorySize != reportedReduceSlotMemorySize) { hasPassed = false; } return super.assignTasks(status); @@ -132,7 +129,7 @@ TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); setUpCluster(conf); - runSleepJob(); + runSleepJob(miniMRCluster.createJobConf()); verifyTestResults(); } finally { tearDownCluster(); @@ -149,8 +146,9 @@ JobConf conf = new JobConf(); conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L); conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L); - conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L); - conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L); + conf.setLong("mapSlotMemorySize", 1 * 512L); + conf.setLong("reduceSlotMemorySize", 1 * 1024L); + conf.setClass( TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); @@ -158,15 +156,17 @@ 4 * 1024 * 1024 * 1024L); conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY, 2 * 1024 * 1024 * 1024L); + conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 512L); conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, - 1 * 1024 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY, - 512 * 1024 * 1024L); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L); + try { setUpCluster(conf); - runSleepJob(); + JobConf jobConf = miniMRCluster.createJobConf(); + jobConf.setMemoryForMapTask(1 * 1024L); + jobConf.setMemoryForReduceTask(2 * 1024L); + runSleepJob(jobConf); verifyTestResults(); } finally { tearDownCluster(); @@ -189,17 +189,10 @@ LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin(); conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize()); conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize()); - conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L); - conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, - 1 * 1024 * 1024 * 1024L); - conf.setLong( - TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY, - 512 * 1024 * 1024L); + try { setUpCluster(conf); - runSleepJob(); + runSleepJob(miniMRCluster.createJobConf()); verifyTestResults(); } finally { tearDownCluster(); @@ -208,22 +201,15 @@ private void setUpCluster(JobConf conf) throws Exception { - conf.setClass("mapred.jobtracker.taskScheduler", - TestTTMemoryReporting.FakeTaskScheduler.class, - TaskScheduler.class); - miniDFSCluster = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = miniDFSCluster.getFileSystem(); - String namenode = fileSys.getUri().toString(); - miniMRCluster = new MiniMRCluster(1, namenode, 3, - null, null, conf); + conf.setClass("mapred.jobtracker.taskScheduler", + TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class); + conf.set("mapred.job.tracker.handler.count", "1"); + miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf); } - private void runSleepJob() throws Exception { - Configuration conf = new Configuration(); - conf.set("mapred.job.tracker", "localhost:" - + miniMRCluster.getJobTrackerPort()); + private void runSleepJob(JobConf conf) throws Exception { String[] args = { "-m", "1", "-r", "1", - "-mt", "1000", "-rt", "1000" }; + "-mt", "10", "-rt", "10" }; ToolRunner.run(conf, new SleepJob(), args); } @@ -235,7 +221,8 @@ } private void tearDownCluster() { - if (miniMRCluster != null) { miniMRCluster.shutdown(); } - if (miniDFSCluster != null) { miniDFSCluster.shutdown(); } + if (miniMRCluster != null) { + miniMRCluster.shutdown(); + } } -} \ No newline at end of file +} Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.java Tue Jun 16 20:54:24 2009 @@ -18,21 +18,24 @@ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.regex.Pattern; import java.util.regex.Matcher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.MemoryCalculatorPlugin; import org.apache.hadoop.util.ProcfsBasedProcessTree; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.TestProcfsBasedProcessTree; import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.fs.FileSystem; import junit.framework.TestCase; @@ -43,18 +46,22 @@ private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class); - private MiniDFSCluster miniDFSCluster; + private static String TEST_ROOT_DIR = new Path(System.getProperty( + "test.build.data", "/tmp")).toString().replace(' ', '+'); + private MiniMRCluster miniMRCluster; private String taskOverLimitPatternString = "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. " + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task."; - private void startCluster(JobConf conf) throws Exception { - miniDFSCluster = new MiniDFSCluster(conf, 1, true, null); - FileSystem fileSys = miniDFSCluster.getFileSystem(); - String namenode = fileSys.getUri().toString(); - miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf); + private void startCluster(JobConf conf) + throws Exception { + conf.set("mapred.job.tracker.handler.count", "1"); + conf.set("mapred.tasktracker.map.tasks.maximum", "1"); + conf.set("mapred.tasktracker.reduce.tasks.maximum", "1"); + conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0"); + miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf); } @Override @@ -62,9 +69,6 @@ if (miniMRCluster != null) { miniMRCluster.shutdown(); } - if (miniDFSCluster != null) { - miniDFSCluster.shutdown(); - } } private int runSleepJob(JobConf conf) throws Exception { @@ -74,15 +78,6 @@ private void runAndCheckSuccessfulJob(JobConf conf) throws IOException { - // Set up job. - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); - Pattern taskOverLimitPattern = Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*")); Matcher mat = null; @@ -148,43 +143,12 @@ return; } - JobConf conf = new JobConf(); // Task-memory management disabled by default. - startCluster(conf); - long PER_TASK_LIMIT = 100L; // Doesn't matter how low. - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - runAndCheckSuccessfulJob(conf); - } - - /** - * Test for verifying that tasks with no limits, with the cumulative usage - * still under TT's limits, succeed. - * - * @throws Exception - */ - public void testTasksWithNoLimits() - throws Exception { - // Run the test only if memory management is enabled - if (!isProcfsBasedTreeAvailable()) { - return; - } - - // Fairly large value for sleepJob to succeed - long ttLimit = 4 * 1024 * 1024 * 1024L; - // Start cluster with proper configuration. - JobConf fConf = new JobConf(); - - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - ttLimit); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit); - fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); - startCluster(fConf); - JobConf conf = new JobConf(); + startCluster(new JobConf()); + long PER_TASK_LIMIT = 1L; // Doesn't matter how low. + JobConf conf = miniMRCluster.createJobConf(); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); runAndCheckSuccessfulJob(conf); } @@ -202,33 +166,25 @@ } // Large so that sleepjob goes through and fits total TT usage - long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L; - long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; + long PER_TASK_LIMIT = 2 * 1024L; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 2 * 1024L); fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); - startCluster(fConf); - JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024L); + startCluster(new JobConf()); + + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); runAndCheckSuccessfulJob(conf); - } /** - * Test for verifying that tasks that go beyond limits, though the cumulative - * usage is under TT's limits, get killed. + * Test for verifying that tasks that go beyond limits get killed. * * @throws Exception */ @@ -240,43 +196,32 @@ return; } - long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks. - long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit - // total usage + long PER_TASK_LIMIT = 1L; // Low enough to kill off sleepJob tasks. + Pattern taskOverLimitPattern = Pattern.compile(String.format(taskOverLimitPatternString, String - .valueOf(PER_TASK_LIMIT))); + .valueOf(PER_TASK_LIMIT*1024*1024L))); Matcher mat = null; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); // very small value, so that no task escapes to successful completion. fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300)); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 2 * 1024); + fConf.setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, + 2 * 1024); startCluster(fConf); // Set up job. - JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); + conf.setMaxMapAttempts(1); + conf.setMaxReduceAttempts(1); // Start the job. int ret = 0; @@ -334,48 +279,39 @@ } // Large enough for SleepJob Tasks. - long PER_TASK_LIMIT = 100000000000L; - // Very Limited TT. All tasks will be killed. - long TASK_TRACKER_LIMIT = 100L; - Pattern taskOverLimitPattern = - Pattern.compile(String.format(taskOverLimitPatternString, String - .valueOf(PER_TASK_LIMIT))); - Pattern trackerOverLimitPattern = - Pattern - .compile("Killing one of the least progress tasks - .*, as " - + "the cumulative memory usage of all the tasks on the TaskTracker" - + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + "."); - Matcher mat = null; + long PER_TASK_LIMIT = 100 * 1024L; // Start cluster with proper configuration. JobConf fConf = new JobConf(); - fConf.setClass( - TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, - DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class); - fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, - TASK_TRACKER_LIMIT); - fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, - TASK_TRACKER_LIMIT); + fConf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, + 1L); fConf.setLong( - TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1L); + + // Because of the above, the total tt limit is 2mb + long TASK_TRACKER_LIMIT = 2 * 1024 * 1024L; + // very small value, so that no task escapes to successful completion. fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300)); startCluster(fConf); + Pattern taskOverLimitPattern = + Pattern.compile(String.format(taskOverLimitPatternString, String + .valueOf(PER_TASK_LIMIT))); + + Pattern trackerOverLimitPattern = + Pattern + .compile("Killing one of the least progress tasks - .*, as " + + "the cumulative memory usage of all the tasks on the TaskTracker" + + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + "."); + Matcher mat = null; + // Set up job. - JobConf conf = new JobConf(); - conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT); - JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker(); - conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":" - + jt.getTrackerPort()); - NameNode nn = miniDFSCluster.getNameNode(); - conf.set("fs.default.name", "hdfs://" - + nn.getNameNodeAddress().getHostName() + ":" - + nn.getNameNodeAddress().getPort()); + JobConf conf = new JobConf(miniMRCluster.createJobConf()); + conf.setMemoryForMapTask(PER_TASK_LIMIT); + conf.setMemoryForReduceTask(PER_TASK_LIMIT); JobClient jClient = new JobClient(conf); SleepJob sleepJob = new SleepJob(); @@ -385,10 +321,12 @@ job.submit(); boolean TTOverFlowMsgPresent = false; while (true) { - // Set-up tasks are the first to be launched. - TaskReport[] setUpReports = jClient.getSetupTaskReports( - (org.apache.hadoop.mapred.JobID)job.getID()); - for (TaskReport tr : setUpReports) { + List<TaskReport> allTaskReports = new ArrayList<TaskReport>(); + allTaskReports.addAll(Arrays.asList(jClient + .getSetupTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); + allTaskReports.addAll(Arrays.asList(jClient + .getMapTaskReports((org.apache.hadoop.mapred.JobID) job.getID()))); + for (TaskReport tr : allTaskReports) { String[] diag = tr.getDiagnostics(); for (String str : diag) { mat = taskOverLimitPattern.matcher(str); @@ -414,4 +352,90 @@ // Test succeeded, kill the job. job.killJob(); } + + /** + * Test to verify the check for whether a process tree is over limit or not. + * @throws IOException if there was a problem setting up the + * fake procfs directories or files. + */ + public void testProcessTreeLimits() throws IOException { + + // set up a dummy proc file system + File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); + String[] pids = { "100", "200", "300", "400", "500", "600", "700" }; + try { + TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir); + + // create pid dirs. + TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids); + + // create process infos. + TestProcfsBasedProcessTree.ProcessStatInfo[] procs = + new TestProcfsBasedProcessTree.ProcessStatInfo[7]; + + // assume pids 100, 500 are in 1 tree + // 200,300,400 are in another + // 600,700 are in a third + procs[0] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"100", "proc1", "1", "100", "100", "100000"}); + procs[1] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"200", "proc2", "1", "200", "200", "200000"}); + procs[2] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"300", "proc3", "200", "200", "200", "300000"}); + procs[3] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"400", "proc4", "200", "200", "200", "400000"}); + procs[4] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"500", "proc5", "100", "100", "100", "1500000"}); + procs[5] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"600", "proc6", "1", "600", "600", "100000"}); + procs[6] = new TestProcfsBasedProcessTree.ProcessStatInfo( + new String[] {"700", "proc7", "600", "600", "600", "100000"}); + // write stat files. + TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs); + + // vmem limit + long limit = 700000; + + // Create TaskMemoryMonitorThread + TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L, + 5000L); + // create process trees + // tree rooted at 100 is over limit immediately, as it is + // twice over the mem limit. + ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree( + "100", true, 100L, + procfsRootDir.getAbsolutePath()); + pTree.getProcessTree(); + assertTrue("tree rooted at 100 should be over limit " + + "after first iteration.", + test.isProcessTreeOverLimit(pTree, "dummyId", limit)); + + // the tree rooted at 200 is initially below limit. + pTree = new ProcfsBasedProcessTree("200", true, 100L, + procfsRootDir.getAbsolutePath()); + pTree.getProcessTree(); + assertFalse("tree rooted at 200 shouldn't be over limit " + + "after one iteration.", + test.isProcessTreeOverLimit(pTree, "dummyId", limit)); + // second iteration - now the tree has been over limit twice, + // hence it should be declared over limit. + pTree.getProcessTree(); + assertTrue("tree rooted at 200 should be over limit after 2 iterations", + test.isProcessTreeOverLimit(pTree, "dummyId", limit)); + + // the tree rooted at 600 is never over limit. + pTree = new ProcfsBasedProcessTree("600", true, 100L, + procfsRootDir.getAbsolutePath()); + pTree.getProcessTree(); + assertFalse("tree rooted at 600 should never be over limit.", + test.isProcessTreeOverLimit(pTree, "dummyId", limit)); + + // another iteration does not make any difference. + pTree.getProcessTree(); + assertFalse("tree rooted at 600 should never be over limit.", + test.isProcessTreeOverLimit(pTree, "dummyId", limit)); + } finally { + FileUtil.fullyDelete(procfsRootDir); + } + } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jun 16 20:54:24 2009 @@ -548,6 +548,12 @@ // Start a job and return its RunningJob object static RunningJob runJob(JobConf conf, Path inDir, Path outDir) throws IOException { + return runJob(conf, inDir, outDir, conf.getNumMapTasks(), conf.getNumReduceTasks()); + } + + // Start a job and return its RunningJob object + static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, + int numReds) throws IOException { FileSystem fs = FileSystem.get(conf); if (fs.exists(outDir)) { @@ -558,9 +564,11 @@ } String input = "The quick brown fox\n" + "has many silly\n" + "red fox sox\n"; - DataOutputStream file = fs.create(new Path(inDir, "part-0")); - file.writeBytes(input); - file.close(); + for (int i = 0; i < numMaps; ++i) { + DataOutputStream file = fs.create(new Path(inDir, "part-" + i)); + file.writeBytes(input); + file.close(); + } conf.setInputFormat(TextInputFormat.class); conf.setOutputKeyClass(LongWritable.class); @@ -568,8 +576,8 @@ FileInputFormat.setInputPaths(conf, inDir); FileOutputFormat.setOutputPath(conf, outDir); - conf.setNumMapTasks(conf.getNumMapTasks()); - conf.setNumReduceTasks(conf.getNumReduceTasks()); + conf.setNumMapTasks(numMaps); + conf.setNumReduceTasks(numReds); JobClient jobClient = new JobClient(conf); RunningJob job = jobClient.submitJob(conf); Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapred/lib/TestKeyFieldBasedComparator.java Tue Jun 16 20:54:24 2009 @@ -38,12 +38,16 @@ public class TestKeyFieldBasedComparator extends HadoopTestCase { JobConf conf; - String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.1 4444 011 011 234"; - String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.0 4444.1"; + JobConf localConf; + + String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.0 4444.1 011 011 234"; + String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.1 4444"; public TestKeyFieldBasedComparator() throws IOException { super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1); conf = createJobConf(); + localConf = createJobConf(); + localConf.set("map.output.key.field.separator", " "); } public void configure(String keySpec, int expect) throws Exception { Path testdir = new Path("build/test/test.mapred.spill"); @@ -123,9 +127,24 @@ configure("-k2.4,2.4n", 2); configure("-k7,7", 1); configure("-k7,7n", 2); - configure("-k8,8n", 2); - configure("-k9,9n", 1); + configure("-k8,8n", 1); + configure("-k9,9", 2); configure("-k11,11",2); configure("-k10,10",2); + + localTestWithoutMRJob("-k9,9", 1); + } + + byte[] line1_bytes = line1.getBytes(); + byte[] line2_bytes = line2.getBytes(); + + public void localTestWithoutMRJob(String keySpec, int expect) throws Exception { + KeyFieldBasedComparator<Void, Void> keyFieldCmp = new KeyFieldBasedComparator<Void, Void>(); + localConf.setKeyFieldComparatorOptions(keySpec); + keyFieldCmp.configure(localConf); + int result = keyFieldCmp.compare(line1_bytes, 0, line1_bytes.length, + line2_bytes, 0, line2_bytes.length); + if ((expect >= 0 && result < 0) || (expect < 0 && result >= 0)) + fail(); } } Modified: hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jun 16 20:54:24 2009 @@ -27,6 +27,7 @@ import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.MultiFileWordCount; import org.apache.hadoop.examples.SecondarySort; import org.apache.hadoop.examples.WordCount; import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator; @@ -41,6 +42,7 @@ import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ToolRunner; /** * A JUnit test to test min map-reduce cluster with local file system. @@ -88,6 +90,7 @@ Configuration conf = mr.createJobConf(); runWordCount(conf); runSecondarySort(conf); + runMultiFileWordCount(conf); } finally { if (mr != null) { mr.shutdown(); } } @@ -118,6 +121,9 @@ out); Counters ctrs = job.getCounters(); System.out.println("Counters: " + ctrs); + long mapIn = ctrs.findCounter(FileInputFormat.COUNTER_GROUP, + FileInputFormat.BYTES_READ).getValue(); + assertTrue(mapIn != 0); long combineIn = ctrs.findCounter(COUNTER_GROUP, "COMBINE_INPUT_RECORDS").getValue(); long combineOut = ctrs.findCounter(COUNTER_GROUP, @@ -169,5 +175,21 @@ "------------------------------------------------\n" + "10\t20\n10\t25\n10\t30\n", out); } - + + public void runMultiFileWordCount(Configuration conf) throws Exception { + localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true); + localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true); + writeFile("in/part1", "this is a test\nof " + + "multi file word count test\ntest\n"); + writeFile("in/part2", "more test"); + + int ret = ToolRunner.run(conf, new MultiFileWordCount(), + new String[] {TEST_ROOT_DIR + "/in", TEST_ROOT_DIR + "/out"}); + assertTrue("MultiFileWordCount failed", ret == 0); + String out = readFile("out/part-r-00000"); + System.out.println(out); + assertEquals("a\t1\ncount\t1\nfile\t1\nis\t1\n" + + "more\t1\nmulti\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n", out); + } + } Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/ ------------------------------------------------------------------------------ --- svn:mergeinfo (added) +++ svn:mergeinfo Tue Jun 16 20:54:24 2009 @@ -0,0 +1,2 @@ +/hadoop/core/branches/branch-0.19/mapred/src/webapps/job:713112 +/hadoop/core/trunk/src/webapps/job:776175-784663 Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/jobdetails.jsp Tue Jun 16 20:54:24 2009 @@ -267,6 +267,10 @@ "<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" + flakyTaskTrackers + "</a><br>\n"); } + if (job.getSchedulingInfo() != null) { + out.print("<b>Job Scheduling information: </b>" + + job.getSchedulingInfo().toString() +"\n"); + } out.print("<hr>\n"); out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">"); out.print("<tr><th>Kind</th><th>% Complete</th><th>Num Tasks</th>" + Modified: hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp?rev=785392&r1=785391&r2=785392&view=diff ============================================================================== --- hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp (original) +++ hadoop/core/branches/HADOOP-4687/mapred/src/webapps/job/taskdetails.jsp Tue Jun 16 20:54:24 2009 @@ -126,7 +126,12 @@ <table border=2 cellpadding="5" cellspacing="2"> <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> <% - if (!ts[0].getIsMap() && !isCleanupOrSetup) { + if (ts[0].getIsMap()) { + %> +<td>Map Phase Finished</td> + <% + } + else if(!isCleanupOrSetup) { %> <td>Shuffle Finished</td><td>Sort Finished</td> <% @@ -181,7 +186,12 @@ out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, status .getStartTime(), 0) + "</td>"); - if (!ts[i].getIsMap() && !isCleanupOrSetup) { + if (ts[i].getIsMap()) { + out.print("<td>" + + StringUtils.getFormattedTimeWithDiff(dateFormat, status + .getMapFinishTime(), status.getStartTime()) + "</td>"); + } + else if (!isCleanupOrSetup) { out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, status .getShuffleFinishTime(), status.getStartTime()) + "</td>"); @@ -268,7 +278,7 @@ </center> <% - if (ts[0].getIsMap()) { + if (ts[0].getIsMap() && !isCleanupOrSetup) { %> <h3>Input Split Locations</h3> <table border=2 cellpadding="5" cellspacing="2">
