MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed too early. Contributed by Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72d08a0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72d08a0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72d08a0e

Branch: refs/heads/HDFS-7240
Commit: 72d08a0e41efda635baa985d55d67cb059a7c07c
Parents: 2ba6465
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Jun 24 15:29:11 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Jun 24 15:29:11 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt                 |  4 ++++
 .../apache/hadoop/mapreduce/task/reduce/Fetcher.java |  1 +
 .../mapreduce/task/reduce/IFileWrappedMapOutput.java | 10 ++--------
 .../hadoop/mapreduce/task/reduce/LocalFetcher.java   | 15 +++++----------
 4 files changed, 12 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5eae44e..6c65032 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -505,6 +505,10 @@ Release 2.8.0 - UNRELEASED
     multibyte record delimiters well (Vinayakumar B, Rushabh Shah, and Akira
     AJISAKA via jlowe)
 
+    MAPREDUCE-6400. Multiple shuffle transfer fails because input is closed
+    too early (Brahma Reddy Battula, Akira AJISAKA, and Gera Shegalov via
+    jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 1e03387..fb0ac0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -335,6 +335,7 @@ class Fetcher<K,V> extends Thread {
         try {
           failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
         } catch (IOException e) {
+          IOUtils.cleanup(LOG, input);
           //
           // Setup connection again if disconnected by NM
           connection.disconnect();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
index 119db15..6051c34 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -60,13 +60,7 @@ public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    IFileInputStream iFin =
-        new IFileInputStream(input, compressedLength, conf);
-    try {
-      this.doShuffle(host, iFin, compressedLength,
-                    decompressedLength, metrics, reporter);
-    } finally {
-      iFin.close();
-    }
+    doShuffle(host, new IFileInputStream(input, compressedLength, conf),
+        compressedLength, decompressedLength, metrics, reporter);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72d08a0e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index de2382c..f45742f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.IndexRecord;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
@@ -149,19 +150,13 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
-
-    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
-
     try {
+      inStream = CryptoUtils.wrapIfNecessary(job, inStream);
       inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
-      mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
+      mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
+          decompressedLength, metrics, reporter);
     } finally {
-      try {
-        inStream.close();
-      } catch (IOException ioe) {
-        LOG.warn("IOException closing inputstream from map output: "
-            + ioe.toString());
-      }
+      IOUtils.cleanup(LOG, inStream);
     }
 
     scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,