Author: cutting Date: Thu Dec 7 12:38:17 2006 New Revision: 483651 URL: http://svn.apache.org/viewvc?view=rev&rev=483651 Log: HADOOP-639. Restructure InterTrackerProtocol to make task accounting more reliable.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483651&r1=483650&r2=483651 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 12:38:17 2006 @@ -30,6 +30,9 @@ 8. HADOOP-676. Improved exceptions and error messages for common job input specification errors. (Sanjay Dahiya via cutting) + 9. HADOOP-639. Restructure InterTrackerProtocol to make task + accounting more reliable. 
(Arun C Murthy via cutting) + Release 0.9.1 - 2006-12-06 Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java?view=auto&rev=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * The response sent by the {@link JobTracker} to the hearbeat sent + * periodically by the {@link TaskTracker} + * + * @author Arun C Murthy + */ +class HeartbeatResponse implements Writable, Configurable { + Configuration conf = null; + short responseId; + TaskTrackerAction[] actions; + + HeartbeatResponse() {} + + HeartbeatResponse(short responseId, TaskTrackerAction[] actions) { + this.responseId = responseId; + this.actions = actions; + } + + public void setResponseId(short responseId) { + this.responseId = responseId; + } + + public short getResponseId() { + return responseId; + } + + public void setActions(TaskTrackerAction[] actions) { + this.actions = actions; + } + + public TaskTrackerAction[] getActions() { + return actions; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return conf; + } + + public void write(DataOutput out) throws IOException { + out.writeShort(responseId); + if (actions == null) { + WritableUtils.writeVInt(out, 0); + } else { + WritableUtils.writeVInt(out, actions.length); + for (TaskTrackerAction action : actions) { + WritableUtils.writeEnum(out, action.getActionId()); + action.write(out); + } + } + //ObjectWritable.writeObject(out, actions, actions.getClass(), conf); + } + + public void readFields(DataInput in) throws IOException { + this.responseId = in.readShort(); + int length = WritableUtils.readVInt(in); + if (length > 0) { + actions = new TaskTrackerAction[length]; + for (int i=0; i < length; ++i) { + TaskTrackerAction.ActionType actionType = + WritableUtils.readEnum(in, TaskTrackerAction.ActionType.class); + actions[i] = 
TaskTrackerAction.createAction(actionType); + actions[i].readFields(in); + } + } else { + actions = null; + } + //actions = (TaskTrackerAction[]) ObjectWritable.readObject(in, conf); + } +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=483651&r1=483650&r2=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Dec 7 12:38:17 2006 @@ -27,31 +27,40 @@ * The JobTracker is the Server, which implements this protocol. */ interface InterTrackerProtocol extends VersionedProtocol { - // version 2 introduced to replace TaskStatus.State with an enum - public static final long versionID = 2L; + /** + * version 3 introduced to replace + * emitHearbeat/pollForNewTask/pollForTaskWithClosedJob with + * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)} + */ + public static final long versionID = 3L; public final static int TRACKERS_OK = 0; public final static int UNKNOWN_TASKTRACKER = 1; - /** - * Called regularly by the task tracker to update the status of its tasks - * within the job tracker. JobTracker responds with a code that tells the - * TaskTracker whether all is well. - * - * TaskTracker must also indicate whether this is the first interaction - * (since state refresh) + /** + * Called regularly by the {@link TaskTracker} to update the status of its + * tasks within the job tracker. {@link JobTracker} responds with a + * {@link HeartbeatResponse} that directs the + * {@link TaskTracker} to undertake a series of 'actions' + * (see {@link org.apache.hadoop.mapred.TaskTrackerAction.ActionType}). 
+ * + * {@link TaskTracker} must also indicate whether this is the first + * interaction (since state refresh) and acknowledge the last response + * it recieved from the {@link JobTracker} + * + * @param status the status update + * @param initialContact <code>true</code> if this is first interaction since + * 'refresh', <code>false</code> otherwise. + * @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is + * ready to accept new tasks to run. + * @param responseId the last responseId successfully acted upon by the + * {@link TaskTracker}. + * @return a {@link org.apache.hadoop.mapred.HeartbeatResponse} with + * fresh instructions. */ - int emitHeartbeat(TaskTrackerStatus status, - boolean initialContact) throws IOException; - - /** Called to get new tasks from from the job tracker for this tracker.*/ - Task pollForNewTask(String trackerName) throws IOException; - - /** Called to find which tasks that have been run by this tracker should now - * be closed because their job is complete. This is used to, e.g., - * notify a map task that its output is no longer needed and may - * be removed. */ - String[] pollForTaskWithClosedJob(String trackerName) throws IOException; + HeartbeatResponse heartbeat(TaskTrackerStatus status, + boolean initialContact, boolean acceptNewTasks, short responseId) + throws IOException; /** Called by a reduce task to find which map tasks are completed. 
* Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=483651&r1=483650&r2=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Dec 7 12:38:17 2006 @@ -427,6 +427,9 @@ // (trackerID->TreeSet of taskids running at that tracker) TreeMap trackerToTaskMap = new TreeMap(); + // (trackerID --> last sent HeartBeatResponseID) + Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap(); + // // Watch and expire TaskTracker objects using these structures. // We can map from Name->TaskTrackerStatus, or we can expire by time. @@ -723,6 +726,74 @@ //////////////////////////////////////////////////// /** + * The periodic heartbeat mechanism between the {@link TaskTracker} and + * the {@link JobTracker}. + * + * The {@link JobTracker} processes the status information sent by the + * {@link TaskTracker} and responds with instructions to start/stop + * tasks or jobs, and also 'reset' instructions during contingencies. 
+ */ + public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, + boolean initialContact, boolean acceptNewTasks, short responseId) + throws IOException { + LOG.debug("Got heartbeat from: " + status.getTrackerName() + + " (initialContact: " + initialContact + + " acceptNewTasks: " + acceptNewTasks + ")" + + " with responseId: " + responseId); + + // First check if the last heartbeat response got through + String trackerName = status.getTrackerName(); + Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName); + + short newResponseId = (short)(responseId + 1); + if (!initialContact && oldResponseId != null && + oldResponseId.shortValue() != responseId) { + newResponseId = oldResponseId.shortValue(); + } + + // Process this heartbeat + if (!processHeartbeat(status, initialContact, + (newResponseId != responseId))) { + if (oldResponseId != null) { + trackerToHeartbeatResponseIDMap.remove(trackerName); + } + + return new HeartbeatResponse(newResponseId, + new TaskTrackerAction[] {new ReinitTrackerAction()}); + } + + // Initialize the response to be sent for the heartbeat + HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); + List<TaskTrackerAction> actions = new ArrayList(); + + // Check for new tasks to be executed on the tasktracker + if (acceptNewTasks) { + Task task = getNewTaskForTaskTracker(trackerName); + if (task != null) { + LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId()); + actions.add(new LaunchTaskAction(task)); + } + } + + // Check for tasks to be killed + List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); + if (killTasksList != null) { + actions.addAll(killTasksList); + } + + response.setActions( + actions.toArray(new TaskTrackerAction[actions.size()])); + + // Update the trackerToHeartbeatResponseIDMap + if (newResponseId != responseId) { + trackerToHeartbeatResponseIDMap.put(trackerName, + new Short(newResponseId)); + } + + return response; + } + + /** * Update the 
last recorded status for the given task tracker. * It assumes that the taskTrackers are locked on entry. * @author Owen O'Malley @@ -752,16 +823,21 @@ /** * Process incoming heartbeat messages from the task trackers. */ - public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) { + private synchronized boolean processHeartbeat( + TaskTrackerStatus trackerStatus, + boolean initialContact, boolean updateStatusTimestamp) { String trackerName = trackerStatus.getTrackerName(); - trackerStatus.setLastSeen(System.currentTimeMillis()); + if (initialContact || updateStatusTimestamp) { + trackerStatus.setLastSeen(System.currentTimeMillis()); + } synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { boolean seenBefore = updateTaskTrackerStatus(trackerName, trackerStatus); if (initialContact) { - // If it's first contact, then clear out any state hanging around + // If it's first contact, then clear out + // any state hanging around if (seenBefore) { lostTaskTracker(trackerName, trackerStatus.getHost()); } @@ -770,7 +846,7 @@ if (!seenBefore) { LOG.warn("Status from unknown Tracker : " + trackerName); taskTrackers.remove(trackerName); - return InterTrackerProtocol.UNKNOWN_TASKTRACKER; + return false; } } @@ -782,18 +858,17 @@ updateTaskStatuses(trackerStatus); //LOG.info("Got heartbeat from "+trackerName); - return InterTrackerProtocol.TRACKERS_OK; + return true; } /** - * A tracker wants to know if there's a Task to run. Returns - * a task we'd like the TaskTracker to execute right now. + * Returns a task we'd like the TaskTracker to execute right now. * * Eventually this function should compute load on the various TaskTrackers, * and incorporate knowledge of DFS file placement. But for right now, it * just grabs a single item out of the pending task list and hands it back. 
*/ - public synchronized Task pollForNewTask(String taskTracker) { + private synchronized Task getNewTaskForTaskTracker(String taskTracker) { // // Compute average map and reduce task numbers across pool // @@ -936,23 +1011,36 @@ * A tracker wants to know if any of its Tasks have been * closed (because the job completed, whether successfully or not) */ - public synchronized String[] pollForTaskWithClosedJob(String taskTracker) { - TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker); + private synchronized List getTasksToKill(String taskTracker) { + Set<String> taskIds = (TreeSet) trackerToTaskMap.get(taskTracker); if (taskIds != null) { - ArrayList list = new ArrayList(); - for (Iterator it = taskIds.iterator(); it.hasNext(); ) { - String taskId = (String) it.next(); - TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); - if (tip.shouldCloseForClosedJob(taskId)) { + List<TaskTrackerAction> killList = new ArrayList(); + Set<String> killJobIds = new TreeSet(); + for (String killTaskId : taskIds ) { + TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId); + if (tip.shouldCloseForClosedJob(killTaskId)) { // // This is how the JobTracker ends a task at the TaskTracker. // It may be successfully completed, or may be killed in // mid-execution. 
// - list.add(taskId); + if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) { + killList.add(new KillTaskAction(killTaskId)); + LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId); + } else { + //killTasksList.add(new KillJobAction(taskId)); + String killJobId = tip.getJob().getStatus().getJobId(); + killJobIds.add(killJobId); + } } } - return (String[]) list.toArray(new String[list.size()]); + + for (String killJobId : killJobIds) { + killList.add(new KillJobAction(killJobId)); + LOG.debug(taskTracker + " -> KillJobAction: " + killJobId); + } + + return killList; } return null; } Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java?view=auto&rev=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +/** + * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} + * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill the task of + * a job and cleanup resources. + * + * @author Arun C Murthy + */ +class KillJobAction extends TaskTrackerAction { + String jobId; + + public KillJobAction() { + super(ActionType.KILL_JOB); + } + + public KillJobAction(String taskId) { + super(ActionType.KILL_JOB); + this.jobId = taskId; + } + + public String getJobId() { + return jobId; + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, jobId); + } + + public void readFields(DataInput in) throws IOException { + jobId = Text.readString(in); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java?view=auto&rev=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; + +/** + * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} + * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill a task. + * + * @author Arun C Murthy + */ +class KillTaskAction extends TaskTrackerAction { + String taskId; + + public KillTaskAction() { + super(ActionType.KILL_TASK); + } + + public KillTaskAction(String taskId) { + super(ActionType.KILL_TASK); + this.taskId = taskId; + } + + public String getTaskId() { + return taskId; + } + + public void write(DataOutput out) throws IOException { + Text.writeString(out, taskId); + } + + public void readFields(DataInput in) throws IOException { + taskId = Text.readString(in); + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java?view=auto&rev=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} + * to the {@link org.apache.hadoop.mapred.TaskTracker} to launch a new task. 
+ * + * @author Arun C Murthy + */ +class LaunchTaskAction extends TaskTrackerAction { + private Task task; + + public LaunchTaskAction() { + super(ActionType.LAUNCH_TASK); + } + + public LaunchTaskAction(Task task) { + super(ActionType.LAUNCH_TASK); + this.task = task; + } + + public Task getTask() { + return task; + } + + public void write(DataOutput out) throws IOException { + out.writeBoolean(task.isMapTask()); + task.write(out); + } + + public void readFields(DataInput in) throws IOException { + boolean isMapTask = in.readBoolean(); + if (isMapTask) { + task = new MapTask(); + } else { + task = new ReduceTask(); + } + task.readFields(in); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java?view=auto&rev=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker} + * to the {@link org.apache.hadoop.mapred.TaskTracker} to reinitialize itself. + * + * @author Arun C Murthy + */ +class ReinitTrackerAction extends TaskTrackerAction { + + public ReinitTrackerAction() { + super(ActionType.REINIT_TRACKER); + } + + public void write(DataOutput out) throws IOException {} + + public void readFields(DataInput in) throws IOException {} + +} Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=483651&r1=483650&r2=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Dec 7 12:38:17 2006 @@ -228,7 +228,12 @@ (job.getStatus().getRunState() != JobStatus.RUNNING)) { tasksReportedClosed.add(taskid); return true; - } else { + } else if( !isMapTask() && isComplete() && + ! 
tasksReportedClosed.contains(taskid) ){ + tasksReportedClosed.add(taskid); + return true; + } + else { return false; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=483651&r1=483650&r2=483651 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec 7 12:38:17 2006 @@ -68,6 +68,9 @@ Server taskReportServer = null; InterTrackerProtocol jobClient; + + // last heartbeat response recieved + short heartbeatResponseId = -1; StatusHttpServer server = null; @@ -187,7 +190,7 @@ } } } - + static String getCacheSubdir() { return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR; } @@ -458,15 +461,23 @@ } } - if (!transmitHeartBeat()) { + // Send the heartbeat and process the jobtracker's directives + HeartbeatResponse heartbeatResponse = transmitHeartBeat(); + TaskTrackerAction[] actions = heartbeatResponse.getActions(); + LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + + heartbeatResponse.getResponseId() + " and " + + ((actions != null) ? 
actions.length : 0) + " actions"); + + if (reinitTaskTracker(actions)) { return State.STALE; } + lastHeartbeat = now; justStarted = false; - checkForNewTasks(); + checkAndStartNewTasks(actions); markUnresponsiveTasks(); - closeCompletedTasks(); + closeCompletedTasks(actions); killOverflowingTasks(); //we've cleaned up, resume normal operation @@ -498,56 +509,94 @@ * @return false if the tracker was unknown * @throws IOException */ - private boolean transmitHeartBeat() throws IOException { + private HeartbeatResponse transmitHeartBeat() throws IOException { // // Build the heartbeat information for the JobTracker // - List<TaskStatus> taskReports = new ArrayList(runningTasks.size()); + List<TaskStatus> taskReports = + new ArrayList<TaskStatus>(runningTasks.size()); synchronized (this) { - for (TaskInProgress tip: runningTasks.values()) { - taskReports.add(tip.createStatus()); - } + for (TaskInProgress tip: runningTasks.values()) { + taskReports.add(tip.createStatus()); + } } TaskTrackerStatus status = new TaskTrackerStatus(taskTrackerName, localHostname, - httpPort, taskReports, - failures); - + httpPort, taskReports, + failures); + + // + // Check if we should ask for a new Task + // + boolean askForNewTask = false; + if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && + acceptNewTasks) { + checkLocalDirs(fConf.getLocalDirs()); + + if (enoughFreeSpace(minSpaceStart)) { + askForNewTask = true; + } + } + // // Xmit the heartbeat // + HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, + justStarted, askForNewTask, + heartbeatResponseId); + heartbeatResponseId = heartbeatResponse.getResponseId(); - int resultCode = jobClient.emitHeartbeat(status, justStarted); synchronized (this) { - for (TaskStatus taskStatus: taskReports) { - if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { - if (taskStatus.getIsMap()) { - mapTotal--; - } else { - reduceTotal--; - } - myMetrics.completeTask(); - runningTasks.remove(taskStatus.getTaskId()); + 
for (TaskStatus taskStatus : taskReports) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + if (taskStatus.getIsMap()) { + mapTotal--; + } else { + reduceTotal--; } + myMetrics.completeTask(); + runningTasks.remove(taskStatus.getTaskId()); + } } } - return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER; + return heartbeatResponse; } /** + * Check if the jobtracker directed a 'reset' of the tasktracker. + * + * @param actions the directives of the jobtracker for the tasktracker. + * @return <code>true</code> if tasktracker is to be reset, + * <code>false</code> otherwise. + */ + private boolean reinitTaskTracker(TaskTrackerAction[] actions) { + if (actions != null) { + for (TaskTrackerAction action : actions) { + if (action.getActionId() == + TaskTrackerAction.ActionType.REINIT_TRACKER) { + LOG.info("Recieved RenitTrackerAction from JobTracker"); + return true; + } + } + } + return false; + } + + /** * Check to see if there are any new tasks that we should run. * @throws IOException */ - private void checkForNewTasks() throws IOException { - // - // Check if we should ask for a new Task - // - if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && - acceptNewTasks) { - checkLocalDirs(fConf.getLocalDirs()); - - if (enoughFreeSpace(minSpaceStart)) { - Task t = jobClient.pollForNewTask(taskTrackerName); + private void checkAndStartNewTasks(TaskTrackerAction[] actions) + throws IOException { + if (actions == null) { + return; + } + + for (TaskTrackerAction action : actions) { + if (action.getActionId() == + TaskTrackerAction.ActionType.LAUNCH_TASK) { + Task t = ((LaunchTaskAction)(action)).getTask(); + LOG.info("LaunchTaskAction: " + t.getTaskId()); if (t != null) { startNewTask(t); } @@ -580,24 +629,73 @@ * Ask the JobTracker if there are any tasks that we should clean up, * either because we don't need them any more or because the job is done. 
*/ - private void closeCompletedTasks() throws IOException { - String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName); - if (toCloseIds != null) { - synchronized (this) { - for (int i = 0; i < toCloseIds.length; i++) { - TaskInProgress tip = tasks.get(toCloseIds[i]); - if (tip != null) { - // remove the task from running jobs, removing the job if - // it is the last task - removeTaskFromJob(tip.getTask().getJobId(), tip); - tasksToCleanup.put(tip); + private void closeCompletedTasks(TaskTrackerAction[] actions) + throws IOException { + if (actions == null) { + return; + } + + for (TaskTrackerAction action : actions) { + TaskTrackerAction.ActionType actionType = action.getActionId(); + + if (actionType == TaskTrackerAction.ActionType.KILL_JOB) { + String jobId = ((KillJobAction)action).getJobId(); + LOG.info("Received 'KillJobAction' for job: " + jobId); + synchronized (runningJobs) { + RunningJob rjob = runningJobs.get(jobId); + if (rjob == null) { + LOG.warn("Unknown job " + jobId + " being deleted."); } else { - LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]); + synchronized (rjob) { + int noJobTasks = rjob.tasks.size(); + int taskCtr = 0; + + // Add this tips of this job to queue of tasks to be purged + for (TaskInProgress tip : rjob.tasks) { + // Purge the job files for the last element in rjob.tasks + if (++taskCtr == noJobTasks) { + tip.setPurgeJobFiles(true); + } + + tasksToCleanup.put(tip); + } + + // Remove this job + rjob.tasks.clear(); + runningJobs.remove(jobId); + } } } + } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) { + String taskId = ((KillTaskAction)action).getTaskId(); + LOG.info("Received KillTaskAction for task: " + taskId); + purgeTask(tasks.get(taskId), false); } } } + + /** + * Remove the tip and update all relevant state. + * + * @param tip {@link TaskInProgress} to be removed. 
+ * @param purgeJobFiles <code>true</code> if the job files are to be + * purged, <code>false</code> otherwise. + */ + private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) { + if (tip != null) { + LOG.info("About to purge task: " + tip.getTask().getTaskId()); + + // Cleanup the job files? + tip.setPurgeJobFiles(purgeJobFiles); + + // Remove the task from running jobs, + // removing the job if it's the last task + removeTaskFromJob(tip.getTask().getJobId(), tip); + + // Add this tip to queue of tasks to be purged + tasksToCleanup.put(tip); + } + } /** Check if we're dangerously low on disk space * If so, kill jobs to free up space and make sure @@ -829,6 +927,9 @@ private boolean alwaysKeepTaskFiles; private TaskStatus taskStatus ; private boolean keepJobFiles; + + /** Cleanup the job files when the job is complete (done/failed) */ + private boolean purgeJobFiles = false; /** */ @@ -893,6 +994,10 @@ keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles(); } + public void setPurgeJobFiles(boolean purgeJobFiles) { + this.purgeJobFiles = purgeJobFiles; + } + /** */ public synchronized TaskStatus createStatus() { @@ -1046,10 +1151,12 @@ LOG.warn("Error in deleting reduce temporary output",e); } - // delete the job diretory for this task - // since the job is done/failed - this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + - JOBCACHE + Path.SEPARATOR + task.getJobId()); + // Delete the job directory for this + // task if the job is done/failed + if (purgeJobFiles) { + this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + + JOBCACHE + Path.SEPARATOR + task.getJobId()); + } } /** Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java?view=auto&rev=483651 ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Dec 7 12:38:17 2006 @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * A generic directive from the {@link org.apache.hadoop.mapred.JobTracker} + * to the {@link org.apache.hadoop.mapred.TaskTracker} to take some 'action'. + * + * @author Arun C Murthy + */ +abstract class TaskTrackerAction implements Writable { + + /** + * Enumeration of various 'actions' that the {@link JobTracker} + * directs the {@link TaskTracker} to perform periodically. + * + * @author Arun C Murthy + */ + public static enum ActionType { + /** Launch a new task. */ + LAUNCH_TASK, + + /** Kill a task. */ + KILL_TASK, + + /** Kill any tasks of this job and cleanup. */ + KILL_JOB, + + /** Reinitialize the tasktracker. 
*/ + REINIT_TRACKER + }; + + /** + * A factory-method to create objects of given {@link ActionType}. + * @param actionType the {@link ActionType} of object to create. + * @return an object of {@link ActionType}. + */ + public static TaskTrackerAction createAction(ActionType actionType) { + TaskTrackerAction action = null; + + switch (actionType) { + case LAUNCH_TASK: + { + action = new LaunchTaskAction(); + } + break; + case KILL_TASK: + { + action = new KillTaskAction(); + } + break; + case KILL_JOB: + { + action = new KillJobAction(); + } + break; + case REINIT_TRACKER: + { + action = new ReinitTrackerAction(); + } + break; + } + + return action; + } + + private ActionType actionType; + + protected TaskTrackerAction(ActionType actionType) { + this.actionType = actionType; + } + + /** + * Return the {@link ActionType}. + * @return the {@link ActionType}. + */ + ActionType getActionId() { + return actionType; + } + + public void write(DataOutput out) throws IOException { + WritableUtils.writeEnum(out, actionType); + } + + public void readFields(DataInput in) throws IOException { + actionType = WritableUtils.readEnum(in, ActionType.class); + } +}