Repository: tez
Updated Branches:
  refs/heads/branch-0.9 403327b6e -> c04416d86
TEZ-3954. Reduce Tez Shuffle Handler Memory needs for holding TezIndexRecords (Jonathan Eagles via kshukla) (cherry picked from commit 3baab55810ce5477c4048125e2b192bde9ec134d) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c04416d8 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c04416d8 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c04416d8 Branch: refs/heads/branch-0.9 Commit: c04416d86d4492b10e82146d2d429e6b6d3014ca Parents: 403327b Author: Kuhu Shukla <[email protected]> Authored: Fri Jul 6 09:22:44 2018 -0500 Committer: Kuhu Shukla <[email protected]> Committed: Fri Jul 6 09:22:44 2018 -0500 ---------------------------------------------------------------------- .../apache/tez/auxservices/ShuffleHandler.java | 50 ++++++++++++++++---- .../tez/auxservices/TestShuffleHandler.java | 8 ++-- 2 files changed, 45 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c04416d8/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index d48cc01..13a6264 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -1146,7 +1146,7 @@ public class ShuffleHandler extends AuxiliaryService { try { MapOutputInfo info = reduceContext.getInfoMap().get(mapId); if (info == null) { - info = getMapOutputInfo(reduceContext.dagId, mapId, + info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(), reduceContext.getJobId(), reduceContext.getUser()); } 
@@ -1204,7 +1204,7 @@ public class ShuffleHandler extends AuxiliaryService { } protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - String jobId, + Range reduceRange, String jobId, String user) throws IOException { AttemptPathInfo pathInfo; try { @@ -1233,8 +1233,13 @@ public class ShuffleHandler extends AuxiliaryService { pathInfo.indexPath); } + MapOutputInfo outputInfo; + if (reduceRange.first == reduceRange.last) { + outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord.getIndex(reduceRange.first), reduceRange); + } else { - MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord); + outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord, reduceRange); + } return outputInfo; } @@ -1262,12 +1267,12 @@ public class ShuffleHandler extends AuxiliaryService { int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1); for (String mapId : mapIds) { contentLength += reduceCountVSize; - MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user); + MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, reduceRange, jobId, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); } for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { - TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce); + TezIndexRecord indexRecord = outputInfo.getIndex(reduce); ShuffleHeader header = new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce); @@ -1295,12 +1300,37 @@ public class ShuffleHandler extends AuxiliaryService { } class MapOutputInfo { - final Path mapOutputFileName; - final TezSpillRecord spillRecord; + private final Path mapOutputFileName; + private TezSpillRecord spillRecord; + private TezIndexRecord indexRecord; + private final Range reduceRange; + + MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord, Range reduceRange) { 
+ this.mapOutputFileName = mapOutputFileName; + this.indexRecord = indexRecord; + this.reduceRange = reduceRange; + } - MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord) { + MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord, Range reduceRange) { this.mapOutputFileName = mapOutputFileName; this.spillRecord = spillRecord; + this.reduceRange = reduceRange; + } + + TezIndexRecord getIndex(int index) { + if (index < reduceRange.first || index > reduceRange.last) { + throw new IllegalArgumentException("Reduce Index: " + index + " out of range for " + mapOutputFileName); + } + if (spillRecord != null) { + return spillRecord.getIndex(index); + } else { + return indexRecord; + } + } + + public void finish() { + spillRecord = null; + indexRecord = null; } } @@ -1356,7 +1386,7 @@ public class ShuffleHandler extends AuxiliaryService { WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1); ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { - TezIndexRecord index = outputInfo.spillRecord.getIndex(reduce); + TezIndexRecord index = outputInfo.getIndex(reduce); // Records are only valid if they have a non-zero part length if (index.getPartLength() != 0) { if (firstIndex == null) { @@ -1368,6 +1398,8 @@ public class ShuffleHandler extends AuxiliaryService { ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce); DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); + // Free the memory needed to store the spill and index records + outputInfo.finish(); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); } http://git-wip-us.apache.org/repos/asf/tez/blob/c04416d8/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git 
a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index b9fd0d2..258e061 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -110,7 +110,7 @@ public class TestShuffleHandler { } @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - String jobId, + Range reduceRange, String jobId, String user) throws IOException { // Do nothing. @@ -235,7 +235,7 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - String jobId, + Range reduceRange, String jobId, String user) throws IOException { return null; @@ -345,7 +345,7 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - String jobId, String user) + Range reduceRange, String jobId, String user) throws IOException { return null; } @@ -567,7 +567,7 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - String jobId, + Range reduceRange, String jobId, String user) throws IOException { // Do nothing.
