Repository: tez Updated Branches: refs/heads/TEZ-3334 c08eddf01 -> 1a746f7e1
TEZ-3612. Tez Shuffle Handler Content length does not match actual (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1a746f7e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1a746f7e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1a746f7e Branch: refs/heads/TEZ-3334 Commit: 1a746f7e1ab3bb22d0be7e492ab8402734242fe9 Parents: c08eddf Author: Jonathan Eagles <[email protected]> Authored: Thu Feb 9 18:12:57 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Thu Feb 9 18:12:57 2017 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../apache/tez/auxservices/ShuffleHandler.java | 35 ++++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 025f53d..7f0e1ee 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3612. Tez Shuffle Handler Content length does not match actual TEZ-3608. Fetcher can hang if copyMapOutput/fetchInputs returns early TEZ-3606. Fix debug log for empty partitions to the expanded partitionId in the Composite case TEZ-3604. Remove the compositeInputAttemptIdentifier from remaining list upon fetch completion in the Ordered case http://git-wip-us.apache.org/repos/asf/tez/blob/1a746f7e/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index 91d42d8..85b781f 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -1182,7 +1182,21 @@ public class ShuffleHandler extends AuxiliaryService { throws IOException { long contentLength = 0; + // Content-Length only needs calculated for keep-alive keep alive + if (connectionKeepAliveEnabled || keepAliveParam) { + contentLength = getContentLength(mapIds, jobId, dagId, user, reduceRange, mapOutputInfoMap); + } + + // Now set the response headers. + setResponseHeaders(response, keepAliveParam, contentLength); + } + + long getContentLength(List<String> mapIds, String jobId, String dagId, String user, Range reduceRange, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException { + long contentLength = 0; + // Reduce count is written once per mapId + int reduceCountVSize = WritableUtils.getVIntSize(reduceRange.getLast() - reduceRange.getFirst() + 1); for (String mapId : mapIds) { + contentLength += reduceCountVSize; MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); @@ -1199,25 +1213,20 @@ public class ShuffleHandler extends AuxiliaryService { contentLength += indexRecord.getPartLength(); } } - - // Now set the response headers. - setResponseHeaders(response, keepAliveParam, contentLength); + return contentLength; } - protected void setResponseHeaders(HttpResponse response, - boolean keepAliveParam, long contentLength) { - if (!connectionKeepAliveEnabled && !keepAliveParam) { + protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) { + if (connectionKeepAliveEnabled || keepAliveParam) { + response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); + response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut); + LOG.info("Content Length in shuffle : " + contentLength); + } else { if (LOG.isDebugEnabled()) { LOG.debug("Setting connection close header..."); } response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE); - } else { - response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, - String.valueOf(contentLength)); - response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" - + connectionKeepAliveTimeOut); - LOG.info("Content Length in shuffle : " + contentLength); } }
