Author: cutting Date: Mon Mar 20 11:08:07 2006 New Revision: 387279 URL: http://svn.apache.org/viewcvs?rev=387279&view=rev Log: Fix for HADOOP-86. Errors while reading map output now cause map task to fail and be re-executed.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=387279&r1=387278&r2=387279&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Mon Mar 20 11:08:07 2006 @@ -23,6 +23,8 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.conf.*; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.mapred.TaskTracker.MapOutputServer; /** A local file to be transferred via the {@link MapOutputProtocol}. */ class MapOutputFile implements Writable, Configurable { @@ -106,16 +108,15 @@ UTF8.writeString(out, reduceTaskId); out.writeInt(partition); - // write the length-prefixed file content to the wire File file = getOutputFile(mapTaskId, partition); - out.writeLong(file.length()); - FSDataInputStream in = null; try { + // write the length-prefixed file content to the wire + out.writeLong(file.length()); in = FileSystem.getNamed("local", this.jobConf).open(file); - } catch (IOException e) { - // log a SEVERE exception in order to cause TaskTracker to exit + } catch (FileNotFoundException e) { TaskTracker.LOG.log(Level.SEVERE, "Can't open map output:" + file, e); + ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId); throw e; } try { @@ -127,8 +128,8 @@ try { l = in.read(buffer); } catch (IOException e) { - // log a SEVERE exception in order to cause TaskTracker to exit TaskTracker.LOG.log(Level.SEVERE,"Can't read map output:" + file, e); + ((MapOutputServer)Server.get()).getTaskTracker().mapOutputLost(mapTaskId); 
throw e; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=387279&r1=387278&r2=387279&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 20 11:08:07 2006 @@ -70,6 +70,15 @@ private int maxCurrentTasks; + class MapOutputServer extends RPC.Server { + private MapOutputServer(int port, int threads) { + super(TaskTracker.this, fConf, port, threads, false); + } + public TaskTracker getTaskTracker() { + return TaskTracker.this; + } + } + /** * Start with the local machine name, and the default JobTracker */ @@ -127,7 +136,7 @@ } while (true) { try { - this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf); + this.mapOutputServer = new MapOutputServer(mapOutputPort, maxCurrentTasks); this.mapOutputServer.start(); break; } catch (BindException e) { @@ -305,11 +314,6 @@ } } lastHeartbeat = now; - - if (LogFormatter.hasLoggedSevere()) { - LOG.info("Severe problem detected. TaskTracker exiting."); - return STALE_STATE; - } } return 0; @@ -539,6 +543,22 @@ } /** + * The map output has been lost. + */ + public synchronized void mapOutputLost() throws IOException { + if (runstate == TaskStatus.SUCCEEDED) { + LOG.info("Reporting output lost:"+task.getTaskId()); + runstate = TaskStatus.FAILED; // change status to failure + synchronized (TaskTracker.this) { // force into next heartbeat + runningTasks.put(task.getTaskId(), this); + mapTotal++; + } + } else { + LOG.warning("Output already reported lost:"+task.getTaskId()); + } + } + + /** * We no longer need anything from this task. 
Either the * controlling job is all done and the files have been copied * away, or the task failed and we don't need the remains. @@ -642,6 +662,18 @@ tip.taskFinished(); } else { LOG.warning("Unknown child task finshed: "+taskid+". Ignored."); + } + } + + /** + * A completed map task's output has been lost. + */ + public synchronized void mapOutputLost(String taskid) throws IOException { + TaskInProgress tip = (TaskInProgress) tasks.get(taskid); + if (tip != null) { + tip.mapOutputLost(); + } else { + LOG.warning("Unknown child with bad map output: "+taskid+". Ignored."); } }