Repository: tez Updated Branches: refs/heads/TEZ-3334 a6b9009b5 -> 72b3e19c4
TEZ-3621. Optimize the Shuffle Handler content length calculation for keep alive (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/72b3e19c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/72b3e19c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/72b3e19c Branch: refs/heads/TEZ-3334 Commit: 72b3e19c44dd625df46d4ad08c1445e118de541f Parents: a6b9009 Author: Jonathan Eagles <[email protected]> Authored: Mon Feb 13 12:48:36 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Feb 13 12:48:36 2017 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../org/apache/tez/auxservices/ShuffleHandler.java | 15 +++++++-------- .../common/shuffle/orderedgrouped/ShuffleHeader.java | 12 ++++++++++++ 3 files changed, 20 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/72b3e19c/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index e3b5de5..5d5ee71 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3621. Optimize the Shuffle Handler content length calculation for keep alive TEZ-3620. UnorderedPartitionedKVOutput is missing the shuffle service config in the confKeys set TEZ-3618. Shuffle Handler Loading cache equality tests always results is false TEZ-3612. Tez Shuffle Handler Content length does not match actual http://git-wip-us.apache.org/repos/asf/tez/blob/72b3e19c/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 9f0125b..fa6d888 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -248,7 +248,7 @@ public class ShuffleHandler extends AuxiliaryService { public static final int DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE = 128; boolean connectionKeepAliveEnabled = false; - int connectionKeepAliveTimeOut; + String connectionKeepAliveTimeOut; int mapOutputMetaInfoCacheSize; @Metrics(about="Shuffle output metrics", context="mapred", name="tez") @@ -532,7 +532,7 @@ public class ShuffleHandler extends AuxiliaryService { connectionKeepAliveEnabled = conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED); - connectionKeepAliveTimeOut = + connectionKeepAliveTimeOut = "timeout=" + Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT)); mapOutputMetaInfoCacheSize = @@ -1201,15 +1201,12 @@ public class ShuffleHandler extends AuxiliaryService { if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); } - DataOutputBuffer dob = new DataOutputBuffer(); for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce); ShuffleHeader header = new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce); - dob.reset(); - header.write(dob); - contentLength += dob.getLength(); + contentLength += header.writeLength(); contentLength += indexRecord.getPartLength(); } } @@ -1220,8 +1217,10 @@ public class ShuffleHandler extends AuxiliaryService { if (connectionKeepAliveEnabled || keepAliveParam) { response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); - LOG.info("Content Length in shuffle : " + contentLength); + response.setHeader(HttpHeaders.Values.KEEP_ALIVE, connectionKeepAliveTimeOut); + if (LOG.isDebugEnabled()) { + LOG.debug("Content Length in shuffle : " + contentLength); + } } else { if (LOG.isDebugEnabled()) { LOG.debug("Setting connection close header..."); http://git-wip-us.apache.org/repos/asf/tez/blob/72b3e19c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java index 339af57..9f883db 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleHeader.java @@ -85,6 +85,18 @@ public class ShuffleHeader implements Writable { forReduce = WritableUtils.readVInt(in); } + public int writeLength() throws IOException { + int length = 0; + int mapIdLength = Text.encode(mapId).limit(); + length += mapIdLength; + + length += WritableUtils.getVIntSize(mapIdLength); + length += WritableUtils.getVIntSize(compressedLength); + length += WritableUtils.getVIntSize(uncompressedLength); + length += WritableUtils.getVIntSize(forReduce); + + return length; + } public void write(DataOutput out) throws IOException { Text.writeString(out, mapId); WritableUtils.writeVLong(out, compressedLength);
