TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler (jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe6746d7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe6746d7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe6746d7 Branch: refs/heads/TEZ-3334 Commit: fe6746d7882510369090dce28e2e725565a73aa1 Parents: 25643aa Author: Jonathan Eagles <[email protected]> Authored: Tue Dec 6 11:30:51 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Tue Dec 6 11:30:51 2016 -0600 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../org/apache/tez/auxservices/IndexCache.java | 51 ++- .../apache/tez/auxservices/ShuffleHandler.java | 158 +++++--- .../tez/auxservices/TestShuffleHandler.java | 52 ++- .../common/CompositeInputAttemptIdentifier.java | 68 ++++ .../library/common/shuffle/FetchResult.java | 12 +- .../runtime/library/common/shuffle/Fetcher.java | 357 ++++++++++++------- .../library/common/shuffle/InputHost.java | 63 +++- .../library/common/shuffle/ShuffleUtils.java | 6 +- .../impl/ShuffleInputEventHandlerImpl.java | 85 +++-- .../common/shuffle/impl/ShuffleManager.java | 21 +- .../orderedgrouped/FetcherOrderedGrouped.java | 292 ++++++++------- .../common/shuffle/orderedgrouped/MapHost.java | 8 +- .../common/shuffle/orderedgrouped/Shuffle.java | 3 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 77 +++- .../orderedgrouped/ShuffleScheduler.java | 24 +- .../runtime/library/input/UnorderedKVInput.java | 4 +- .../library/common/shuffle/TestFetcher.java | 26 +- .../impl/TestShuffleInputEventHandlerImpl.java | 35 +- .../common/shuffle/impl/TestShuffleManager.java | 2 +- .../shuffle/orderedgrouped/TestFetcher.java | 36 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 29 +- .../orderedgrouped/TestShuffleScheduler.java | 135 +++---- 23 files changed, 999 insertions(+), 546 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index f93ec6e..3383e50 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3361. Fetch Multiple Partitions from the Shuffle Handler TEZ-3360. Tez Custom Shuffle Handler Documentation TEZ-3411. TestShuffleHandler#testSendMapCount should not used hard coded ShuffleHandler port TEZ-3412. Modify ShuffleHandler to use Constants.DAG_PREFIX and fix AttemptPathIdentifier#toString() http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java index 532187e..247144c 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/IndexCache.java @@ -46,6 +46,45 @@ class IndexCache { } /** + * This method gets the spill record for the given mapId. + * It reads the index file into cache if it is not already present. 
+ * @param mapId + * @param fileName The file to read the index information from if it is not + * already present in the cache + * @param expectedIndexOwner The expected owner of the index file + * @return The spill record for this map + * @throws IOException + */ + public TezSpillRecord getSpillRecord(String mapId, Path fileName, String expectedIndexOwner) + throws IOException { + + IndexInformation info = cache.get(mapId); + + if (info == null) { + info = readIndexFileToCache(fileName, mapId, expectedIndexOwner); + } else { + synchronized(info) { + while (isUnderConstruction(info)) { + try { + info.wait(); + } catch (InterruptedException e) { + throw new IOException("Interrupted waiting for construction", e); + } + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + } + } + + if (info.mapSpillRecord.size() == 0) { + throw new IOException("Invalid request " + + " Map Id = " + mapId + " Index Info Length = " + info.mapSpillRecord.size()); + } + return info.mapSpillRecord; + } + + /** * This method gets the index information for the given mapId and reduce. * It reads the index file into cache if it is not already present. 
* @param mapId @@ -74,7 +113,9 @@ class IndexCache { } } } - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + if (LOG.isDebugEnabled()) { + LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + } } if (info.mapSpillRecord.size() == 0 || @@ -108,10 +149,14 @@ class IndexCache { } } } - LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + if (LOG.isDebugEnabled()) { + LOG.debug("IndexCache HIT: MapId " + mapId + " found"); + } return info; } - LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; + if (LOG.isDebugEnabled()) { + LOG.debug("IndexCache MISS: MapId " + mapId + " not found"); + } TezSpillRecord tmp = null; try { tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index fdaba86..80b9d46 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.SecureIOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto; import org.apache.hadoop.mapreduce.JobID; import org.apache.tez.mapreduce.hadoop.MRConfig; @@ -94,6 +95,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import 
org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; @@ -306,20 +308,20 @@ public class ShuffleHandler extends AuxiliaryService { private List<String> mapIds; private AtomicInteger mapsToWait; private AtomicInteger mapsToSend; - private int reduceId; + private Range reduceRange; private ChannelHandlerContext ctx; private String user; private Map<String, Shuffle.MapOutputInfo> infoMap; private String jobId; private String dagId; - public ReduceContext(List<String> mapIds, int rId, + public ReduceContext(List<String> mapIds, Range reduceRange, ChannelHandlerContext context, String usr, Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap, String jobId, String dagId) { this.mapIds = mapIds; - this.reduceId = rId; + this.reduceRange = reduceRange; this.dagId = dagId; /** * Atomic count for tracking the no. 
of map outputs that are yet to @@ -340,8 +342,8 @@ public class ShuffleHandler extends AuxiliaryService { this.jobId = jobId; } - public int getReduceId() { - return reduceId; + public Range getReduceRange() { + return reduceRange; } public ChannelHandlerContext getCtx() { @@ -798,6 +800,29 @@ public class ShuffleHandler extends AuxiliaryService { } + protected static class Range { + final int first; + final int last; + + Range(int first, int last) { + this.first = first; + this.last = last; + } + + int getFirst() { + return first; + } + + int getLast() { + return last; + } + + @Override + public String toString() { + return new String("range: " + first + "-" + last); + } + } + class Shuffle extends SimpleChannelUpstreamHandler { private static final int MAX_WEIGHT = 10 * 1024 * 1024; @@ -865,13 +890,26 @@ public class ShuffleHandler extends AuxiliaryService { if (null == mapq) { return null; } - final List<String> ret = new ArrayList<String>(); + final List<String> ret = new ArrayList<>(); for (String s : mapq) { Collections.addAll(ret, s.split(",")); } return ret; } + private Range splitReduces(List<String> reduceq) { + if (null == reduceq || reduceq.size() != 1) { + return null; + } + String[] reduce = reduceq.get(0).split("-"); + int first = Integer.parseInt(reduce[0]); + int last = first; + if (reduce.length > 1) { + last = Integer.parseInt(reduce[1]); + } + return new Range(first, last); + } + @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) throws Exception { @@ -915,13 +953,13 @@ public class ShuffleHandler extends AuxiliaryService { } } final List<String> mapIds = splitMaps(q.get("map")); - final List<String> reduceQ = q.get("reduce"); + final Range reduceRange = splitReduces(q.get("reduce")); final List<String> jobQ = q.get("job"); final List<String> dagIdQ = q.get("dag"); if (LOG.isDebugEnabled()) { LOG.debug("RECV: " + request.getUri() + "\n mapId: " + mapIds + - "\n reduceId: " + reduceQ + + "\n reduceId: " + 
reduceRange + "\n jobId: " + jobQ + "\n dagId: " + dagIdQ + "\n keepAlive: " + keepAliveParam); @@ -930,11 +968,11 @@ public class ShuffleHandler extends AuxiliaryService { if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ)) { return; } - if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) { + if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) { sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); return; } - if (reduceQ.size() != 1 || jobQ.size() != 1) { + if (jobQ.size() != 1) { sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST); return; } @@ -944,13 +982,11 @@ public class ShuffleHandler extends AuxiliaryService { // on log4j.properties by uncommenting the setting if (AUDITLOG.isDebugEnabled()) { AUDITLOG.debug("shuffle for " + jobQ.get(0) + - " reducer " + reduceQ.get(0)); + " reducer " + reduceRange); } - int reduceId; String jobId; String dagId; try { - reduceId = Integer.parseInt(reduceQ.get(0)); jobId = jobQ.get(0); dagId = dagIdQ.get(0); } catch (NumberFormatException e) { @@ -982,7 +1018,7 @@ public class ShuffleHandler extends AuxiliaryService { String user = userRsrc.get(jobId); try { - populateHeaders(mapIds, jobId, dagId, user, reduceId, request, + populateHeaders(mapIds, jobId, dagId, user, reduceRange, response, keepAliveParam, mapOutputInfoMap); } catch(IOException e) { ch.write(response); @@ -993,7 +1029,7 @@ public class ShuffleHandler extends AuxiliaryService { } ch.write(response); //Initialize one ReduceContext object per messageReceived call - ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, + ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx, user, mapOutputInfoMap, jobId, dagId); for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { ChannelFuture nextMap = sendMap(reduceContext); @@ -1050,14 +1086,14 @@ public class ShuffleHandler extends AuxiliaryService { MapOutputInfo info = 
reduceContext.getInfoMap().get(mapId); if (info == null) { info = getMapOutputInfo(reduceContext.dagId, mapId, - reduceContext.getReduceId(), reduceContext.getJobId(), + reduceContext.getJobId(), reduceContext.getUser()); } nextMap = sendMapOutput( reduceContext.getCtx(), reduceContext.getCtx().getChannel(), reduceContext.getUser(), mapId, - reduceContext.getReduceId(), info); + reduceContext.getReduceRange(), info); if (null == nextMap) { sendError(reduceContext.getCtx(), NOT_FOUND); return null; @@ -1103,7 +1139,7 @@ public class ShuffleHandler extends AuxiliaryService { } protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - int reduce, String jobId, + String jobId, String user) throws IOException { AttemptPathInfo pathInfo; try { @@ -1123,8 +1159,8 @@ public class ShuffleHandler extends AuxiliaryService { } } - TezIndexRecord info = - indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user); + TezSpillRecord spillRecord = + indexCache.getSpillRecord(mapId, pathInfo.indexPath, user); if (LOG.isDebugEnabled()) { LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId + @@ -1132,13 +1168,14 @@ public class ShuffleHandler extends AuxiliaryService { pathInfo.indexPath); } - MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info); + + MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, spillRecord); return outputInfo; } protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, - int reduce, HttpRequest request, + Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) @@ -1146,20 +1183,21 @@ public class ShuffleHandler extends AuxiliaryService { long contentLength = 0; for (String mapId : mapIds) { - MapOutputInfo outputInfo = - getMapOutputInfo(dagId, mapId, reduce, jobId, user); + MapOutputInfo outputInfo = getMapOutputInfo(dagId, mapId, jobId, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { 
mapOutputInfoMap.put(mapId, outputInfo); } - - ShuffleHeader header = - new ShuffleHeader(mapId, outputInfo.indexRecord.getPartLength(), - outputInfo.indexRecord.getRawLength(), reduce); DataOutputBuffer dob = new DataOutputBuffer(); - header.write(dob); - - contentLength += outputInfo.indexRecord.getPartLength(); - contentLength += dob.getLength(); + for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { + TezIndexRecord indexRecord = outputInfo.spillRecord.getIndex(reduce); + ShuffleHeader header = + new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce); + dob.reset(); + header.write(dob); + + contentLength += dob.getLength(); + contentLength += indexRecord.getPartLength(); + } } // Now set the response headers. @@ -1185,11 +1223,11 @@ public class ShuffleHandler extends AuxiliaryService { class MapOutputInfo { final Path mapOutputFileName; - final TezIndexRecord indexRecord; + final TezSpillRecord spillRecord; - MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) { + MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord) { this.mapOutputFileName = mapOutputFileName; - this.indexRecord = indexRecord; + this.spillRecord = spillRecord; } } @@ -1235,28 +1273,46 @@ public class ShuffleHandler extends AuxiliaryService { } protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, - String user, String mapId, int reduce, MapOutputInfo mapOutputInfo) + String user, String mapId, Range reduceRange, MapOutputInfo outputInfo) throws IOException { - final TezIndexRecord info = mapOutputInfo.indexRecord; - final ShuffleHeader header = - new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce); - final DataOutputBuffer dob = new DataOutputBuffer(); - header.write(dob); - ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); - final File spillfile = - new File(mapOutputInfo.mapOutputFileName.toString()); + TezIndexRecord firstIndex = null; + 
TezIndexRecord lastIndex = null; + + DataOutputBuffer dobRange = new DataOutputBuffer(); + // Indicate how many record to be written + WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1); + ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength())); + for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) { + TezIndexRecord index = outputInfo.spillRecord.getIndex(reduce); + // Records are only valid if they have a non-zero part length + if (index.getPartLength() != 0) { + if (firstIndex == null) { + firstIndex = index; + } + lastIndex = index; + } + + ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + + final long rangeOffset = firstIndex.getStartOffset(); + final long rangePartLength = lastIndex.getStartOffset() + lastIndex.getPartLength() - firstIndex.getStartOffset(); + final File spillFile = new File(outputInfo.mapOutputFileName.toString()); RandomAccessFile spill; try { - spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null); + spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null); } catch (FileNotFoundException e) { - LOG.info(spillfile + " not found"); + LOG.info(spillFile + " not found"); return null; } ChannelFuture writeFuture; if (ch.getPipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, - info.getStartOffset(), info.getPartLength(), manageOsCache, readaheadLength, - readaheadPool, spillfile.getAbsolutePath(), + rangeOffset, rangePartLength, manageOsCache, readaheadLength, + readaheadPool, spillFile.getAbsolutePath(), shuffleBufferSize, shuffleTransferToAllowed); writeFuture = ch.write(partition); writeFuture.addListener(new ChannelFutureListener() { @@ -1273,13 +1329,13 @@ public class ShuffleHandler extends 
AuxiliaryService { } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - info.getStartOffset(), info.getPartLength(), sslFileBufferSize, + rangeOffset, rangePartLength, sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, - spillfile.getAbsolutePath()); + spillFile.getAbsolutePath()); writeFuture = ch.write(chunk); } metrics.shuffleConnections.incr(); - metrics.shuffleOutputBytes.incr(info.getPartLength()); // optimistic + metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic return writeFuture; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index ebd9c5d..a790b9a 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -62,8 +62,6 @@ import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.security.UserGroupInformation; @@ -110,7 +108,7 @@ public class TestShuffleHandler { } @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - int reduce, String jobId, + String jobId, String user) 
throws IOException { // Do nothing. @@ -118,17 +116,16 @@ public class TestShuffleHandler { } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String dagId, String user, int reduce, - HttpRequest request, + String dagId, String user, Range reduceRange, HttpResponse response, boolean keepAliveParam, - Map<String, MapOutputInfo> infoMap) throws IOException { + Map<String, MapOutputInfo> infoMap) throws IOException { // Do nothing. } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String mapId, int reduce, - MapOutputInfo info) throws IOException { + Channel ch, String user, String mapId, Range reduceRange, + MapOutputInfo info) throws IOException { ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); @@ -236,18 +233,17 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - int reduce, String jobId, + String jobId, String user) throws IOException { return null; } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String dagId, String user, int reduce, - HttpRequest request, + String dagId, String user, Range reduceRange, HttpResponse response, boolean keepAliveParam, - Map<String, MapOutputInfo> infoMap) throws IOException { + Map<String, MapOutputInfo> infoMap) throws IOException { // Only set response headers and skip everything else // send some dummy value for content-length super.setResponseHeaders(response, keepAliveParam, 100); @@ -259,8 +255,8 @@ public class TestShuffleHandler { } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String mapId, int reduce, - MapOutputInfo info) + Channel ch, String user, String mapId, Range reduceRange, + MapOutputInfo info) throws IOException { // send a shuffle header and a lot of data down the channel // to trigger a broken pipe @@ -335,7 +331,6 @@ 
public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - int reduce, String jobId, String user) throws IOException { return null; @@ -349,7 +344,7 @@ public class TestShuffleHandler { @Override protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, - int reduce, HttpRequest request, + Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) @@ -376,8 +371,8 @@ public class TestShuffleHandler { @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String mapId, int reduce, - MapOutputInfo info) throws IOException { + Channel ch, String user, String mapId, Range reduceRange, + MapOutputInfo info) throws IOException { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); // send a shuffle header and a lot of data down the channel @@ -549,7 +544,7 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, - int reduce, String jobId, + String jobId, String user) throws IOException { // Do nothing. @@ -557,11 +552,10 @@ public class TestShuffleHandler { } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String dagId, String user, int reduce, - HttpRequest request, + String dagId, String user, Range reduceRange, HttpResponse response, boolean keepAliveParam, - Map<String, MapOutputInfo> infoMap) throws IOException { + Map<String, MapOutputInfo> infoMap) throws IOException { // Do nothing. 
} @Override @@ -572,8 +566,8 @@ public class TestShuffleHandler { } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String mapId, int reduce, - MapOutputInfo info) + Channel ch, String user, String mapId, Range reduceRange, + MapOutputInfo info) throws IOException { // send a shuffle header and a lot of data down the channel // to trigger a broken pipe @@ -985,9 +979,9 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected void populateHeaders(List<String> mapIds, - String outputBaseStr, String dagId, String user, int reduce, - HttpRequest request, HttpResponse response, - boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) + String outputBaseStr, String dagId, String user, Range reduceRange, + HttpResponse response, + boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) throws IOException { // Only set response headers and skip everything else // send some dummy value for content-length @@ -1009,8 +1003,8 @@ public class TestShuffleHandler { } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String mapId, int reduce, - MapOutputInfo info) throws IOException { + Channel ch, String user, String mapId, Range reduceRange, + MapOutputInfo info) throws IOException { // send a shuffle header ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java new file mode 100644 index 0000000..30295bd --- /dev/null +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/CompositeInputAttemptIdentifier.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.common; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +/** + * Container for a task number and an attempt number for the task, plus an inputIdentifierCount; expand(offset) returns the plain InputAttemptIdentifier whose input identifier is this one's shifted by offset.
+ */ +@Private +public class CompositeInputAttemptIdentifier extends InputAttemptIdentifier { + private final int inputIdentifierCount; + + public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, int inputIdentifierCount) { + this(inputIdentifier, attemptNumber, pathComponent, false, SPILL_INFO.FINAL_MERGE_ENABLED, -1, inputIdentifierCount); + } + + public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, boolean isShared, int inputIdentifierCount) { + this(inputIdentifier, attemptNumber, pathComponent, isShared, SPILL_INFO.FINAL_MERGE_ENABLED, -1, inputIdentifierCount); + } + + public CompositeInputAttemptIdentifier(int inputIdentifier, int attemptNumber, String pathComponent, + boolean shared, SPILL_INFO fetchTypeInfo, int spillEventId, int inputIdentifierCount) { + super(inputIdentifier, attemptNumber, pathComponent, shared, fetchTypeInfo, spillEventId); + this.inputIdentifierCount = inputIdentifierCount; + } + + + public int getInputIdentifierCount() { + return inputIdentifierCount; + } + + public InputAttemptIdentifier expand(int inputIdentifierOffset) { + return new InputAttemptIdentifier(getInputIdentifier() + inputIdentifierOffset, getAttemptNumber(), getPathComponent(), isShared(), getFetchTypeInfo(), getSpillEventId()); + } + + // PathComponent & shared does not need to be part of the hashCode and equals computation. 
+ @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override + public String toString() { + return super.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java index d9595f0..9a5890d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java @@ -42,19 +42,21 @@ public class FetchResult { private final String host; private final int port; private final int partition; + private final int partitionCount; private final Iterable<InputAttemptIdentifier> pendingInputs; private final String additionalInfo; - public FetchResult(String host, int port, int partition, + public FetchResult(String host, int port, int partition, int partitionCount, Iterable<InputAttemptIdentifier> pendingInputs) { - this(host, port, partition, pendingInputs, null); + this(host, port, partition, partitionCount, pendingInputs, null); } - public FetchResult(String host, int port, int partition, + public FetchResult(String host, int port, int partition, int partitionCount, Iterable<InputAttemptIdentifier> pendingInputs, String additionalInfo) { this.host = host; this.port = port; this.partition = partition; + this.partitionCount = partitionCount; this.pendingInputs = pendingInputs; this.additionalInfo = additionalInfo; } @@ -71,6 +73,10 @@ public class FetchResult { return partition; } + public int getPartitionCount() { + return partitionCount; + } + 
public Iterable<InputAttemptIdentifier> getPendingInputs() { return pendingInputs; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 896f532..7b1abab 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -27,6 +27,7 @@ import java.net.URL; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -40,8 +41,10 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.WritableUtils; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnectionParams; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.lang.StringUtils; @@ -72,6 +75,50 @@ import com.google.common.base.Preconditions; */ public class Fetcher extends CallableWithNdc<FetchResult> { + public static class PathPartition { + + final String path; + final int partition; + + PathPartition(String path, int partition) { + this.path = path; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((path == null) ? 
0 : path.hashCode()); + result = prime * result + partition; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PathPartition other = (PathPartition) obj; + if (path == null) { + if (other.path != null) + return false; + } else if (!path.equals(other.path)) + return false; + if (partition != other.partition) + return false; + return true; + } + + @Override + public String toString() { + return "PathPartition [path=" + path + ", partition=" + partition + "]"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class); private static final AtomicInteger fetcherIdGen = new AtomicInteger(0); @@ -117,9 +164,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private int port; private int partition; + private int partitionCount; // Maps from the pathComponents (unique per srcTaskId) to the specific taskId - private final Map<String, InputAttemptIdentifier> pathToAttemptMap; + private final Map<PathPartition, InputAttemptIdentifier> pathToAttemptMap; private URL url; private volatile DataInputStream input; @@ -138,6 +186,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private long retryStartTime = 0; private final boolean asyncHttp; + private final boolean compositeFetch; private final boolean verifyDiskChecksum; @@ -152,7 +201,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, - int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) { + int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { this.asyncHttp = asyncHttp; this.verifyDiskChecksum = verifyDiskChecksum; this.fetcherCallback = fetcherCallback; @@ -160,7 +209,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { this.jobTokenSecretMgr = jobTokenSecretManager; this.appId 
= appId; this.dagIdentifier = dagIdentifier; - this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>(); + this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>(); this.httpConnectionParams = params; this.conf = conf; @@ -175,6 +224,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { this.lockPath = lockPath; this.localHostname = localHostname; this.shufflePort = shufflePort; + this.compositeFetch = compositeFetch; try { if (this.sharedFetchEnabled) { @@ -200,12 +250,20 @@ public class Fetcher extends CallableWithNdc<FetchResult> { boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled); if (srcAttempts.size() == 0) { - return new FetchResult(host, port, partition, srcAttempts); + return new FetchResult(host, port, partition, partitionCount, srcAttempts); } populateRemainingMap(srcAttempts); for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) { - pathToAttemptMap.put(in.getPathComponent(), in); + if (in instanceof CompositeInputAttemptIdentifier) { + CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier)in; + for (int i = 0; i < cin.getInputIdentifierCount(); i++) { + pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), i), cin.expand(i)); + } + } else { + pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in); + } + // do only if all of them are shared fetches multiplex &= in.isShared(); } @@ -390,7 +448,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { lock = getLock(); if (lock == null) { // re-queue until we get a lock - return new HostFetchResult(new FetchResult(host, port, partition, + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false); } else { if (findInputs() == srcAttemptsRemaining.size()) { @@ -416,7 +474,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (isShutDown.get()) { // if any 
exception was due to shut-down don't bother firing any more // requests - return new HostFetchResult(new FetchResult(host, port, partition, + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false); } // no more caching @@ -431,7 +489,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) { try { StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host, - port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle()); + port, partition, partitionCount, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle()); this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive()); @@ -456,7 +514,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { failedFetches = srcAttemptsRemaining.values(). toArray(new InputAttemptIdentifier[srcAttemptsRemaining.values().size()]); } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedFetches, true); + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, true); } if (isShutDown.get()) { // shutdown would have no effect if in the process of establishing the connection. @@ -464,7 +522,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (isDebugEnabled) { LOG.debug("Detected fetcher has been shutdown after connection establishment. 
Returning"); } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false); } try { @@ -486,7 +544,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { InputAttemptIdentifier firstAttempt = attempts.iterator().next(); LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", e); - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), new InputAttemptIdentifier[] { firstAttempt }, false); } } catch (InterruptedException e) { @@ -514,7 +572,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { if (isDebugEnabled) { LOG.debug("Detected fetcher has been shutdown after opening stream. Returning"); } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, false); + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false); } // After this point, closing the stream and connection, should cause a // SocketException, @@ -532,7 +590,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + srcAttemptsRemaining.size() + " inputs"); } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false); } try { @@ -545,7 +603,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { LOG.debug("Fetcher already shutdown. 
Aborting reconnection and queued fetches for " + srcAttemptsRemaining.size() + " inputs"); } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), null, + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), null, false); } // Connect again. @@ -563,7 +621,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } failedInputs = null; } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), failedInputs, + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedInputs, false); } @@ -650,7 +708,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } else { // nothing needs to be done to requeue remaining entries } - return new HostFetchResult(new FetchResult(host, port, partition, srcAttemptsRemaining.values()), + return new HostFetchResult(new FetchResult(host, port, partition, partitionCount, srcAttemptsRemaining.values()), failedFetches, false); } @@ -732,125 +790,166 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } } + private static class MapOutputStat { + final InputAttemptIdentifier srcAttemptId; + final long decompressedLength; + final long compressedLength; + final int forReduce; + + MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) { + this.srcAttemptId = srcAttemptId; + this.decompressedLength = decompressedLength; + this.compressedLength = compressedLength; + this.forReduce = forReduce; + } + + @Override + public String toString() { + return new String("id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce); + } + } private InputAttemptIdentifier[] fetchInputs(DataInputStream input, CachingCallBack callback) throws FetcherReadTimeoutException { FetchedInput 
fetchedInput = null; InputAttemptIdentifier srcAttemptId = null; - long decompressedLength = -1; - long compressedLength = -1; - + long decompressedLength = 0; + long compressedLength = 0; try { long startTime = System.currentTimeMillis(); - int responsePartition = -1; - // Read the shuffle header - String pathComponent = null; - try { - ShuffleHeader header = new ShuffleHeader(); - header.readFields(input); - pathComponent = header.getMapId(); - - srcAttemptId = pathToAttemptMap.get(pathComponent); - compressedLength = header.getCompressedLength(); - decompressedLength = header.getUncompressedLength(); - responsePartition = header.getPartition(); - } catch (IllegalArgumentException e) { - // badIdErrs.increment(1); - if (!isShutDown.get()) { - LOG.warn("Invalid src id ", e); - // Don't know which one was bad, so consider all of them as bad - return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); - } else { - if (isDebugEnabled) { - LOG.debug("Already shutdown. 
Ignoring badId error with message: " + e.getMessage()); - } - return null; - } + int partitionCount = 1; + + if (this.compositeFetch) { + // Multiple partitions are fetched + partitionCount = WritableUtils.readVInt(input); } + ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount); + for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) { + MapOutputStat mapOutputStat = null; + int responsePartition = -1; + // Read the shuffle header + String pathComponent = null; + try { + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + pathComponent = header.getMapId(); + srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition())); + + if (header.getCompressedLength() == 0) { + // Empty partitions are already accounted for + continue; + } - // Do some basic sanity verification - if (!verifySanity(compressedLength, decompressedLength, - responsePartition, srcAttemptId, pathComponent)) { - if (!isShutDown.get()) { - if (srcAttemptId == null) { - LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null"); - srcAttemptId = getNextRemainingAttempt(); + mapOutputStat = new MapOutputStat(srcAttemptId, + header.getUncompressedLength(), + header.getCompressedLength(), + header.getPartition()); + mapOutputStats.add(mapOutputStat); + responsePartition = header.getPartition(); + } catch (IllegalArgumentException e) { + // badIdErrs.increment(1); + if (!isShutDown.get()) { + LOG.warn("Invalid src id ", e); + // Don't know which one was bad, so consider all of them as bad + return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); + } else { + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage()); + } + return null; } - assert (srcAttemptId != null); - return new InputAttemptIdentifier[]{srcAttemptId}; - } else { - if (isDebugEnabled) { - LOG.debug("Already shutdown. 
Ignoring verification failure."); + } + + // Do some basic sanity verification + if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, + responsePartition, mapOutputStat.srcAttemptId, pathComponent)) { + if (!isShutDown.get()) { + srcAttemptId = mapOutputStat.srcAttemptId; + if (srcAttemptId == null) { + LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null"); + srcAttemptId = getNextRemainingAttempt(); + } + assert (srcAttemptId != null); + return new InputAttemptIdentifier[]{srcAttemptId}; + } else { + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring verification failure."); + } + return null; } - return null; } - } - if (isDebugEnabled) { - LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength - + ", decomp len: " + decompressedLength); - } - - // TODO TEZ-957. handle IOException here when Broadcast has better error checking - if (srcAttemptId.isShared() && callback != null) { - // force disk if input is being shared - fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength, - compressedLength, srcAttemptId); - } else { - fetchedInput = inputManager.allocate(decompressedLength, - compressedLength, srcAttemptId); - } - // No concept of WAIT at the moment. - // // Check if we can shuffle *now* ... - // if (fetchedInput.getType() == FetchedInput.WAIT) { - // LOG.info("fetcher#" + id + - // " - MergerManager returned Status.WAIT ..."); - // //Not an error but wait to process data. - // return EMPTY_ATTEMPT_ID_ARRAY; - // } - - // Go! 
- if (isDebugEnabled) { - LOG.debug("fetcher" + " about to shuffle output of srcAttempt " - + fetchedInput.getInputAttemptIdentifier() + " decomp: " - + decompressedLength + " len: " + compressedLength + " to " - + fetchedInput.getType()); + if (isDebugEnabled) { + LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength + + ", decomp len: " + mapOutputStat.decompressedLength); + } } - if (fetchedInput.getType() == Type.MEMORY) { - ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), - input, (int) decompressedLength, (int) compressedLength, codec, - ifileReadAhead, ifileReadAheadLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); - } else if (fetchedInput.getType() == Type.DISK) { - ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), - (host +":" +port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString(), - ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); - } else { - throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + - fetchedInput); - } + for (MapOutputStat mapOutputStat : mapOutputStats) { + // Get the location for the map output - either in-memory or on-disk + srcAttemptId = mapOutputStat.srcAttemptId; + decompressedLength = mapOutputStat.decompressedLength; + compressedLength = mapOutputStat.compressedLength; + // TODO TEZ-957. handle IOException here when Broadcast has better error checking + if (srcAttemptId.isShared() && callback != null) { + // force disk if input is being shared + fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength, + compressedLength, srcAttemptId); + } else { + fetchedInput = inputManager.allocate(decompressedLength, + compressedLength, srcAttemptId); + } + // No concept of WAIT at the moment. + // // Check if we can shuffle *now* ... 
+ // if (fetchedInput.getType() == FetchedInput.WAIT) { + // LOG.info("fetcher#" + id + + // " - MergerManager returned Status.WAIT ..."); + // //Not an error but wait to process data. + // return EMPTY_ATTEMPT_ID_ARRAY; + // } + + // Go! + if (isDebugEnabled) { + LOG.debug("fetcher" + " about to shuffle output of srcAttempt " + + fetchedInput.getInputAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + + fetchedInput.getType()); + } - // offer the fetched input for caching - if (srcAttemptId.isShared() && callback != null) { - // this has to be before the fetchSucceeded, because that goes across - // threads into the reader thread and can potentially shutdown this thread - // while it is still caching. - callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength); - } + if (fetchedInput.getType() == Type.MEMORY) { + ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), + input, (int) decompressedLength, (int) compressedLength, codec, + ifileReadAhead, ifileReadAheadLength, LOG, + fetchedInput.getInputAttemptIdentifier().toString()); + } else if (fetchedInput.getType() == Type.DISK) { + ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), + (host + ":" + port), input, compressedLength, decompressedLength, LOG, + fetchedInput.getInputAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); + } else { + throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + + fetchedInput); + } - // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); - // Reset retryStartTime as map task make progress if retried before. 
- retryStartTime = 0; - fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, - compressedLength, decompressedLength, (endTime - startTime)); + // offer the fetched input for caching + if (srcAttemptId.isShared() && callback != null) { + // this has to be before the fetchSucceeded, because that goes across + // threads into the reader thread and can potentially shutdown this thread + // while it is still caching. + callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength); + } + + // Inform the shuffle scheduler + long endTime = System.currentTimeMillis(); + // Reset retryStartTime as map task make progress if retried before. + retryStartTime = 0; + fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, + compressedLength, decompressedLength, (endTime - startTime)); - // Note successful shuffle - srcAttemptsRemaining.remove(srcAttemptId.toString()); + // Note successful shuffle + srcAttemptsRemaining.remove(srcAttemptId.toString()); - // metrics.successFetch(); - return null; + // metrics.successFetch(); + } } catch (IOException ioe) { if (isShutDown.get()) { cleanupFetchedInput(fetchedInput); @@ -887,6 +986,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // metrics.failedFetch(); return new InputAttemptIdentifier[] { srcAttemptId }; } + return null; } private void cleanupFetchedInput(FetchedInput fetchedInput) { @@ -951,7 +1051,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return false; } - if (fetchPartition != this.partition) { + if (fetchPartition < this.partition || fetchPartition >= this.partition + this.partitionCount) { // wrongReduceErrs.increment(1); LOG.warn(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " @@ -960,16 +1060,6 @@ public class Fetcher extends CallableWithNdc<FetchResult> { + " for reduce " + fetchPartition); return false; } - - // Sanity check - // we are guaranteed that key is not null - if 
(srcAttemptsRemaining.get(srcAttemptId.toString()) == null) { - // wrongMapErrs.increment(1); - LOG.warn("Invalid input. Received output for headerPathComponent: " - + pathComponent + "nextRemainingSrcAttemptId: " - + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId); - return false; - } return true; } @@ -992,10 +1082,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> { HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, - boolean asyncHttp, boolean verifyDiskChecksum) { + boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, - false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum); + false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); } public FetcherBuilder(FetcherCallback fetcherCallback, @@ -1004,11 +1094,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> { Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, - String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) { + String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) { this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, - verifyDiskChecksum); + verifyDiskChecksum, compositeFetch); } public FetcherBuilder 
setHttpConnectionParameters(HttpConnectionParams httpParams) { @@ -1027,11 +1117,12 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return this; } - public FetcherBuilder assignWork(String host, int port, int partition, + public FetcherBuilder assignWork(String host, int port, int partition, int partitionCount, List<InputAttemptIdentifier> inputs) { fetcher.host = host; fetcher.port = port; fetcher.partition = partition; + fetcher.partitionCount = partitionCount; fetcher.srcAttempts = inputs; workAssigned = true; return this; http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java index 969e06c..88dacb9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java @@ -34,11 +34,48 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier; */ public class InputHost extends HostPort { + private static class PartitionRange { + + private final int partition; + private final int partitionCount; + + PartitionRange(int partition, int partitionCount) { + this.partition = partition; + this.partitionCount = partitionCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PartitionRange that = (PartitionRange) o; + + if (partition != that.partition) return false; + return partitionCount == that.partitionCount; + } + + @Override + public int hashCode() { + int result = partition; + result = 31 * result + partitionCount; + return result; + } + 
+ public int getPartition() { + return partition; + } + + public int getPartitionCount() { + return partitionCount; + } + } + private String additionalInfo; // Each input host can support more than one partition. // Each partition has a list of inputs for pipelined shuffle. - private final Map<Integer, BlockingQueue<InputAttemptIdentifier>> + private final Map<PartitionRange, BlockingQueue<InputAttemptIdentifier>> partitionToInputs = new ConcurrentHashMap<>(); public InputHost(HostPort hostPort) { @@ -57,24 +94,25 @@ public class InputHost extends HostPort { return partitionToInputs.size(); } - public synchronized void addKnownInput(Integer partition, + public synchronized void addKnownInput(int partition, int partitionCount, InputAttemptIdentifier srcAttempt) { + PartitionRange partitionRange = new PartitionRange(partition, partitionCount); BlockingQueue<InputAttemptIdentifier> inputs = - partitionToInputs.get(partition); + partitionToInputs.get(partitionRange); if (inputs == null) { inputs = new LinkedBlockingQueue<InputAttemptIdentifier>(); - partitionToInputs.put(partition, inputs); + partitionToInputs.put(partitionRange, inputs); } inputs.add(srcAttempt); } public synchronized PartitionToInputs clearAndGetOnePartition() { - for (Map.Entry<Integer, BlockingQueue<InputAttemptIdentifier>> entry : + for (Map.Entry<PartitionRange, BlockingQueue<InputAttemptIdentifier>> entry : partitionToInputs.entrySet()) { List<InputAttemptIdentifier> inputs = new ArrayList<InputAttemptIdentifier>(entry.getValue().size()); entry.getValue().drainTo(inputs); - PartitionToInputs ret = new PartitionToInputs(entry.getKey(), inputs); + PartitionToInputs ret = new PartitionToInputs(entry.getKey().getPartition(), entry.getKey().getPartitionCount(), inputs); partitionToInputs.remove(entry.getKey()); return ret; } @@ -103,12 +141,13 @@ public class InputHost extends HostPort { } public static class PartitionToInputs { - private int partition; + private final int partition; + private final 
int partitionCount; private List<InputAttemptIdentifier> inputs; - public PartitionToInputs(int partition, - List<InputAttemptIdentifier> input) { + public PartitionToInputs(int partition, int partitionCount, List<InputAttemptIdentifier> input) { this.partition = partition; + this.partitionCount = partitionCount; this.inputs = input; } @@ -116,13 +155,17 @@ public class InputHost extends HostPort { return partition; } + public int getPartitionCount() { + return partitionCount; + } + public List<InputAttemptIdentifier> getInputs() { return inputs; } @Override public String toString() { - return "partition=" + partition + ", inputs=" + inputs; + return "partition=" + partition + ", partitionCount=" + partitionCount + ", inputs=" + inputs; } } } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 6fa43e8..1d644aa 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -193,7 +193,7 @@ public class ShuffleUtils { } public static StringBuilder constructBaseURIForShuffleHandler(String host, - int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) { + int port, int partition, int partitionCount, String appId, int dagIdentifier, boolean sslShuffle) { final String http_protocol = (sslShuffle) ? 
"https://" : "http://"; StringBuilder sb = new StringBuilder(http_protocol); sb.append(host); @@ -206,6 +206,10 @@ public class ShuffleUtils { sb.append(String.valueOf(dagIdentifier)); sb.append("&reduce="); sb.append(String.valueOf(partition)); + if (partitionCount > 1) { + sb.append("-"); + sb.append(String.valueOf(partition + partitionCount - 1)); + } sb.append("&map="); return sb; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java index c1893fc..a80d21b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandlerImpl.java @@ -28,6 +28,7 @@ import java.util.zip.Inflater; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.io.compress.CompressionCodec; @@ -61,6 +62,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { private final int ifileReadAheadLength; private final boolean useSharedInputs; private final InputContext inputContext; + private final boolean compositeFetch; private final Inflater inflater; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); @@ -71,7 +73,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { public 
ShuffleInputEventHandlerImpl(InputContext inputContext, ShuffleManager shuffleManager, FetchedInputAllocator inputAllocator, CompressionCodec codec, - boolean ifileReadAhead, int ifileReadAheadLength) { + boolean ifileReadAhead, int ifileReadAheadLength, boolean compositeFetch) { this.inputContext = inputContext; this.shuffleManager = shuffleManager; this.inputAllocator = inputAllocator; @@ -81,6 +83,7 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { // this currently relies on a user to enable the flag // expand on idea based on vertex parallelism and num inputs this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0); + this.compositeFetch = compositeFetch; this.inflater = TezCommonUtils.newInflater(); } @@ -113,10 +116,10 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet); shuffleManager.updateEventReceivedTime(); } else if (event instanceof CompositeRoutedDataMovementEvent) { - CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event; + CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent)event; DataMovementEventPayloadProto shufflePayload; try { - shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload())); + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(crdme.getUserPayload())); } catch (InvalidProtocolBufferException e) { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } @@ -129,9 +132,14 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); } } - for (int offset = 0; offset < edme.getCount(); offset++) { - numDmeEvents.incrementAndGet(); - processDataMovementEvent(edme.expand(offset), shufflePayload, emptyPartitionsBitSet); + if (compositeFetch) { + 
numDmeEvents.addAndGet(crdme.getCount()); + processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet); + } else { + for (int offset = 0; offset < crdme.getCount(); offset++) { + numDmeEvents.incrementAndGet(); + processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet); + } } shuffleManager.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { @@ -166,23 +174,59 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { if (shufflePayload.hasEmptyPartitions()) { if (emptyPartitionsBitSet.get(srcIndex)) { - InputAttemptIdentifier srcAttemptIdentifier = - constructInputAttemptIdentifier(dme, shufflePayload, false); + CompositeInputAttemptIdentifier srcAttemptIdentifier = + constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, false); if (LOG.isDebugEnabled()) { LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. 
Not fetching."); } numDmeEventsNoData.incrementAndGet(); - shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier); + shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(0)); return; } } - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme, + CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, (useSharedInputs && srcIndex == 0)); - shuffleManager.addKnownInput(shufflePayload.getHost(), - shufflePayload.getPort(), srcAttemptIdentifier, srcIndex); + shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex); + } + + private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdme, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException { + int partitionId = crdme.getSourceIndex(); + if (LOG.isDebugEnabled()) { + LOG.debug("DME srcIdx: " + partitionId + ", targetIndex: " + crdme.getTargetIndex() + ", count:" + crdme.getCount() + + ", attemptNum: " + crdme.getVersion() + ", payload: " + ShuffleUtils + .stringify(shufflePayload)); + } + + if (shufflePayload.hasEmptyPartitions()) { + CompositeInputAttemptIdentifier srcAttemptIdentifier = + constructInputAttemptIdentifier(crdme.getTargetIndex(), crdme.getCount(), crdme.getVersion(), shufflePayload, false); + + boolean allPartitionsEmpty = true; + for (int i = 0; i < crdme.getCount(); i++) { + int srcPartitionId = partitionId + i; + allPartitionsEmpty &= emptyPartitionsBitSet.get(srcPartitionId); + if (emptyPartitionsBitSet.get(srcPartitionId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + + srcAttemptIdentifier + "]. 
Not fetching."); + } + numDmeEventsNoData.getAndIncrement(); + shuffleManager.addCompletedInputWithNoData(srcAttemptIdentifier.expand(i)); + } + } + + if (allPartitionsEmpty) { + return; + } + } + + CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(crdme.getTargetIndex(), crdme.getCount(), crdme.getVersion(), + shufflePayload, (useSharedInputs && partitionId == 0)); + + shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, partitionId); } private void processInputFailedEvent(InputFailedEvent ife) { @@ -193,26 +237,27 @@ public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler { /** * Helper method to create InputAttemptIdentifier * - * @param dmEvent + * @param targetIndex + * @param targetIndexCount + * @param version * @param shufflePayload - * @return InputAttemptIdentifier + * @param isShared + * @return CompositeInputAttemptIdentifier */ - private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent, + private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version, DataMovementEventPayloadProto shufflePayload, boolean isShared) { String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null; - InputAttemptIdentifier srcAttemptIdentifier = null; + CompositeInputAttemptIdentifier srcAttemptIdentifier = null; if (shufflePayload.hasSpillId()) { int spillEventId = shufflePayload.getSpillId(); boolean lastEvent = shufflePayload.getLastEvent(); InputAttemptIdentifier.SPILL_INFO spillInfo = (lastEvent) ? 
InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent - .getVersion(), pathComponent, isShared, spillInfo, spillEventId); + new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, spillInfo, spillEventId, targetIndexCount); } else { srcAttemptIdentifier = - new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), - pathComponent, isShared); + new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, targetIndexCount); } return srcAttemptIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 91021a1..3964431 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -48,6 +48,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.runtime.api.TaskFailureType; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -142,6 +143,7 @@ public class ShuffleManager implements FetcherCallback { private final boolean localDiskFetchEnabled; private final boolean sharedFetchEnabled; private final boolean verifyDiskChecksum; + private final 
boolean compositeFetch; private final int ifileBufferSize; private final boolean ifileReadAhead; @@ -211,6 +213,7 @@ public class ShuffleManager implements FetcherCallback { this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); + this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); this.srcNameTrimmed = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()); @@ -419,7 +422,7 @@ public class ShuffleManager implements FetcherCallback { httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, - localhostName, shufflePort, asyncHttp, verifyDiskChecksum); + localhostName, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch); if (codec != null) { fetcherBuilder.setCompressionParameters(codec); @@ -456,7 +459,7 @@ public class ShuffleManager implements FetcherCallback { if (includedMaps >= maxTaskOutputAtOnce) { inputIter.remove(); //add to inputHost - inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), + inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), pendingInputsOfOnePartition.getPartitionCount(), input); } else { includedMaps++; @@ -467,6 +470,7 @@ public class ShuffleManager implements FetcherCallback { } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), pendingInputsOfOnePartition.getPartition(), + pendingInputsOfOnePartition.getPartitionCount(), pendingInputsOfOnePartition.getInputs()); if (LOG.isDebugEnabled()) { LOG.debug("Created Fetcher for host: " + inputHost.getHost() @@ -479,7 +483,7 @@ public class ShuffleManager implements FetcherCallback { /////////////////// Methods for 
InputEventHandler public void addKnownInput(String hostName, int port, - InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) { + CompositeInputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) { HostPort identifier = new HostPort(hostName, port); InputHost host = knownSrcHosts.get(identifier); if (host == null) { @@ -497,13 +501,14 @@ public class ShuffleManager implements FetcherCallback { if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { return; } - int inputIdentifier = srcAttemptIdentifier.getInputIdentifier(); - if (shuffleInfoEventsMap.get(inputIdentifier) == null) { - shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); + for (int i = 0; i < srcAttemptIdentifier.getInputIdentifierCount(); i++) { + if (shuffleInfoEventsMap.get(inputIdentifier + i) == null) { + shuffleInfoEventsMap.put(inputIdentifier + i, new ShuffleEventInfo(srcAttemptIdentifier.expand(i))); + } } - host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier); + host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier.getInputIdentifierCount(), srcAttemptIdentifier); lock.lock(); try { boolean added = pendingHosts.offer(host); @@ -1007,7 +1012,7 @@ public class ShuffleManager implements FetcherCallback { InputHost inputHost = knownSrcHosts.get(identifier); assert inputHost != null; for (InputAttemptIdentifier input : pendingInputs) { - inputHost.addKnownInput(result.getPartition(), input); + inputHost.addKnownInput(result.getPartition(), result.getPartitionCount(), input); } inputHost.setAdditionalInfo(result.getAdditionalInfo()); pendingHosts.add(inputHost);
