Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jan 10 15:59:23 2007 @@ -454,40 +454,52 @@ TaskStatus status, JobTrackerMetrics metrics) { String taskid = status.getTaskId(); + + // Sanity check: is the TIP already complete? if (tip.isComplete()) { LOG.info("Already complete TIP " + tip.getTIPId() + - " has completed task " + taskid); - return; - } else { - LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + - " successfully."); - - String taskTrackerName = status.getTaskTracker(); + " has completed task " + taskid); + + // Just mark this 'task' as complete + tip.completedTask(taskid); - if(status.getIsMap()){ - JobHistory.MapAttempt.logStarted(profile.getJobId(), - tip.getTIPId(), status.getTaskId(), status.getStartTime(), - taskTrackerName); - JobHistory.MapAttempt.logFinished(profile.getJobId(), - tip.getTIPId(), status.getTaskId(), status.getFinishTime(), - taskTrackerName); - JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), - Values.MAP.name(), status.getFinishTime()); - }else{ - JobHistory.ReduceAttempt.logStarted(profile.getJobId(), - tip.getTIPId(), status.getTaskId(), status.getStartTime(), - taskTrackerName); - JobHistory.ReduceAttempt.logFinished(profile.getJobId(), - tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(), - status.getSortFinishTime(), status.getFinishTime(), - taskTrackerName); - JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), - Values.REDUCE.name(), status.getFinishTime()); + // Let the JobTracker 
cleanup this taskid if the job isn't running + if (this.status.getRunState() != JobStatus.RUNNING) { + jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); } + return; + } + + LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + + " successfully."); + + // Update jobhistory + String taskTrackerName = status.getTaskTracker(); + if(status.getIsMap()){ + JobHistory.MapAttempt.logStarted(profile.getJobId(), + tip.getTIPId(), status.getTaskId(), status.getStartTime(), + taskTrackerName); + JobHistory.MapAttempt.logFinished(profile.getJobId(), + tip.getTIPId(), status.getTaskId(), status.getFinishTime(), + taskTrackerName); + JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), + Values.MAP.name(), status.getFinishTime()); + }else{ + JobHistory.ReduceAttempt.logStarted(profile.getJobId(), + tip.getTIPId(), status.getTaskId(), status.getStartTime(), + taskTrackerName); + JobHistory.ReduceAttempt.logFinished(profile.getJobId(), + tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(), + status.getSortFinishTime(), status.getFinishTime(), + taskTrackerName); + JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), + Values.REDUCE.name(), status.getFinishTime()); } + // Mark the TIP as complete tip.completed(taskid); - // updating the running/finished map/reduce counts + + // Update the running/finished map/reduce counts if (tip.isMapTask()){ runningMapTasks -= 1; finishedMapTasks += 1; @@ -533,6 +545,10 @@ JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks); metrics.completeJob(); + } else if (this.status.getRunState() != JobStatus.RUNNING) { + // The job has been killed/failed, + // JobTracker should cleanup this task + jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); } } @@ -541,6 +557,7 @@ */ public synchronized void kill() { if (status.getRunState() != JobStatus.FAILED) { + LOG.info("Killing 
job '" + this.status.getJobId() + "'"); this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED); this.finishTime = System.currentTimeMillis(); this.runningMapTasks = 0; @@ -575,7 +592,9 @@ private void failedTask(TaskInProgress tip, String taskid, TaskStatus status, String trackerName, boolean wasRunning, boolean wasComplete) { + // Mark the taskid as a 'failure' tip.failedSubTask(taskid, trackerName); + boolean isRunning = tip.isRunning(); boolean isComplete = tip.isComplete(); @@ -622,6 +641,11 @@ } // + // Let the JobTracker know that this task has failed + // + jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid); + + // // Check if we need to kill the job because of too many failures // if (tip.isFailed()) { @@ -633,9 +657,7 @@ System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks); kill(); } - - jobtracker.removeTaskEntry(taskid); - } + } /** * Fail a task with a given reason, but without a status object. @@ -669,6 +691,9 @@ * from the various tables. */ synchronized void garbageCollect() { + // Let the JobTracker know that a job is complete + jobtracker.finalizeJob(this); + try { // Definitely remove the local-disk copy of the job file if (localJobFile != null) {
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jan 10 15:59:23 2007 @@ -47,6 +47,12 @@ static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3; /** + * The maximum no. of 'completed' (successful/failed/killed) + * jobs kept in memory per-user. + */ + static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100; + + /** * Used for formatting the id numbers */ private static NumberFormat idFormat = NumberFormat.getInstance(); @@ -215,36 +221,45 @@ // // Loop through all expired items in the queue // - synchronized (taskTrackers) { + // Need to lock the JobTracker here since we are + // manipulating it's data-structures via + // ExpireTrackers.run -> JobTracker.lostTaskTracker -> + // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt + // Also need to lock JobTracker before locking 'taskTracker' & + // 'trackerExpiryQueue' to prevent deadlock: + // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} + synchronized (JobTracker.this) { + synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { - long now = System.currentTimeMillis(); - TaskTrackerStatus leastRecent = null; - while ((trackerExpiryQueue.size() > 0) && - ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && - (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { - - // Remove profile from head of queue - trackerExpiryQueue.remove(leastRecent); - String trackerName = leastRecent.getTrackerName(); - - // Figure out if last-seen time should be updated, or if tracker 
is dead - TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); - // Items might leave the taskTracker set through other means; the - // status stored in 'taskTrackers' might be null, which means the - // tracker has already been destroyed. - if (newProfile != null) { - if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { - // Remove completely - updateTaskTrackerStatus(trackerName, null); - lostTaskTracker(leastRecent.getTrackerName(), - leastRecent.getHost()); - } else { - // Update time by inserting latest profile - trackerExpiryQueue.add(newProfile); - } - } + long now = System.currentTimeMillis(); + TaskTrackerStatus leastRecent = null; + while ((trackerExpiryQueue.size() > 0) && + ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && + (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { + + // Remove profile from head of queue + trackerExpiryQueue.remove(leastRecent); + String trackerName = leastRecent.getTrackerName(); + + // Figure out if last-seen time should be updated, or if tracker is dead + TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); + // Items might leave the taskTracker set through other means; the + // status stored in 'taskTrackers' might be null, which means the + // tracker has already been destroyed. 
+ if (newProfile != null) { + if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) { + // Remove completely + updateTaskTrackerStatus(trackerName, null); + lostTaskTracker(leastRecent.getTrackerName(), + leastRecent.getHost()); + } else { + // Update time by inserting latest profile + trackerExpiryQueue.add(newProfile); + } } + } } + } } } catch (Exception t) { LOG.error("Tracker Expiry Thread got exception: " + @@ -289,10 +304,26 @@ if (job.getStatus().getRunState() != JobStatus.RUNNING && job.getStatus().getRunState() != JobStatus.PREP && (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) { + // Ok, this call to removeTaskEntries + // is dangerous in some very very obscure + // cases; e.g. when job completed, exceeded + // RETIRE_JOB_INTERVAL time-limit and yet + // some task (taskid) wasn't complete! + removeJobTasks(job); + it.remove(); - + synchronized (userToJobsMap) { + ArrayList<JobInProgress> userJobs = + userToJobsMap.get(job.getProfile().getUser()); + synchronized (userJobs) { + userJobs.remove(job); + } + } jobInitQueue.remove(job); jobsByArrival.remove(job); + + LOG.info("Retired job with id: '" + + job.getProfile().getJobId() + "'"); } } } @@ -418,6 +449,9 @@ TreeMap jobs = new TreeMap(); Vector jobsByArrival = new Vector(); + // (user -> list of JobInProgress) + TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap(); + // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) Map<String, TaskInProgress> taskidToTIPMap = new TreeMap(); @@ -427,8 +461,12 @@ // (trackerID->TreeSet of taskids running at that tracker) TreeMap trackerToTaskMap = new TreeMap(); - // (trackerID --> last sent HeartBeatResponseID) - Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap(); + // (trackerID -> TreeSet of completed taskids running at that tracker) + TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap(); + + // (trackerID --> last sent HeartBeatResponse) + Map<String, 
HeartbeatResponse> trackerToHeartbeatResponseMap = + new TreeMap(); // // Watch and expire TaskTracker objects using these structures. @@ -644,18 +682,181 @@ // taskid --> TIP taskidToTIPMap.put(taskid, tip); } + void removeTaskEntry(String taskid) { // taskid --> tracker String tracker = (String) taskidToTrackerMap.remove(taskid); // tracker --> taskid - TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker); - if (trackerSet != null) { - trackerSet.remove(taskid); + if (tracker != null) { + TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker); + if (trackerSet != null) { + trackerSet.remove(taskid); + } } // taskid --> TIP taskidToTIPMap.remove(taskid); + + LOG.debug("Removing task '" + taskid + "'"); + } + + /** + * Mark a 'task' for removal later. + * This function assumes that the JobTracker is locked on entry. + * + * @param taskTracker the tasktracker at which the 'task' was running + * @param taskid completed (success/failure/killed) task + */ + void markCompletedTaskAttempt(String taskTracker, String taskid) { + // tracker --> taskid + TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker); + if (taskset == null) { + taskset = new TreeSet(); + trackerToMarkedTasksMap.put(taskTracker, taskset); + } + taskset.add(taskid); + + LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'"); + } + + /** + * Mark all 'non-running' jobs of the job for pruning. + * This function assumes that the JobTracker is locked on entry. 
+ * + * @param job the completed job + */ + void markCompletedJob(JobInProgress job) { + for (TaskInProgress tip : job.getMapTasks()) { + for (TaskStatus taskStatus : tip.getTaskStatuses()) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + markCompletedTaskAttempt(taskStatus.getTaskTracker(), + taskStatus.getTaskId()); + } + } + } + for (TaskInProgress tip : job.getReduceTasks()) { + for (TaskStatus taskStatus : tip.getTaskStatuses()) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + markCompletedTaskAttempt(taskStatus.getTaskTracker(), + taskStatus.getTaskId()); + } + } + } + } + + /** + * Remove all 'marked' tasks running on a given {@link TaskTracker} + * from the {@link JobTracker}'s data-structures. + * This function assumes that the JobTracker is locked on entry. + * + * @param taskTracker tasktracker whose 'non-running' tasks are to be purged + */ + private void removeMarkedTasks(String taskTracker) { + // Purge all the 'marked' tasks which were running at taskTracker + TreeSet<String> markedTaskSet = + (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker); + if (markedTaskSet != null) { + for (String taskid : markedTaskSet) { + removeTaskEntry(taskid); + LOG.info("Removed completed task '" + taskid + "' from '" + + taskTracker + "'"); + } + // Clear + trackerToMarkedTasksMap.remove(taskTracker); + } + } + + /** + * Call {@link #removeTaskEntry(String)} for each of the + * job's tasks. + * When the JobTracker is retiring the long-completed + * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL} + * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs + * has been reached, we can afford to nuke all it's tasks; a little + * unsafe, but practically feasible. 
+ * + * @param job the job about to be 'retired' + */ + synchronized private void removeJobTasks(JobInProgress job) { + for (TaskInProgress tip : job.getMapTasks()) { + for (TaskStatus taskStatus : tip.getTaskStatuses()) { + removeTaskEntry(taskStatus.getTaskId()); + } + } + for (TaskInProgress tip : job.getReduceTasks()) { + for (TaskStatus taskStatus : tip.getTaskStatuses()) { + removeTaskEntry(taskStatus.getTaskId()); + } + } + } + + /** + * Safe clean-up all data structures at the end of the + * job (success/failure/killed). + * Here we also ensure that for a given user we maintain + * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs + * on the JobTracker. + * + * @param job completed job. + */ + synchronized void finalizeJob(JobInProgress job) { + // Mark the 'non-running' tasks for pruning + markCompletedJob(job); + + // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user + // in memory; information about the purged jobs is available via + // JobHistory. + synchronized (jobs) { + synchronized (jobsByArrival) { + synchronized (jobInitQueue) { + String jobUser = job.getProfile().getUser(); + synchronized (userToJobsMap) { + ArrayList<JobInProgress> userJobs = + userToJobsMap.get(jobUser); + synchronized (userJobs) { + while (userJobs.size() > + MAX_COMPLETE_USER_JOBS_IN_MEMORY) { + JobInProgress rjob = userJobs.get(0); + + // Do not delete 'current' + // finished job just yet. + if (rjob == job) { + break; + } + + // Cleanup all datastructures + int rjobRunState = + rjob.getStatus().getRunState(); + if (rjobRunState == JobStatus.SUCCEEDED || + rjobRunState == JobStatus.FAILED) { + // Ok, this call to removeTaskEntries + // is dangerous is some very very obscure + // cases; e.g. when rjob completed, hit + // MAX_COMPLETE_USER_JOBS_IN_MEMORY job + // limit and yet some task (taskid) + // wasn't complete! 
+ removeJobTasks(rjob); + + userJobs.remove(0); + jobs.remove(rjob.getProfile().getJobId()); + jobInitQueue.remove(rjob); + jobsByArrival.remove(rjob); + + LOG.info("Retired job with id: '" + + rjob.getProfile().getJobId() + "'"); + } else { + // Do not remove jobs that aren't complete. + // Stop here, and let the next pass take + // care of purging jobs. + break; + } + } + } + } + } + } + } } /////////////////////////////////////////////////////// @@ -736,26 +937,46 @@ public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { - LOG.debug("Got heartbeat from: " + status.getTrackerName() + + LOG.debug("Got heartbeat from: " + status.getTrackerName() + " (initialContact: " + initialContact + " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: " + responseId); // First check if the last heartbeat response got through String trackerName = status.getTrackerName(); - Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName); - - short newResponseId = (short)(responseId + 1); - if (!initialContact && oldResponseId != null && - oldResponseId.shortValue() != responseId) { - newResponseId = oldResponseId.shortValue(); + HeartbeatResponse prevHeartbeatResponse = + trackerToHeartbeatResponseMap.get(trackerName); + + if (initialContact != true) { + // If this isn't the 'initial contact' from the tasktracker, + // there is something seriously wrong if the JobTracker has + // no record of the 'previous heartbeat'; if so, ask the + // tasktracker to re-initialize itself. 
+ if (prevHeartbeatResponse == null) { + LOG.warn("Serious problem, cannot find record of 'previous' " + + "heartbeat for '" + trackerName + + "'; reinitializing the tasktracker"); + return new HeartbeatResponse(responseId, + new TaskTrackerAction[] {new ReinitTrackerAction()}); + + } + + // It is completely safe to ignore a 'duplicate' from a tracker + // since we are guaranteed that the tracker sends the same + // 'heartbeat' when rpcs are lost. + // {@link TaskTracker.transmitHeartbeat()} + if (prevHeartbeatResponse.getResponseId() != responseId) { + LOG.info("Ignoring 'duplicate' heartbeat from '" + + trackerName + "'"); + return prevHeartbeatResponse; + } } // Process this heartbeat - if (!processHeartbeat(status, initialContact, - (newResponseId != responseId))) { - if (oldResponseId != null) { - trackerToHeartbeatResponseIDMap.remove(trackerName); + short newResponseId = (short)(responseId + 1); + if (!processHeartbeat(status, initialContact)) { + if (prevHeartbeatResponse != null) { + trackerToHeartbeatResponseMap.remove(trackerName); } return new HeartbeatResponse(newResponseId, @@ -784,12 +1005,12 @@ response.setActions( actions.toArray(new TaskTrackerAction[actions.size()])); - // Update the trackerToHeartbeatResponseIDMap - if (newResponseId != responseId) { - trackerToHeartbeatResponseIDMap.put(trackerName, - new Short(newResponseId)); - } + // Update the trackerToHeartbeatResponseMap + trackerToHeartbeatResponseMap.put(trackerName, response); + // Done processing the hearbeat, now remove 'marked' tasks + removeMarkedTasks(trackerName); + return response; } @@ -824,12 +1045,9 @@ * Process incoming heartbeat messages from the task trackers. 
*/ private synchronized boolean processHeartbeat( - TaskTrackerStatus trackerStatus, - boolean initialContact, boolean updateStatusTimestamp) { + TaskTrackerStatus trackerStatus, boolean initialContact) { String trackerName = trackerStatus.getTrackerName(); - if (initialContact || updateStatusTimestamp) { - trackerStatus.setLastSeen(System.currentTimeMillis()); - } + trackerStatus.setLastSeen(System.currentTimeMillis()); synchronized (taskTrackers) { synchronized (trackerExpiryQueue) { @@ -857,7 +1075,7 @@ } updateTaskStatuses(trackerStatus); - //LOG.info("Got heartbeat from "+trackerName); + return true; } @@ -1028,7 +1246,6 @@ killList.add(new KillTaskAction(killTaskId)); LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId); } else { - //killTasksList.add(new KillJobAction(taskId)); String killJobId = tip.getJob().getStatus().getJobId(); killJobIds.add(killJobId); } @@ -1051,14 +1268,28 @@ * map task outputs. */ public synchronized MapOutputLocation[] - locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) { - ArrayList result = new ArrayList(mapTasksNeeded.length); + locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) + throws IOException { + // Check to make sure that the job hasn't 'completed'. 
JobInProgress job = getJob(jobId); + if (job.status.getRunState() != JobStatus.RUNNING) { + return new MapOutputLocation[0]; + } + + ArrayList result = new ArrayList(mapTasksNeeded.length); for (int i = 0; i < mapTasksNeeded.length; i++) { TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]); if (status != null) { String trackerId = (String) taskidToTrackerMap.get(status.getTaskId()); + // Safety check, if we can't find the taskid in + // taskidToTrackerMap and job isn't 'running', then just + // return an empty array + if (trackerId == null && + job.status.getRunState() != JobStatus.RUNNING) { + return new MapOutputLocation[0]; + } + TaskTrackerStatus tracker; synchronized (taskTrackers) { tracker = (TaskTrackerStatus) taskTrackers.get(trackerId); @@ -1108,10 +1339,22 @@ synchronized (jobs) { synchronized (jobsByArrival) { synchronized (jobInitQueue) { - jobs.put(job.getProfile().getJobId(), job); - jobsByArrival.add(job); - jobInitQueue.add(job); - jobInitQueue.notifyAll(); + synchronized (userToJobsMap) { + jobs.put(job.getProfile().getJobId(), job); + String jobUser = job.getProfile().getUser(); + if (!userToJobsMap.containsKey(jobUser)) { + userToJobsMap.put(jobUser, + new ArrayList<JobInProgress>()); + } + ArrayList<JobInProgress> userJobs = + userToJobsMap.get(jobUser); + synchronized (userJobs) { + userJobs.add(job); + } + jobsByArrival.add(job); + jobInitQueue.add(job); + jobInitQueue.notifyAll(); + } } } } @@ -1271,8 +1514,7 @@ * jobs that might be affected. 
*/ void updateTaskStatuses(TaskTrackerStatus status) { - for (Iterator it = status.taskReports(); it.hasNext(); ) { - TaskStatus report = (TaskStatus) it.next(); + for (TaskStatus report : status.getTaskReports()) { report.setTaskTracker(status.getTrackerName()); String taskId = report.getTaskId(); TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); @@ -1310,8 +1552,16 @@ TaskStatus.Phase.MAP, hostname, trackerName, myMetrics); } + } else if (!tip.isMapTask() && tip.isComplete()) { + // Completed 'reduce' task, not failed; + // only removed from data-structures. + markCompletedTaskAttempt(trackerName, taskId); } } + + // Purge 'marked' tasks, needs to be done + // here to prevent hanging references! + removeMarkedTasks(trackerName); } } Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jan 10 15:59:23 2007 @@ -202,7 +202,7 @@ //spawn a thread to give merge progress heartbeats Thread sortProgress = new Thread() { public void run() { - LOG.info("Started thread: " + getName()); + LOG.debug("Started thread: " + getName()); while (true) { try { reportProgress(umbilical); @@ -467,26 +467,24 @@ { Path [] filename = new Path[numSpills]; Path [] indexFileName = new Path[numSpills]; - FSDataInputStream in[] = new FSDataInputStream[numSpills]; - FSDataInputStream indexIn[] = new FSDataInputStream[numSpills]; for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(getTaskId(), i); - in[i] = localFs.open(filename[i]); indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i); - 
indexIn[i] = localFs.open(indexFileName[i]); } //create a sorter object as we need access to the SegmentDescriptor //class and merge methods Sorter sorter = new Sorter(localFs, keyClass, valClass, job); - sorter.setFactor(numSpills); for (int parts = 0; parts < partitions; parts++){ List<SegmentDescriptor> segmentList = new ArrayList(numSpills); for(int i = 0; i < numSpills; i++) { - long segmentOffset = indexIn[i].readLong(); - long segmentLength = indexIn[i].readLong(); + FSDataInputStream indexIn = localFs.open(indexFileName[i]); + indexIn.seek(parts * 16); + long segmentOffset = indexIn.readLong(); + long segmentLength = indexIn.readLong(); + indexIn.close(); SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset, segmentLength, filename[i]); s.preserveInput(true); @@ -513,8 +511,8 @@ finalIndexOut.close(); //cleanup for(int i = 0; i < numSpills; i++) { - in[i].close(); localFs.delete(filename[i]); - indexIn[i].close(); localFs.delete(indexFileName[i]); + localFs.delete(filename[i]); + localFs.delete(indexFileName[i]); } } } Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jan 10 15:59:23 2007 @@ -57,7 +57,6 @@ private int partition; private JobTracker jobtracker; private String id; - private String totalTaskIds[]; private JobInProgress job; // Status of the TIP @@ -70,7 +69,13 @@ private int completes = 0; private boolean failed = false; private boolean killed = false; - private TreeSet usableTaskIds = new TreeSet(); + + // The 'unique' prefix for taskids of this 
tip + String taskIdPrefix; + + // The 'next' usable taskid of this tip + int nextTaskId = 0; + // Map from task Id -> TaskTracker Id, contains tasks that are // currently runnings private TreeMap<String, String> activeTasks = new TreeMap(); @@ -139,13 +144,8 @@ void init(String jobUniqueString) { this.startTime = System.currentTimeMillis(); this.runSpeculative = conf.getSpeculativeExecution(); - String uniqueString = makeUniqueString(jobUniqueString); - this.id = "tip_" + uniqueString; - this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES]; - for (int i = 0; i < totalTaskIds.length; i++) { - totalTaskIds[i] = "task_" + uniqueString + "_" + i; - usableTaskIds.add(totalTaskIds[i]); - } + this.taskIdPrefix = makeUniqueString(jobUniqueString); + this.id = "tip_" + this.taskIdPrefix; } //////////////////////////////////// @@ -180,11 +180,19 @@ } /** + * Is this tip complete? + * + * @return <code>true</code> if the tip is complete, else <code>false</code> */ public boolean isComplete() { return (completes > 0); } + /** + * Is the given taskid in this tip complete? + * + * @param taskid taskid of attempt to check for completion + * @return <code>true</code> if taskid is complete, else <code>false</code> */ public boolean isComplete(String taskid) { TaskStatus status = (TaskStatus) taskStatuses.get(taskid); @@ -194,7 +202,11 @@ return ((completes > 0) && (status.getRunState() == TaskStatus.State.SUCCEEDED)); } + /** + * Is the tip a failure? + * + * @return <code>true</code> if tip has failed, else <code>false</code> */ public boolean isFailed() { return failed; @@ -293,6 +305,17 @@ TaskStatus.State oldState = oldStatus.getRunState(); TaskStatus.State newState = status.getRunState(); + // We should never recieve a duplicate success/failure/killed + // status update for the same taskid! This is a safety check, + // and is addressed better at the TaskTracker to ensure this. 
+ // @see {@link TaskTracker.transmitHeartbeat()} + if ((newState != TaskStatus.State.RUNNING) && + (oldState == newState)) { + LOG.warn("Recieved duplicate status update of '" + newState + + "' for '" + taskid + "' of TIP '" + getTIPId() + "'"); + return false; + } + // The task is not allowed to move from completed back to running. // We have seen out of order status messagesmoving tasks from complete // to running. This is a spot fix, but it should be addressed more @@ -346,14 +369,29 @@ /** * Indicate that one of the taskids in this TaskInProgress - * has successfully completed! + * has successfully completed. + * + * However this may not be the first subtask in this + * TaskInProgress to be completed and hence we might not want to + * manipulate the TaskInProgress to note that it is 'complete' just-as-yet. */ - public void completed(String taskid) { + void completedTask(String taskid) { LOG.info("Task '" + taskid + "' has completed."); TaskStatus status = (TaskStatus) taskStatuses.get(taskid); status.setRunState(TaskStatus.State.SUCCEEDED); activeTasks.remove(taskid); - + } + + /** + * Indicate that one of the taskids in this TaskInProgress + * has successfully completed! 
+ */ + public void completed(String taskid) { + // + // Record that this taskid is complete + // + completedTask(taskid); + // // Now that the TIP is complete, the other speculative // subtasks will be closed when the owning tasktracker @@ -470,8 +508,17 @@ execStartTime = System.currentTimeMillis(); } - String taskid = (String) usableTaskIds.first(); - usableTaskIds.remove(taskid); + // Create the 'taskid' + String taskid = null; + if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) { + taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId); + ++nextTaskId; + } else { + LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) + + " attempts for the tip '" + getTIPId() + "'"); + return null; + } + String jobId = job.getProfile().getJobId(); if (isMapTask()) { Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Jan 10 15:59:23 2007 @@ -204,17 +204,18 @@ // Add classpath. 
vargs.add("-classpath"); vargs.add(classPath.toString()); - // Add main class and its arguments - vargs.add(TaskTracker.Child.class.getName()); // main of Child - vargs.add(tracker.taskReportPort + ""); // pass umbilical port - vargs.add(t.getTaskId()); // pass task identifier - + // Add java.library.path; necessary for native-hadoop libraries String libraryPath = System.getProperty("java.library.path"); if (libraryPath != null) { vargs.add("-Djava.library.path=" + libraryPath); } - + + // Add main class and its arguments + vargs.add(TaskTracker.Child.class.getName()); // main of Child + vargs.add(tracker.taskReportPort + ""); // pass umbilical port + vargs.add(t.getTaskId()); // pass task identifier + // Run java runChild((String[])vargs.toArray(new String[0]), workDir); } catch (FSError e) { Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jan 10 15:59:23 2007 @@ -74,6 +74,16 @@ // last heartbeat response recieved short heartbeatResponseId = -1; + /* + * This is the last 'status' report sent by this tracker to the JobTracker. + * + * If the rpc call succeeds, this 'status' is cleared-out by this tracker; + * indicating that a 'fresh' status report be generated; in the event the + * rpc calls fails for whatever reason, the previous status report is sent + * again. 
+ */ + TaskTrackerStatus status = null; + StatusHttpServer server = null; boolean shuttingDown = false; @@ -249,6 +259,7 @@ this.mapTotal = 0; this.reduceTotal = 0; this.acceptNewTasks = true; + this.status = null; this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L); this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L); @@ -535,20 +546,27 @@ * @throws IOException */ private HeartbeatResponse transmitHeartBeat() throws IOException { + // + // Check if the last heartbeat got through... + // if so then build the heartbeat information for the JobTracker; + // else resend the previous status information. // - // Build the heartbeat information for the JobTracker - // - List<TaskStatus> taskReports = - new ArrayList<TaskStatus>(runningTasks.size()); - synchronized (this) { - for (TaskInProgress tip: runningTasks.values()) { - taskReports.add(tip.createStatus()); + if (status == null) { + List<TaskStatus> taskReports = + new ArrayList<TaskStatus>(runningTasks.size()); + synchronized (this) { + for (TaskInProgress tip: runningTasks.values()) { + taskReports.add(tip.createStatus()); + } } + status = + new TaskTrackerStatus(taskTrackerName, localHostname, + httpPort, taskReports, + failures); + } else { + LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + + "' with reponseId '" + heartbeatResponseId); } - TaskTrackerStatus status = - new TaskTrackerStatus(taskTrackerName, localHostname, - httpPort, taskReports, - failures); // // Check if we should ask for a new Task @@ -569,10 +587,14 @@ HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, askForNewTask, heartbeatResponseId); + + // + // The heartbeat got through successfully! 
+ // heartbeatResponseId = heartbeatResponse.getResponseId(); synchronized (this) { - for (TaskStatus taskStatus : taskReports) { + for (TaskStatus taskStatus : status.getTaskReports()) { if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { if (taskStatus.getIsMap()) { mapTotal--; @@ -584,6 +606,10 @@ } } } + + // Force a rebuild of 'status' on the next iteration + status = null; + return heartbeatResponse; } Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Jan 10 15:59:23 2007 @@ -98,11 +98,24 @@ * All current tasks at the TaskTracker. * * Tasks are tracked by a TaskStatus object. + * + * @deprecated use {@link #getTaskReports()} instead */ public Iterator taskReports() { return taskReports.iterator(); } + /** + * Get the current tasks at the TaskTracker. + * Tasks are tracked by a {@link TaskStatus} object. + * + * @return a list of {@link TaskStatus} representing + * the current tasks at the TaskTracker. 
+ */ + public List<TaskStatus> getTaskReports() { + return taskReports; + } + /** * Return the current MapTask count */ Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java Wed Jan 10 15:59:23 2007 @@ -44,6 +44,7 @@ } catch (Throwable t) { // Ignore failure to load LOG.debug("Failed to load native-hadoop with error: " + t); + LOG.debug("java.library.path=" + System.getProperty("java.library.path")); } if (!nativeCodeLoaded) { Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java (original) +++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java Wed Jan 10 15:59:23 2007 @@ -106,14 +106,18 @@ } mainClassName = mainClassName.replaceAll("/", "."); - final File workDir = File.createTempFile("hadoop-unjar","", - new File( new Configuration().get("hadoop.tmp.dir")) ); + File tmpDir = new File(new Configuration().get("hadoop.tmp.dir")); + tmpDir.mkdirs(); + if (!tmpDir.isDirectory()) { + System.err.println("Mkdirs failed to create " + tmpDir); + System.exit(-1); + } + final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir ); workDir.delete(); - if (!workDir.mkdirs()) { - if (!workDir.isDirectory()) { - 
System.err.println("Mkdirs failed to create " + workDir.toString()); - System.exit(-1); - } + workDir.mkdirs(); + if (!workDir.isDirectory()) { + System.err.println("Mkdirs failed to create " + workDir); + System.exit(-1); } Runtime.getRuntime().addShutdownHook(new Thread() { Modified: lucene/hadoop/branches/branch-0.10/src/native/Makefile.am URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/Makefile.am?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/native/Makefile.am (original) +++ lucene/hadoop/branches/branch-0.10/src/native/Makefile.am Wed Jan 10 15:59:23 2007 @@ -36,7 +36,7 @@ export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z]) # List the sub-directories here -SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib +SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib # The following export is needed to build libhadoop.so in the 'lib' directory export SUBDIRS Modified: lucene/hadoop/branches/branch-0.10/src/native/Makefile.in URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/Makefile.in?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/native/Makefile.in (original) +++ lucene/hadoop/branches/branch-0.10/src/native/Makefile.in Wed Jan 10 15:59:23 2007 @@ -207,7 +207,7 @@ target_alias = @target_alias@ # List the sub-directories here -SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib +SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib all: config.h $(MAKE) $(AM_MAKEFLAGS) all-recursive Modified: lucene/hadoop/branches/branch-0.10/src/native/NEWS URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/NEWS?view=diff&rev=495049&r1=495048&r2=495049 
============================================================================== --- lucene/hadoop/branches/branch-0.10/src/native/NEWS (original) +++ lucene/hadoop/branches/branch-0.10/src/native/NEWS Wed Jan 10 15:59:23 2007 @@ -1,3 +1,5 @@ 2006-10-05 Arun C Murthy <[EMAIL PROTECTED]> * Initial version of libhadoop released +2007-01-03 Arun C Murthy <[EMAIL PROTECTED]> + * Added support for lzo compression library Modified: lucene/hadoop/branches/branch-0.10/src/native/config.h.in URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/config.h.in?view=diff&rev=495049&r1=495048&r2=495049 ============================================================================== --- lucene/hadoop/branches/branch-0.10/src/native/config.h.in (original) +++ lucene/hadoop/branches/branch-0.10/src/native/config.h.in Wed Jan 10 15:59:23 2007 @@ -1,5 +1,8 @@ /* config.h.in. Generated from configure.ac by autoheader. */ +/* The 'actual' dynamic-library for '-llzo2' */ +#undef HADOOP_LZO_LIBRARY + /* The 'actual' dynamic-library for '-lz' */ #undef HADOOP_ZLIB_LIBRARY @@ -18,8 +21,41 @@ /* Define to 1 if you have the `jvm' library (-ljvm). */ #undef HAVE_LIBJVM +/* Define to 1 if you have the `lzo2' library (-llzo2). */ +#undef HAVE_LIBLZO2 + /* Define to 1 if you have the `z' library (-lz). */ #undef HAVE_LIBZ + +/* Define to 1 if you have the <lzo/lzo1a.h> header file. */ +#undef HAVE_LZO_LZO1A_H + +/* Define to 1 if you have the <lzo/lzo1b.h> header file. */ +#undef HAVE_LZO_LZO1B_H + +/* Define to 1 if you have the <lzo/lzo1c.h> header file. */ +#undef HAVE_LZO_LZO1C_H + +/* Define to 1 if you have the <lzo/lzo1f.h> header file. */ +#undef HAVE_LZO_LZO1F_H + +/* Define to 1 if you have the <lzo/lzo1x.h> header file. */ +#undef HAVE_LZO_LZO1X_H + +/* Define to 1 if you have the <lzo/lzo1y.h> header file. */ +#undef HAVE_LZO_LZO1Y_H + +/* Define to 1 if you have the <lzo/lzo1z.h> header file. 
*/ +#undef HAVE_LZO_LZO1Z_H + +/* Define to 1 if you have the <lzo/lzo1.h> header file. */ +#undef HAVE_LZO_LZO1_H + +/* Define to 1 if you have the <lzo/lzo2a.h> header file. */ +#undef HAVE_LZO_LZO2A_H + +/* Define to 1 if you have the <lzo/lzo_asm.h> header file. */ +#undef HAVE_LZO_LZO_ASM_H /* Define to 1 if you have the <memory.h> header file. */ #undef HAVE_MEMORY_H