Author: cutting
Date: Wed Nov 22 14:49:29 2006
New Revision: 478358
URL: http://svn.apache.org/viewvc?view=rev&rev=478358
Log: HADOOP-741. Fix some issues with speculative execution. Contributed by Sanjay.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=478358&r1=478357&r2=478358 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Nov 22 14:49:29 2006 @@ -328,8 +328,7 @@ return null; } ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost()); - double avgProgress = status.mapProgress() / maps.length; - int target = findNewTask(tts, clusterSize, avgProgress, + int target = findNewTask(tts, clusterSize, status.mapProgress(), maps, mapCache); if (target == -1) { return null; @@ -357,8 +356,7 @@ return null; } - double avgProgress = status.reduceProgress() ; - int target = findNewTask(tts, clusterSize, avgProgress, + int target = findNewTask(tts, clusterSize, status.reduceProgress() , reduces, null); if (target == -1) { return null; @@ -441,7 +439,6 @@ task.hasSpeculativeTask(avgProgress) && ! task.hasRunOnMachine(taskTracker)) { specTarget = i; - break ; } } } @@ -696,8 +693,8 @@ // Delete temp dfs dirs created if any, like in case of // speculative exn of reduces. 
- // String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; - // fs.delete(new Path(tempDir)); + String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; + fs.delete(new Path(tempDir)); } catch (IOException e) { LOG.warn("Error cleaning up "+profile.getJobId()+": "+e); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=478358&r1=478357&r2=478358 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Nov 22 14:49:29 2006 @@ -42,7 +42,6 @@ static final int MAX_TASK_FAILURES = 4; static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; - static final int MAX_CONCURRENT_TASKS = 2; private static NumberFormat idFormat = NumberFormat.getInstance(); static { idFormat.setMinimumIntegerDigits(6); @@ -445,21 +444,14 @@ // REMIND - mjc - these constants should be examined // in more depth eventually... 
// - if (isMapTask() && - activeTasks.size() <= MAX_TASK_EXECS && + + if( activeTasks.size() <= MAX_TASK_EXECS && runSpeculative && (averageProgress - progress >= SPECULATIVE_GAP) && - (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { - return true; - }else{ - //Note: validate criteria for speculative reduce execution - if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) && - (averageProgress - progress >= SPECULATIVE_GAP) && - completes <= 0 && - (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) { - return true ; - } - } + (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) + && completes == 0) { + return true; + } return false; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=478358&r1=478357&r2=478358 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Nov 22 14:49:29 2006 @@ -1026,6 +1026,17 @@ } if (keepJobFiles) return; + + // Delete temp directory in case any task used PhasedFileSystem. + try{ + String systemDir = task.getConf().get("mapred.system.dir"); + String taskTempDir = systemDir + "/" + + task.getJobId() + "/" + task.getTipId(); + fs.delete(new Path(taskTempDir)) ; + }catch(IOException e){ + LOG.warn("Error in deleting reduce temporary output",e); + } + // delete the job diretory for this task // since the job is done/failed this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + @@ -1051,26 +1062,6 @@ runstate = TaskStatus.State.FAILED; } else { runstate = TaskStatus.State.KILLED; - } - } - - // the temporary file names in speculative exn are generated in - // the launched JVM, and we dont talk to it when killing so cleanup - // should happen here. 
find the task id and delete the temp directory - // for the task. only for killed speculative reduce instances - - // Note: it would be better to couple this with delete localfiles - // which is in conf currently, it doenst belong there. - - if( !task.isMapTask() && - this.defaultJobConf.getSpeculativeExecution() ){ - try{ - String systemDir = task.getConf().get("mapred.system.dir"); - String taskTempDir = systemDir + "/" + - task.getJobId() + "/" + task.getTipId(); - fs.delete(new Path(taskTempDir)) ; - }catch(IOException e){ - LOG.warn("Error in deleting reduce temporary output",e); } } }