Author: cutting Date: Thu May 17 14:57:58 2007 New Revision: 539136 URL: http://svn.apache.org/viewvc?view=rev&rev=539136 Log: Merge -r 539131:539135 from trunk to 0.13 branch. Fixes: HADOOP-1369 and HADOOP-1361.
Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/Text.java lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/UTF8.java lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/WritableUtils.java lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=539136&r1=539135&r2=539136 ============================================================================== --- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original) +++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Thu May 17 14:57:58 2007 @@ -398,6 +398,12 @@ 119. HADOOP-1368. Fix inconsistent synchronization in JobInProgress. (omalley via cutting) +120. HADOOP-1369. Fix inconsistent synchronization in TaskTracker. + (omalley via cutting) + +121. HADOOP-1361. Fix various calls to skipBytes() to check return + value. (Hairong Kuang via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/Text.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/Text.java?view=diff&rev=539136&r1=539135&r2=539136 ============================================================================== --- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/Text.java (original) +++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/Text.java Thu May 17 14:57:58 2007 @@ -230,7 +230,7 @@ /** Skips over one Text in the input. */ public static void skip(DataInput in) throws IOException { int length = WritableUtils.readVInt(in); - in.skipBytes(length); + WritableUtils.skipFully(in, length); } /** serialize Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/UTF8.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/UTF8.java?view=diff&rev=539136&r1=539135&r2=539136 ============================================================================== --- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/UTF8.java (original) +++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/UTF8.java Thu May 17 14:57:58 2007 @@ -110,7 +110,7 @@ /** Skips over one UTF8 in the input. */ public static void skip(DataInput in) throws IOException { int length = in.readUnsignedShort(); - in.skipBytes(length); + WritableUtils.skipFully(in, length); } public void write(DataOutput out) throws IOException { Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/WritableUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/WritableUtils.java?view=diff&rev=539136&r1=539135&r2=539136 ============================================================================== --- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/WritableUtils.java (original) +++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/io/WritableUtils.java Thu May 17 14:57:58 2007 @@ -48,7 +48,9 @@ public static void skipCompressedByteArray(DataInput in) throws IOException { int length = in.readInt(); - if (length != -1) in.skipBytes(length); + if (length != -1) { + skipFully(in, length); + } } public static int writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException { @@ -381,5 +383,24 @@ public static void writeEnum(DataOutput out, Enum enumVal) throws IOException{ Text.writeString(out, enumVal.name()); + } + /** + * Skip <i>len</i> number of bytes in input stream<i>in</i> + * @param in input stream + * @param len number of bytes to skip + * @throws IOException when skipped less number of bytes + */ + public static void skipFully(DataInput in, int len) throws IOException { + int total = 0; + int cur = 0; + + while ((total<len) && ((cur = (int) in.skipBytes(len-total)) > 0)) { + total += cur; + } + + if (total<len) { + throw new IOException("Not able to skip " + len + " bytes, possibly " + + "due to end of input."); + } } } Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=539136&r1=539135&r2=539136 ============================================================================== --- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May 17 14:57:58 2007 @@ -24,7 +24,6 @@ import java.io.PrintStream; import java.net.BindException; import java.net.InetSocketAddress; -import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -37,8 +36,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; -import java.util.Collections; -import java.util.Collection; import javax.servlet.ServletContext; import javax.servlet.ServletException; @@ -803,17 +800,17 @@ // else resend the previous status information. // if (status == null) { - List<TaskStatus> taskReports = - new ArrayList<TaskStatus>(runningTasks.size()); synchronized (this) { + List<TaskStatus> taskReports = + new ArrayList<TaskStatus>(runningTasks.size()); for (TaskInProgress tip: runningTasks.values()) { taskReports.add(tip.createStatus()); } + status = + new TaskTrackerStatus(taskTrackerName, localHostname, + httpPort, taskReports, + failures); } - status = - new TaskTrackerStatus(taskTrackerName, localHostname, - httpPort, taskReports, - failures); } else { LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() + "' with reponseId '" + heartbeatResponseId); @@ -822,14 +819,15 @@ // // Check if we should ask for a new Task // - boolean askForNewTask = false; - if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && - acceptNewTasks) { + boolean askForNewTask; + synchronized (this) { + askForNewTask = (mapTotal < maxCurrentTasks || + reduceTotal < maxCurrentTasks) && + acceptNewTasks; + } + if (askForNewTask) { checkLocalDirs(fConf.getLocalDirs()); - - if (enoughFreeSpace(minSpaceStart)) { - askForNewTask = true; - } + askForNewTask = enoughFreeSpace(minSpaceStart); } // @@ -1453,7 +1451,7 @@ /** * The map output has been lost. */ - public synchronized void mapOutputLost(String failure + private synchronized void mapOutputLost(String failure ) throws IOException { if (runstate == TaskStatus.State.SUCCEEDED) { LOG.info("Reporting output lost:"+task.getTaskId());