Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=696957&r1=696956&r2=696957&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 19 00:31:41 2008 @@ -18,6 +18,9 @@ package org.apache.hadoop.mapred; import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -32,11 +35,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.LinkedHashMap; import java.util.Vector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -62,6 +67,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -184,6 +190,8 @@ private MapEventsFetcherThread mapEventsFetcher; int workerThreads; private CleanupQueue directoryCleanupThread; + volatile JvmManager jvmManager; + private TaskMemoryManagerThread taskMemoryManager; private boolean taskMemoryManagerEnabled = false; private long maxVirtualMemoryForTasks @@ -389,7 +397,7 @@ // Clear out state tables this.tasks.clear(); - this.runningTasks = new TreeMap<TaskAttemptID, TaskInProgress>(); + this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>(); this.runningJobs = new TreeMap<JobID, RunningJob>(); this.mapTotal = 0; this.reduceTotal = 0; @@ -422,6 +430,8 @@ InetSocketAddress socAddr = NetUtils.createSocketAddr(address); String bindAddress = socAddr.getHostName(); int tmpPort = socAddr.getPort(); + + this.jvmManager = new JvmManager(this); // RPC initialization int max = maxCurrentMapTasks > maxCurrentReduceTasks ? @@ -472,6 +482,10 @@ taskMemoryManager.setDaemon(true); taskMemoryManager.start(); } + mapLauncher = new TaskLauncher(maxCurrentMapTasks); + reduceLauncher = new TaskLauncher(maxCurrentReduceTasks); + mapLauncher.start(); + reduceLauncher.start(); this.running = true; } @@ -675,7 +689,6 @@ private void localizeJob(TaskInProgress tip) throws IOException { Path localJarFile = null; Task t = tip.getTask(); - JobID jobId = t.getJobID(); Path jobFile = new Path(t.getJobFile()); // Get sizes of JobFile and JarFile @@ -840,6 +853,14 @@ // Shutdown the fetcher thread this.mapEventsFetcher.interrupt(); + //stop the launchers + mapLauncher.cleanTaskQueue(); + reduceLauncher.cleanTaskQueue(); + this.mapLauncher.interrupt(); + this.reduceLauncher.interrupt(); + + jvmManager.stop(); + // shutdown RPC connections RPC.stopProxy(jobClient); } @@ -1037,7 +1058,7 @@ if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { - startNewTask((LaunchTaskAction) action); + addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { @@ -1130,8 +1151,8 @@ boolean askForNewTask; long localMinSpaceStart; synchronized (this) { - askForNewTask = (mapTotal < maxCurrentMapTasks || - reduceTotal < maxCurrentReduceTasks) && + askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || + status.countReduceTasks() < maxCurrentReduceTasks) && acceptNewTasks; localMinSpaceStart = minSpaceStart; } @@ -1161,6 +1182,8 @@ synchronized (this) { for (TaskStatus taskStatus : status.getTaskReports()) { if (taskStatus.getRunState() != TaskStatus.State.RUNNING && + taskStatus.getRunState() != TaskStatus.State.UNASSIGNED && + taskStatus.getRunState() != TaskStatus.State.INITIALIZED && taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { if (taskStatus.getIsMap()) { mapTotal--; @@ -1192,7 +1215,7 @@ /** * Return the maximum amount of memory available for all tasks on * this tracker - * @return maximum amount of virtual memory in kilobytes + * @return maximum amount of virtual memory */ long getMaxVirtualMemoryForTasks() { return maxVirtualMemoryForTasks; @@ -1208,7 +1231,7 @@ * and the total amount of maximum virtual memory that can be * used by all currently running tasks. * - * @return amount of free virtual memory in kilobytes that can be assured for + * @return amount of free virtual memory that can be assured for * new tasks */ private synchronized long findFreeVirtualMemory() { @@ -1224,9 +1247,9 @@ // still occupied and hence memory of the task should be // accounted in used memory. if ((tip.getRunState() == TaskStatus.State.RUNNING) - || (tip.getRunState() == TaskStatus.State.UNASSIGNED) + || (tip.getRunState() == TaskStatus.State.INITIALIZED) || (tip.getRunState() == TaskStatus.State.COMMIT_PENDING)) { - maxMemoryUsed += getMemoryForTask(tip); + maxMemoryUsed += getMemoryForTask(tip.getJobConf()); } } @@ -1239,11 +1262,11 @@ * If the TIP's job has a configured value for the max memory that is * returned. Else, the default memory that would be assigned for the * task is returned. - * @param tip The TaskInProgress + * @param conf * @return the memory allocated for the TIP. */ - private long getMemoryForTask(TaskInProgress tip) { - long memForTask = tip.getJobConf().getMaxVirtualMemoryForTask(); + public long getMemoryForTask(JobConf conf) { + long memForTask = conf.getMaxVirtualMemoryForTask(); if (memForTask == JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT) { memForTask = this.getDefaultMemoryPerTask(); } @@ -1278,6 +1301,7 @@ long now = System.currentTimeMillis(); for (TaskInProgress tip: runningTasks.values()) { if (tip.getRunState() == TaskStatus.State.RUNNING || + tip.getRunState() == TaskStatus.State.INITIALIZED || tip.getRunState() == TaskStatus.State.COMMIT_PENDING) { // Check the per-job timeout interval for tasks; // an interval of '0' implies it is never timed-out @@ -1497,15 +1521,101 @@ return -1; } } + + private TaskLauncher mapLauncher; + private TaskLauncher reduceLauncher; + + public JvmManager getJvmManagerInstance() { + return jvmManager; + } + + public void addFreeMapSlot() { + mapLauncher.addFreeSlot(); + } - /** - * Start a new task. - * All exceptions are handled locally, so that we don't mess up the - * task tracker. - */ - private void startNewTask(LaunchTaskAction action) { + public void addFreeReduceSlot() { + reduceLauncher.addFreeSlot(); + } + + private void addToTaskQueue(LaunchTaskAction action) { + if (action.getTask().isMapTask()) { + mapLauncher.addToTaskQueue(action); + } else { + reduceLauncher.addToTaskQueue(action); + } + } + + private class TaskLauncher extends Thread { + private IntWritable numFreeSlots; + private final int maxSlots; + private List<TaskInProgress> tasksToLaunch; + + public TaskLauncher(int numSlots) { + this.maxSlots = numSlots; + this.numFreeSlots = new IntWritable(numSlots); + this.tasksToLaunch = new LinkedList<TaskInProgress>(); + setDaemon(true); + setName("TaskLauncher for task"); + } + + public void addToTaskQueue(LaunchTaskAction action) { + synchronized (tasksToLaunch) { + TaskInProgress tip = registerTask(action); + tasksToLaunch.add(tip); + tasksToLaunch.notifyAll(); + } + } + + public void cleanTaskQueue() { + tasksToLaunch.clear(); + } + + public void addFreeSlot() { + synchronized (numFreeSlots) { + numFreeSlots.set(numFreeSlots.get() + 1); + assert (numFreeSlots.get() <= maxSlots); + LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get()); + numFreeSlots.notifyAll(); + } + } + + public void run() { + while (!Thread.interrupted()) { + try { + TaskInProgress tip; + synchronized (tasksToLaunch) { + while (tasksToLaunch.isEmpty()) { + tasksToLaunch.wait(); + } + //get the TIP + tip = tasksToLaunch.remove(0); + LOG.info("Trying to launch : " + tip.getTask().getTaskID()); + } + //wait for a slot to run + synchronized (numFreeSlots) { + while (numFreeSlots.get() == 0) { + numFreeSlots.wait(); + } + LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+ + " and trying to launch "+tip.getTask().getTaskID()); + numFreeSlots.set(numFreeSlots.get() - 1); + assert (numFreeSlots.get() >= 0); + } + + //got a free slot. launch the task + startNewTask(tip); + } catch (InterruptedException e) { + return; // ALL DONE + } catch (Throwable th) { + LOG.error("TaskLauncher error " + + StringUtils.stringifyException(th)); + } + } + } + } + private TaskInProgress registerTask(LaunchTaskAction action) { Task t = action.getTask(); - LOG.info("LaunchTaskAction: " + t.getTaskID()); + LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID()); TaskInProgress tip = new TaskInProgress(t, this.fConf); synchronized (this) { tasks.put(t.getTaskID(), tip); @@ -1517,10 +1627,19 @@ reduceTotal++; } } + return tip; + } + /** + * Start a new task. + * All exceptions are handled locally, so that we don't mess up the + * task tracker. + */ + private void startNewTask(TaskInProgress tip) { try { localizeJob(tip); if (isTaskMemoryManagerEnabled()) { - taskMemoryManager.addTask(t.getTaskID(), getMemoryForTask(tip)); + taskMemoryManager.addTask(tip.getTask().getTaskID(), + getMemoryForTask(tip.getJobConf())); } } catch (Throwable e) { String msg = ("Error initializing " + tip.getTask().getTaskID() + @@ -1691,13 +1810,11 @@ } localJobConf.set("hadoop.net.static.resolutions", str.toString()); } - OutputStream out = localFs.create(localTaskFile); - try { - localJobConf.write(out); - } finally { - out.close(); + if (task.isMapTask()) { + debugCommand = localJobConf.getMapDebugScript(); + } else { + debugCommand = localJobConf.getReduceDebugScript(); } - task.setConf(localJobConf); String keepPattern = localJobConf.getKeepTaskFilesPattern(); if (keepPattern != null) { alwaysKeepTaskFiles = @@ -1705,11 +1822,21 @@ } else { alwaysKeepTaskFiles = false; } - if (task.isMapTask()) { - debugCommand = localJobConf.getMapDebugScript(); - } else { - debugCommand = localJobConf.getReduceDebugScript(); + if (debugCommand != null || localJobConf.getProfileEnabled() || + alwaysKeepTaskFiles) { + //disable jvm reuse + localJobConf.setNumTasksToExecutePerJvm(1); } + if (isTaskMemoryManagerEnabled()) { + localJobConf.setBoolean("task.memory.mgmt.enabled", true); + } + OutputStream out = localFs.create(localTaskFile); + try { + localJobConf.write(out); + } finally { + out.close(); + } + task.setConf(localJobConf); } /** @@ -1717,6 +1844,10 @@ public Task getTask() { return task; } + + public TaskRunner getTaskRunner() { + return runner; + } public synchronized void setJobConf(JobConf lconf){ this.localJobConf = lconf; @@ -1745,10 +1876,8 @@ */ public synchronized void launchTask() throws IOException { localizeTask(task); - this.taskStatus.setRunState(TaskStatus.State.RUNNING); this.runner = task.createRunner(TaskTracker.this); this.runner.start(); - this.taskStatus.setStartTime(System.currentTimeMillis()); } /** @@ -1816,7 +1945,8 @@ this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; - + jvmManager.taskFinished(runner); + runner.signalDone(); LOG.info("Task " + task.getTaskID() + " is done."); LOG.info("reported output size for " + task.getTaskID() + " was " + taskStatus.getOutputSize()); @@ -1857,13 +1987,16 @@ String jobConf = task.getJobFile(); try { // get task's stdout file - taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile + taskStdout = FileUtil.makeShellPath( + TaskLog.getRealTaskLogFileLocation (task.getTaskID(), TaskLog.LogName.STDOUT)); // get task's stderr file - taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile + taskStderr = FileUtil.makeShellPath( + TaskLog.getRealTaskLogFileLocation (task.getTaskID(), TaskLog.LogName.STDERR)); // get task's syslog file - taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile + taskSyslog = FileUtil.makeShellPath( + TaskLog.getRealTaskLogFileLocation (task.getTaskID(), TaskLog.LogName.SYSLOG)); } catch(IOException e){ LOG.warn("Exception finding task's stdout/err/syslog files"); @@ -1882,8 +2015,8 @@ StringUtils.stringifyException(e)); } // Build the command - File stdout = TaskLog.getTaskLogFile(task.getTaskID(), - TaskLog.LogName.DEBUGOUT); + File stdout = TaskLog.getRealTaskLogFileLocation( + task.getTaskID(), TaskLog.LogName.DEBUGOUT); // add pipes program as argument if it exists. String program =""; String executable = Submitter.getExecutable(localJobConf); @@ -1951,6 +2084,13 @@ } } + + synchronized void taskInitialized() { + if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { + //one-way state change to INITIALIZED + this.taskStatus.setRunState(TaskStatus.State.INITIALIZED); + } + } /** @@ -2042,6 +2182,8 @@ // Kill the task if it is still running synchronized(this){ if (getRunState() == TaskStatus.State.RUNNING || + getRunState() == TaskStatus.State.UNASSIGNED || + getRunState() == TaskStatus.State.INITIALIZED || getRunState() == TaskStatus.State.COMMIT_PENDING) { kill(wasFailure); } @@ -2057,12 +2199,12 @@ */ public synchronized void kill(boolean wasFailure) throws IOException { if (taskStatus.getRunState() == TaskStatus.State.RUNNING || + taskStatus.getRunState() == TaskStatus.State.INITIALIZED || taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { wasKilled = true; if (wasFailure) { failures += 1; } - runner.kill(); taskStatus.setRunState((wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED); @@ -2074,6 +2216,16 @@ taskStatus.setRunState(TaskStatus.State.KILLED); } } + if (runner != null) { + runner.kill(); + runner.signalDone(); + } else { + if (task.isMapTask()) { + addFreeMapSlot(); + } else { + addFreeReduceSlot(); + } + } } /** @@ -2112,13 +2264,6 @@ TaskAttemptID taskId = task.getTaskID(); LOG.debug("Cleaning up " + taskId); - // Remove the associated pid-file, if any - if (TaskTracker.this.isTaskMemoryManagerEnabled()) { - Path pidFilePath = taskMemoryManager.getPidFilePath(taskId); - if (pidFilePath != null) { - directoryCleanupThread.addToQueue(pidFilePath); - } - } synchronized (TaskTracker.this) { if (needCleanup) { @@ -2138,13 +2283,28 @@ + task.getJobID() + Path.SEPARATOR + taskId; if (needCleanup) { if (runner != null) { + //cleans up the output directory of the task (where map outputs + //and reduce inputs get stored) runner.close(); } - directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf, - taskDir)); + //We don't delete the workdir + //since some other task (running in the same JVM) + //might be using the dir. The JVM running the tasks would clean + //the workdir per a task in the task process itself. + if (localJobConf.getNumTasksToExecutePerJvm() == 1) { + directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf, + taskDir)); + } + + else { + directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf, + taskDir+"/job.xml")); + } } else { - directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf, - taskDir + Path.SEPARATOR + MRConstants.WORKDIR)); + if (localJobConf.getNumTasksToExecutePerJvm() == 1) { + directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf, + taskDir+"/work")); + } } } catch (Throwable ie) { LOG.info("Error cleaning up task runner: " + @@ -2170,16 +2330,54 @@ // /////////////////////////////////////////////////////////////// // TaskUmbilicalProtocol ///////////////////////////////////////////////////////////////// + /** * Called upon startup by the child process, to fetch Task data. */ - public synchronized Task getTask(TaskAttemptID taskid) throws IOException { - TaskInProgress tip = tasks.get(taskid); - if (tip != null) { - return tip.getTask(); - } else { - return null; + public synchronized JvmTask getTask(JVMId jvmId, TaskAttemptID firstTaskId) + throws IOException { + LOG.debug("JVM with ID : " + jvmId + " asked for a task"); + if (!jvmManager.isJvmKnown(jvmId)) { + LOG.info("Killing unknown JVM " + jvmId); + return new JvmTask(null, true); + } + RunningJob rjob = runningJobs.get(jvmId.getJobId()); + if (rjob == null) { //kill the JVM since the job is dead + jvmManager.killJvm(jvmId); + return new JvmTask(null, true); + } + TaskInProgress t = runningTasks.get(firstTaskId); + //if we can give the JVM the task it is asking for, well and good; + //if not, we give it some other task from the same job (note that some + //other JVM might have run this task while this JVM was init'ing) + if (t == null || t.getStatus().getRunState() != + TaskStatus.State.INITIALIZED) { + boolean isMap = jvmId.isMapJVM(); + synchronized (rjob) { + for (TaskInProgress tip : runningTasks.values()) { + synchronized (tip) { + if (tip.getTask().getJobID().equals(jvmId.getJobId()) && + tip.getRunState() == TaskStatus.State.INITIALIZED + && ((isMap && tip.getTask().isMapTask()) || + (!isMap && !tip.getTask().isMapTask()))) { + t = tip; + } + } + } + } } + //now the task could be null or we could have got a task that already + //ran earlier (the firstTaskId case) + if (t == null || t.getRunState() != TaskStatus.State.INITIALIZED) { + jvmManager.setRunningTaskForJvm(jvmId, null); + return new JvmTask(null, false); + } + t.getStatus().setRunState(TaskStatus.State.RUNNING); + t.getStatus().setStartTime(System.currentTimeMillis()); + jvmManager.setRunningTaskForJvm(jvmId,t.getTaskRunner()); + LOG.info("JVM with ID: " + jvmId + " given task: " + + t.getTask().getTaskID().toString()); + return new JvmTask(t.getTask(), false); } /** @@ -2334,6 +2532,13 @@ taskMemoryManager.removeTask(taskid); } } + + synchronized void taskInitialized(TaskAttemptID taskid) { + TaskInProgress tip = tasks.get(taskid); + if (tip != null) { + tip.taskInitialized(); + } + } /** * A completed map task's output has been lost. @@ -2355,7 +2560,7 @@ private JobID jobid; private Path jobFile; // keep this for later use - Set<TaskInProgress> tasks; + volatile Set<TaskInProgress> tasks; boolean localized; boolean keepJobFiles; FetchStatus f; @@ -2384,61 +2589,6 @@ } } - /** - * The main() for child processes. - */ - public static class Child { - - public static void main(String[] args) throws Throwable { - //LogFactory.showTime(false); - LOG.debug("Child starting"); - - JobConf defaultConf = new JobConf(); - String host = args[0]; - int port = Integer.parseInt(args[1]); - InetSocketAddress address = new InetSocketAddress(host, port); - TaskAttemptID taskid = TaskAttemptID.forName(args[2]); - TaskUmbilicalProtocol umbilical = - (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class, - TaskUmbilicalProtocol.versionID, - address, - defaultConf); - - Task task = umbilical.getTask(taskid); - JobConf job = new JobConf(task.getJobFile()); - TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24)); - task.setConf(job); - - defaultConf.addResource(new Path(task.getJobFile())); - - // Initiate Java VM metrics - JvmMetrics.init(task.getPhase().toString(), job.getSessionId()); - - try { - // use job-specified working directory - FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory()); - task.run(job, umbilical); // run the task - } catch (FSError e) { - LOG.fatal("FSError from child", e); - umbilical.fsError(taskid, e.getMessage()); - } catch (Throwable throwable) { - LOG.warn("Error running child", throwable); - // Report back any failures, for diagnostic purposes - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - throwable.printStackTrace(new PrintStream(baos)); - umbilical.reportDiagnosticInfo(taskid, baos.toString()); - } finally { - RPC.stopProxy(umbilical); - MetricsContext metricsContext = MetricsUtil.getContext("mapred"); - metricsContext.close(); - // Shutting down log4j of the child-vm... - // This assumes that on return from Task.run() - // there is no more logging done. - LogManager.shutdown(); - } - } - } - /** * Get the name for this task tracker. * @return the string like "tracker_mymachine:50010" @@ -2804,9 +2954,13 @@ * Is the TaskMemoryManager Enabled on this system? * @return true if enabled, false otherwise. */ - boolean isTaskMemoryManagerEnabled() { + public boolean isTaskMemoryManagerEnabled() { return taskMemoryManagerEnabled; } + + public TaskMemoryManagerThread getTaskMemoryManager() { + return taskMemoryManager; + } private void setTaskMemoryManagerEnabledFlag() { if (!ProcfsBasedProcessTree.isAvailable()) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=696957&r1=696956&r2=696957&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep 19 00:31:41 2008 @@ -208,7 +208,7 @@ if (ts.getIsMap() && ((state == TaskStatus.State.RUNNING) || (state == TaskStatus.State.UNASSIGNED) || - (state == TaskStatus.State.COMMIT_PENDING))) { + (state == TaskStatus.State.INITIALIZED))) { mapCount++; } } @@ -226,7 +226,7 @@ if ((!ts.getIsMap()) && ((state == TaskStatus.State.RUNNING) || (state == TaskStatus.State.UNASSIGNED) || - (state == TaskStatus.State.COMMIT_PENDING))) { + (state == TaskStatus.State.INITIALIZED))) { reduceCount++; } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=696957&r1=696956&r2=696957&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep 19 00:31:41 2008 @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.mapred.JvmTask; /** Protocol that task child process uses to contact its parent process. The * parent is a daemon which which polls the central master for a new map or @@ -49,12 +50,19 @@ * Version 12 getMapCompletionEvents() now also indicates if the events are * stale or not. Hence the return type is a class that * encapsulates the events and whether to reset events index. + * Version 13 changed the getTask method signature for HADOOP-249 * */ - public static final long versionID = 11L; + public static final long versionID = 13L; - /** Called when a child task process starts, to get its task.*/ - Task getTask(TaskAttemptID taskid) throws IOException; + /** + * Called when a child task process starts, to get its task. + * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it + * @param taskid the first taskid that the JVM runs + * @return Task object + * @throws IOException + */ + JvmTask getTask(JVMId jvmId, TaskAttemptID taskid) throws IOException; /** * Report child's progress to parent. Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=696957&r1=696956&r2=696957&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Sep 19 00:31:41 2008 @@ -72,14 +72,12 @@ // Run Sort-Validator assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0); } - + Configuration conf = new Configuration(); public void testMapReduceSort() throws Exception { MiniDFSCluster dfs = null; MiniMRCluster mr = null; FileSystem fileSys = null; try { - Configuration conf = new Configuration(); - // set io.sort.mb and fsinmemory.size.mb to lower value in test conf.setInt("io.sort.mb", 5); conf.setInt("fs.inmemory.size.mb", 20); @@ -103,5 +101,8 @@ } } } - + public void testMapReduceSortWithJvmReuse() throws Exception { + conf.setInt("mapred.job.reuse.jvm.num.tasks", -1); + testMapReduceSort(); + } }
