Author: acmurthy
Date: Thu Oct 18 08:31:09 2007
New Revision: 586006
URL: http://svn.apache.org/viewvc?rev=586006&view=rev
Log: Merge -r 586002:586003 from trunk to branch-0.15 to fix HADOOP-2070.
Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original) +++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Thu Oct 18 08:31:09 2007 @@ -315,6 +315,10 @@ very uneven splits for applications like distcp that count on them. (omalley) + HADOOP-2070. Added a flush method to pipes' DownwardProtocol and call + that before waiting for the application to finish to ensure all buffered + data is flushed. (Owen O'Malley via acmurthy) + IMPROVEMENTS HADOOP-1908. 
Restructure data node code so that block sending and Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/Application.java Thu Oct 18 08:31:09 2007 @@ -51,7 +51,7 @@ private Process process; private Socket clientSocket; private OutputHandler<K2, V2> handler; - private BinaryProtocol<K1, V1, K2, V2> downlink; + private DownwardProtocol<K1, V1> downlink; /** * Start the child process to handle the task for us. @@ -109,6 +109,7 @@ * @throws Throwable */ boolean waitForFinish() throws Throwable { + downlink.flush(); return handler.waitForFinish(); } @@ -121,6 +122,7 @@ LOG.info("Aborting because of " + StringUtils.stringifyException(t)); try { downlink.abort(); + downlink.flush(); } catch (IOException e) { // IGNORE cleanup problems } @@ -141,7 +143,7 @@ void cleanup() throws IOException { serverSocket.close(); try { - downlink.closeConnection(); + downlink.close(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Thu 
Oct 18 08:31:09 2007 @@ -226,7 +226,7 @@ * @throws IOException * @throws InterruptedException */ - public void closeConnection() throws IOException, InterruptedException { + public void close() throws IOException, InterruptedException { LOG.debug("closing connection"); stream.close(); uplink.closeConnection(); @@ -291,15 +291,18 @@ writeObject(value); } - public void close() throws IOException { + public void endOfInput() throws IOException { WritableUtils.writeVInt(stream, MessageType.CLOSE.code); LOG.debug("Sent close command"); - stream.flush(); } public void abort() throws IOException { WritableUtils.writeVInt(stream, MessageType.ABORT.code); LOG.debug("Sent abort command"); + } + + public void flush() throws IOException { + stream.flush(); } /** Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/DownwardProtocol.java Thu Oct 18 08:31:09 2007 @@ -97,11 +97,21 @@ * input. * @throws IOException */ - void close() throws IOException; + void endOfInput() throws IOException; /** * The task should stop as soon as possible, because something has gone wrong. * @throws IOException */ void abort() throws IOException; + + /** + * Flush the data through any buffers. + */ + void flush() throws IOException; + + /** + * Close the connection. 
+ */ + void close() throws IOException, InterruptedException; } Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Thu Oct 18 08:31:09 2007 @@ -76,7 +76,7 @@ // map pair to output downlink.mapItem(key, value); } - downlink.close(); + downlink.endOfInput(); } application.waitForFinish(); } catch (Throwable t) { Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java?rev=586006&r1=586005&r2=586006&view=diff ============================================================================== --- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java (original) +++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/pipes/PipesReducer.java Thu Oct 18 08:31:09 2007 @@ -94,8 +94,9 @@ } try { if (isOk) { - application.getDownlink().close(); + application.getDownlink().endOfInput(); } else { + // send the abort to the application and let it clean up application.getDownlink().abort(); } LOG.info("waiting for finish");