Author: tomwhite
Date: Tue Aug 7 13:41:43 2007
New Revision: 563649

URL: http://svn.apache.org/viewvc?view=rev&rev=563649
Log:
HADOOP-1651. Improve progress reporting. Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 7 13:41:43 2007
@@ -32,6 +32,11 @@
     HADOOP-1463. HDFS report correct usage statistics for disk space
     used by HDFS. (Hairong Kuang via dhruba)
 
+  IMPROVEMENTS
+
+    HADOOP-1651. Improve progress reporting.
+    (Devaraj Das via tomwhite)
+
 Branch 0.14 (unreleased changes)
 
   1. HADOOP-1197. In Configuration, deprecate getObject() and add

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Aug 7 13:41:43 2007
@@ -217,7 +217,7 @@
                        ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
-  public static final int PROGRESS_INTERVAL = 1000;
+  public static final int PROGRESS_INTERVAL = 3000;
 
   private transient Progress taskProgress = new Progress();
 
@@ -230,6 +230,8 @@
    * Using AtomicBoolean since we need an atomic read & reset method.
    */
   private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  /* flag to track whether task is done */
+  private AtomicBoolean taskDone = new AtomicBoolean(false);
   // getters and setters for flag
   private void setProgressFlag() {
     progressFlag.set(true);
@@ -256,11 +258,20 @@
         public void run() {
           final int MAX_RETRIES = 3;
           int remainingRetries = MAX_RETRIES;
-          while (true) {
+          // get current flag value and reset it as well
+          boolean sendProgress = resetProgressFlag();
+          while (!taskDone.get()) {
             try {
-              // get current flag value and reset it as well
-              boolean sendProgress = resetProgressFlag();
               boolean taskFound = true; // whether TT knows about this task
+              // sleep for a bit
+              try {
+                Thread.sleep(PROGRESS_INTERVAL);
+              }
+              catch (InterruptedException e) {
+                LOG.debug(getTaskId() + " Progress/ping thread exiting " +
+                                        "since it got interrupted");
+                break;
+              }
 
               if (sendProgress) {
                 // we need to send progress update
@@ -279,13 +290,8 @@
                 System.exit(66);
               }
 
+              sendProgress = resetProgressFlag();
               remainingRetries = MAX_RETRIES;
-              // sleep for a bit
-              try {
-                Thread.sleep(PROGRESS_INTERVAL);
-              }
-              catch (InterruptedException e) {
-              }
             }
             catch (Throwable t) {
               LOG.info("Communication exception: " + StringUtils.stringifyException(t));
@@ -301,6 +307,7 @@
       }, "Comm thread for "+taskId);
     thread.setDaemon(true);
     thread.start();
+    LOG.debug(getTaskId() + " Progress/ping thread started");
   }
 
 
@@ -338,6 +345,7 @@
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = 10;
     boolean needProgress = true;
+    taskDone.set(true);
     while (true) {
       try {
         if (needProgress) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=563649&r1=563648&r2=563649
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 7 13:41:43 2007
@@ -1776,6 +1776,8 @@
       JobConf defaultConf = new JobConf();
       int port = Integer.parseInt(args[0]);
       String taskid = args[1];
+      //set a very high idle timeout so that the connection is never closed
+      defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       TaskUmbilicalProtocol umbilical =
         (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
                                             TaskUmbilicalProtocol.versionID,