http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index b6599dc..18b824a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -21,6 +21,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.net.SocketTimeoutException; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.io.WritableUtils; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; @@ -79,7 +81,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final int dagId; private final MapHost mapHost; - private final int currentPartition; + private final int minPartition; + private final int maxPartition; // Decompression of map-outputs private final CompressionCodec codec; @@ -99,6 +102,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { volatile BaseHttpConnection httpConnection; private final boolean asyncHttp; + private final boolean compositeFetch; // Initiative value is 0, which means it hasn't retried yet. 
@@ -127,13 +131,15 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { int dagId, boolean asyncHttp, boolean sslShuffle, - boolean verifyDiskChecksum) { + boolean verifyDiskChecksum, + boolean compositeFetch) { this.scheduler = scheduler; this.allocator = allocator; this.metrics = metrics; this.exceptionReporter = exceptionReporter; this.mapHost = mapHost; - this.currentPartition = this.mapHost.getPartitionId(); + this.minPartition = this.mapHost.getPartitionId(); + this.maxPartition = this.minPartition + this.mapHost.getPartitionCount() - 1; this.id = nextId.incrementAndGet(); this.jobTokenSecretManager = jobTokenSecretMgr; @@ -162,6 +168,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { this.localDiskFetchEnabled = localDiskFetchEnabled; this.sslShuffle = sslShuffle; this.verifyDiskChecksum = verifyDiskChecksum; + this.compositeFetch = compositeFetch; this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id; } @@ -252,7 +259,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } if(LOG.isDebugEnabled()) { LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " - + srcAttempts + ", partitionId: " + currentPartition); + + srcAttempts + ", partition range: " + minPartition + "-" + maxPartition); } populateRemainingMap(srcAttempts); // Construct the url and connect @@ -333,7 +340,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { boolean connectSucceeded = false; try { StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(), - host.getPort(), host.getPartitionId(), applicationId, dagId, sslShuffle); + host.getPort(), host.getPartitionId(), host.getPartitionCount(), applicationId, dagId, sslShuffle); URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive()); httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams, logIdentifier, jobTokenSecretManager); @@ -399,134 +406,172 @@ class 
FetcherOrderedGrouped extends CallableWithNdc<Void> { private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0]; + /** Immutable holder for the attempt id, decompressed/compressed lengths and target partition of one map output. */ private static class MapOutputStat { + final InputAttemptIdentifier srcAttemptId; + final long decompressedLength; + final long compressedLength; + final int forReduce; + + MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) { + this.srcAttemptId = srcAttemptId; + this.decompressedLength = decompressedLength; + this.compressedLength = compressedLength; + this.forReduce = forReduce; + } + + @Override + public String toString() { + return "id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce; /* concatenation already yields a fresh String; no copy constructor needed */ + } + } + protected InputAttemptIdentifier[] copyMapOutput(MapHost host, DataInputStream input) throws FetcherReadTimeoutException { MapOutput mapOutput = null; InputAttemptIdentifier srcAttemptId = null; - long decompressedLength = -1; - long compressedLength = -1; - + long decompressedLength = 0; + long compressedLength = 0; try { long startTime = System.currentTimeMillis(); - int forReduce = -1; - //Read the shuffle header - try { - ShuffleHeader header = new ShuffleHeader(); - // TODO Review: Multiple header reads in case of status WAIT ? - header.readFields(input); - if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) { + int partitionCount = 1; + + if (this.compositeFetch) { + // Multiple partitions are fetched + partitionCount = WritableUtils.readVInt(input); + } + ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount); + for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) { + MapOutputStat mapOutputStat = null; + try { + //Read the shuffle header + ShuffleHeader header = new ShuffleHeader(); + // TODO Review: Multiple header reads in case of status WAIT ? 
+ header.readFields(input); + if (!header.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) { + if (!stopped) { + badIdErrs.increment(1); + LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " + + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce); + return new InputAttemptIdentifier[]{getNextRemainingAttempt()}; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Already shutdown. Ignoring invalid map id error"); + } + return EMPTY_ATTEMPT_ID_ARRAY; + } + } + + if (header.getCompressedLength() == 0) { + // Empty partitions are already accounted for + continue; + } + + mapOutputStat = new MapOutputStat(scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce), + header.uncompressedLength, + header.compressedLength, + header.forReduce); + mapOutputStats.add(mapOutputStat); + } catch (IllegalArgumentException e) { if (!stopped) { badIdErrs.increment(1); - LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " + - InputAttemptIdentifier.PATH_PREFIX + ", partition: " + header.forReduce); - return new InputAttemptIdentifier[] {getNextRemainingAttempt()}; + LOG.warn("Invalid map id ", e); + // Don't know which one was bad, so consider this one bad and dont read + // the remaining because we dont know where to start reading from. YARN-1773 + return new InputAttemptIdentifier[]{getNextRemainingAttempt()}; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Already shutdown. Ignoring invalid map id error"); + LOG.debug("Already shutdown. Ignoring invalid map id error. 
Exception: " + + e.getClass().getName() + ", Message: " + e.getMessage()); } return EMPTY_ATTEMPT_ID_ARRAY; } } - srcAttemptId = - scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce); - compressedLength = header.compressedLength; - decompressedLength = header.uncompressedLength; - forReduce = header.forReduce; - } catch (IllegalArgumentException e) { - if (!stopped) { - badIdErrs.increment(1); - LOG.warn("Invalid map id ", e); - // Don't know which one was bad, so consider this one bad and dont read - // the remaining because we dont know where to start reading from. YARN-1773 - return new InputAttemptIdentifier[] {getNextRemainingAttempt()}; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " + - e.getClass().getName() + ", Message: " + e.getMessage()); + + // Do some basic sanity verification + if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, mapOutputStat.forReduce, + remaining, mapOutputStat.srcAttemptId)) { + if (!stopped) { + srcAttemptId = mapOutputStat.srcAttemptId; + if (srcAttemptId == null) { + srcAttemptId = getNextRemainingAttempt(); + LOG.warn("Was expecting " + srcAttemptId + " but got null"); + } + assert (srcAttemptId != null); + return new InputAttemptIdentifier[]{srcAttemptId}; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Already stopped. 
Ignoring verification failure."); + } + return EMPTY_ATTEMPT_ID_ARRAY; } - return EMPTY_ATTEMPT_ID_ARRAY; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength + + ", decomp len: " + mapOutputStat.decompressedLength); } } - // Do some basic sanity verification - if (!verifySanity(compressedLength, decompressedLength, forReduce, - remaining, srcAttemptId)) { - if (!stopped) { - if (srcAttemptId == null) { - LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null"); - srcAttemptId = getNextRemainingAttempt(); - } - assert (srcAttemptId != null); - return new InputAttemptIdentifier[]{srcAttemptId}; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. Ignoring verification failure."); + for (MapOutputStat mapOutputStat : mapOutputStats) { + // Get the location for the map output - either in-memory or on-disk + srcAttemptId = mapOutputStat.srcAttemptId; + decompressedLength = mapOutputStat.decompressedLength; + compressedLength = mapOutputStat.compressedLength; + try { + mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id); + } catch (IOException e) { + if (!stopped) { + // Kill the reduce attempt + ioErrs.increment(1); + scheduler.reportLocalError(e); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Already stopped. Ignoring error from merger.reserve"); + } } return EMPTY_ATTEMPT_ID_ARRAY; } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + - ", decomp len: " + decompressedLength); - } - // Get the location for the map output - either in-memory or on-disk - try { - mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id); - } catch (IOException e) { - if (!stopped) { - // Kill the reduce attempt - ioErrs.increment(1); - scheduler.reportLocalError(e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. 
Ignoring error from merger.reserve"); - } + // Check if we can shuffle *now* ... + if (mapOutput.getType() == Type.WAIT) { + LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); + //Not an error but wait to process data. + return EMPTY_ATTEMPT_ID_ARRAY; } - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Check if we can shuffle *now* ... - if (mapOutput.getType() == Type.WAIT) { - LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - //Not an error but wait to process data. - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Go! - if (LOG.isDebugEnabled()) { - LOG.debug("fetcher#" + id + " about to shuffle output of map " + - mapOutput.getAttemptIdentifier() + " decomp: " + - decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); - } - if (mapOutput.getType() == Type.MEMORY) { - ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, - (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, - ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); - } else if (mapOutput.getType() == Type.DISK) { - ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, decompressedLength, LOG, - mapOutput.getAttemptIdentifier().toString(), - ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); - } else { - throw new IOException("Unknown mapOutput type while fetching shuffle data:" + - mapOutput.getType()); - } + // Go! + if (LOG.isDebugEnabled()) { + LOG.debug("fetcher#" + id + " about to shuffle output of map " + + mapOutput.getAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); + } - // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); - // Reset retryStartTime as map task make progress if retried before. 
- retryStartTime = 0; + if (mapOutput.getType() == Type.MEMORY) { + ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, + (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, + ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); + } else if (mapOutput.getType() == Type.DISK) { + ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), + input, compressedLength, decompressedLength, LOG, + mapOutput.getAttemptIdentifier().toString(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); + } else { + throw new IOException("Unknown mapOutput type while fetching shuffle data:" + + mapOutput.getType()); + } - scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, - endTime - startTime, mapOutput, false); - // Note successful shuffle - remaining.remove(srcAttemptId.toString()); - metrics.successFetch(); - return null; - } catch (IOException ioe) { + // Inform the shuffle scheduler + long endTime = System.currentTimeMillis(); + // Reset retryStartTime as map task make progress if retried before. 
+ retryStartTime = 0; + + scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, + endTime - startTime, mapOutput, false); + // Note successful shuffle + remaining.remove(srcAttemptId.toString()); + metrics.successFetch(); + } + } catch(IOException ioe) { if (stopped) { if (LOG.isDebugEnabled()) { LOG.debug("Not reporting fetch failure for exception during data copy: [" @@ -548,23 +593,24 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } ioErrs.increment(1); if (srcAttemptId == null || mapOutput == null) { - LOG.info("fetcher#" + id + " failed to read map header" + - srcAttemptId + " decomp: " + - decompressedLength + ", " + compressedLength, ioe); - if(srcAttemptId == null) { + LOG.info("fetcher#" + id + " failed to read map header" + + srcAttemptId + " decomp: " + + decompressedLength + ", " + compressedLength, ioe); + if (srcAttemptId == null) { return remaining.values().toArray(new InputAttemptIdentifier[remaining.values().size()]); } else { - return new InputAttemptIdentifier[] {srcAttemptId}; + return new InputAttemptIdentifier[]{srcAttemptId}; } } - LOG.warn("Failed to shuffle output of " + srcAttemptId + - " from " + host.getHostIdentifier(), ioe); + LOG.warn("Failed to shuffle output of " + srcAttemptId + + " from " + host.getHostIdentifier(), ioe); // Inform the shuffle-scheduler mapOutput.abort(); metrics.failedFetch(); - return new InputAttemptIdentifier[] {srcAttemptId}; + return new InputAttemptIdentifier[]{srcAttemptId}; } + return null; } /** @@ -619,21 +665,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // partitionId verification. 
Isn't availalbe here because it is encoded into // URI - if (forReduce != currentPartition) { + if (forReduce < minPartition || forReduce > maxPartition) { wrongReduceErrs.increment(1); LOG.warn(logIdentifier + " data for the wrong partition map: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce - + ", expected partition: " + currentPartition); - return false; - } - - // Sanity check - if (remaining.get(srcAttemptId.toString()) == null) { - wrongMapErrs.increment(1); - LOG.warn("Invalid map-output! Received output for " + srcAttemptId); + + ", expected partition range: " + minPartition + "-" + maxPartition); return false; } - return true; } @@ -658,7 +696,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { if(LOG.isDebugEnabled()) { LOG.debug("Fetcher " + id + " going to fetch (local disk) from " + host + " for: " - + srcAttempts + ", partitionId: " + currentPartition); + + srcAttempts + ", partition range: " + minPartition + "-" + maxPartition); } // List of maps to be fetched yet @@ -678,7 +716,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), - currentPartition); + minPartition); mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); long endTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java index c2cfd06..4746306 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java @@ -86,19 +86,25 @@ class MapHost { private final String host; private final int port; private final int partition; + private final int partitionCount; // Tracks attempt IDs private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>(); - public MapHost(String host, int port, int partition) { + public MapHost(String host, int port, int partition, int partitionCount) { this.host = host; this.port = port; this.partition = partition; + this.partitionCount = partitionCount; } public int getPartitionId() { return partition; } + public int getPartitionCount() { + return partitionCount; + } + public State getState() { return state; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index e5f4e5c..b3d8a6f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -187,7 +187,8 @@ public class Shuffle implements ExceptionReporter { eventHandler= new ShuffleInputEventHandlerOrderedGrouped( inputContext, - scheduler); + scheduler, + conf); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index f39affe..d34fb5f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -26,10 +26,12 @@ import java.util.zip.Inflater; import com.google.protobuf.ByteString; import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezUncheckedException; @@ -49,6 +51,7 @@ public class 
ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; + private final boolean compositeFetch; private final Inflater inflater; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); @@ -57,9 +60,11 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0); public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, - ShuffleScheduler scheduler) { + ShuffleScheduler scheduler, + Configuration conf) { this.inputContext = inputContext; this.scheduler = scheduler; + this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); this.inflater = TezCommonUtils.newInflater(); } @@ -101,10 +106,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet); scheduler.updateEventReceivedTime(); } else if (event instanceof CompositeRoutedDataMovementEvent) { - CompositeRoutedDataMovementEvent edme = (CompositeRoutedDataMovementEvent)event; + CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent)event; DataMovementEventPayloadProto shufflePayload; try { - shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(edme.getUserPayload())); + shufflePayload = DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(crdme.getUserPayload())); } catch (InvalidProtocolBufferException e) { throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } @@ -117,9 +122,14 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl throw new TezUncheckedException("Unable to set the empty partition to succeeded", e); } } - for (int offset = 0; offset < edme.getCount(); offset++) { - numDmeEvents.incrementAndGet(); - processDataMovementEvent(edme.expand(offset), shufflePayload, 
emptyPartitionsBitSet); + if (compositeFetch) { + numDmeEvents.addAndGet(crdme.getCount()); + processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet); + } else { + for (int offset = 0; offset < crdme.getCount(); offset++) { + numDmeEvents.incrementAndGet(); + processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet); + } } scheduler.updateEventReceivedTime(); } else if (event instanceof InputFailedEvent) { @@ -135,7 +145,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private void processDataMovementEvent(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException { int partitionId = dmEvent.getSourceIndex(); - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); + CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent.getTargetIndex(), 1, dmEvent.getVersion(), shufflePayload); if (LOG.isDebugEnabled()) { LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex() @@ -152,7 +162,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl + srcAttemptIdentifier + "]. 
Not fetching."); } numDmeEventsNoData.incrementAndGet(); - scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true); + scheduler.copySucceeded(srcAttemptIdentifier.expand(0), null, 0, 0, 0, null, true); return; } } catch (IOException e) { @@ -164,6 +174,40 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl partitionId, srcAttemptIdentifier); } + private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdmEvent, DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException { + int partitionId = crdmEvent.getSourceIndex(); + CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = constructInputAttemptIdentifier(crdmEvent.getTargetIndex(), crdmEvent.getCount(), crdmEvent.getVersion(), shufflePayload); + + if (LOG.isDebugEnabled()) { + LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + crdmEvent.getTargetIndex() + ", count:" + crdmEvent.getCount() + + ", attemptNum: " + crdmEvent.getVersion() + ", payload: " + + ShuffleUtils.stringify(shufflePayload)); + } + + if (shufflePayload.hasEmptyPartitions()) { + boolean allPartitionsEmpty = true; + for (int i = 0; i < crdmEvent.getCount(); i++) { + int srcPartitionId = partitionId + i; + allPartitionsEmpty &= emptyPartitionsBitSet.get(srcPartitionId); + if (emptyPartitionsBitSet.get(srcPartitionId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + + compositeInputAttemptIdentifier + "]. 
Not fetching."); + } + numDmeEventsNoData.addAndGet(crdmEvent.getCount()); + scheduler.copySucceeded(compositeInputAttemptIdentifier.expand(i), null, 0, 0, 0, null, true); + } + } + + if (allPartitionsEmpty) { + return; + } + } + + scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(), + partitionId, compositeInputAttemptIdentifier); + } + private void processTaskFailedEvent(InputFailedEvent ifEvent) { InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion()); scheduler.obsoleteInput(taIdentifier); @@ -175,25 +219,26 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl /** * Helper method to create InputAttemptIdentifier * - * @param dmEvent + * @param targetIndex + * @param targetIndexCount + * @param version * @param shufflePayload - * @return InputAttemptIdentifier + * @return CompositeInputAttemptIdentifier */ - private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent, - DataMovementEventPayloadProto shufflePayload) { + private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version, + DataMovementEventPayloadProto shufflePayload) { String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null; int spillEventId = shufflePayload.getSpillId(); - InputAttemptIdentifier srcAttemptIdentifier = null; + CompositeInputAttemptIdentifier srcAttemptIdentifier = null; if (shufflePayload.hasSpillId()) { boolean lastEvent = shufflePayload.getLastEvent(); InputAttemptIdentifier.SPILL_INFO info = (lastEvent) ? 
InputAttemptIdentifier.SPILL_INFO .FINAL_UPDATE : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE; srcAttemptIdentifier = - new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent - .getVersion(), pathComponent, false, info, spillEventId); + new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, false, info, spillEventId, targetIndexCount); } else { srcAttemptIdentifier = - new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), pathComponent); + new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, targetIndexCount); } return srcAttemptIdentifier; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 2f3c137..129e0cc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -60,6 +60,7 @@ import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -237,6 +238,7 @@ class ShuffleScheduler { private final float maxAllowedFailedFetchFraction; private final boolean checkFailedFetchSinceLastCompletion; private final boolean verifyDiskChecksum; + private 
final boolean compositeFetch; private volatile Thread shuffleSchedulerThread = null; @@ -407,6 +409,7 @@ class ShuffleScheduler { this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); + this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>(); LOG.info("ShuffleScheduler running for sourceVertex: " @@ -1024,13 +1027,13 @@ class ShuffleScheduler { public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, - InputAttemptIdentifier srcAttempt) { + CompositeInputAttemptIdentifier srcAttempt) { uniqueHosts.add(new HostPort(inputHostName, port)); HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId); MapHost host = mapLocations.get(identifier); if (host == null) { - host = new MapHost(inputHostName, port, partitionId); + host = new MapHost(inputHostName, port, partitionId, srcAttempt.getInputIdentifierCount()); mapLocations.put(identifier, host); } @@ -1040,9 +1043,10 @@ class ShuffleScheduler { } host.addKnownMap(srcAttempt); - pathToIdentifierMap.put( - getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), - partitionId), srcAttempt); + for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) { + PathPartition pathPartition = new PathPartition(srcAttempt.getPathComponent(), partitionId + i); + pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i)); + } // Mark the host as pending if (host.getState() == MapHost.State.PENDING) { @@ -1102,7 +1106,7 @@ class ShuffleScheduler { public InputAttemptIdentifier getIdentifierForFetchedOutput( String path, int reduceId) { - return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId)); + return 
pathToIdentifierMap.get(new PathPartition(path, reduceId)); } private boolean inputShouldBeConsumed(InputAttemptIdentifier id) { @@ -1243,11 +1247,7 @@ class ShuffleScheduler { } } - - private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) { - return new PathPartition(path, reduceId); - } - + /** * A thread that takes hosts off of the penalty list when the timer expires. */ @@ -1394,7 +1394,7 @@ class ShuffleScheduler { codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, - verifyDiskChecksum); + verifyDiskChecksum, compositeFetch); } private class FetchFutureCallback implements FutureCallback<Void> { http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index 8ab65e0..142f582 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.runtime.api.ProgressFailedException; import org.apache.tez.runtime.library.common.Constants; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -122,6 +123,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { codec = 
null; } + boolean compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); boolean ifileReadAhead = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT); int ifileReadAheadLength = 0; @@ -145,7 +147,7 @@ public class UnorderedKVInput extends AbstractLogicalInput { ifileReadAhead, ifileReadAheadLength, codec, inputManager); this.inputEventHandler = new ShuffleInputEventHandlerImpl(getContext(), shuffleManager, - inputManager, codec, ifileReadAhead, ifileReadAheadLength); + inputManager, codec, ifileReadAhead, ifileReadAheadLength, compositeFetch); ////// End of Initial configuration http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index bd0ea0f..17a065c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -71,11 +71,11 @@ public class TestFetcher { Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT, false, true); - builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); + PORT, false, true, false); + builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); - FetchResult fr = new FetchResult(HOST, PORT, 0, Arrays.asList(srcAttempts)); + FetchResult fr = new FetchResult(HOST, PORT, 0, 1, Arrays.asList(srcAttempts)); Fetcher.HostFetchResult hfr = new 
Fetcher.HostFetchResult(fr, srcAttempts, false); doReturn(hfr).when(fetcher).setupLocalDiskFetch(); doReturn(null).when(fetcher).doHttpFetch(); @@ -89,8 +89,8 @@ public class TestFetcher { // when enabled and hostname does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT, false, true); - builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts)); + PORT, false, true, false); + builder.assignWork(HOST + "_OTHER", PORT, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); doReturn(null).when(fetcher).setupLocalDiskFetch(); @@ -105,8 +105,8 @@ public class TestFetcher { // when enabled and port does not match use http fetch. builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, - PORT, false, true); - builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts)); + PORT, false, true, false); + builder.assignWork(HOST, PORT + 1, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); doReturn(null).when(fetcher).setupLocalDiskFetch(); @@ -122,8 +122,8 @@ public class TestFetcher { conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, - PORT, false, true); - builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); + PORT, false, true, false); + builder.assignWork(HOST, PORT, 0, 1, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); doReturn(null).when(fetcher).setupLocalDiskFetch(); @@ -156,8 +156,8 @@ public class TestFetcher { FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, ApplicationId.newInstance(0, 
1), 1, null, "fetcherTest", conf, true, HOST, PORT, - false, true); - builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); + false, true, false); + builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); doAnswer(new Answer<Path>() { @@ -275,8 +275,8 @@ public class TestFetcher { FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, - false, true); - builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); + false, true, false); + builder.assignWork(HOST, PORT, partition, 1, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); Assert.assertTrue(expectedSrcAttempts.length == fetcher.srcAttemptsRemaining.size()); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index e085d1a..e6accda 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -51,6 +51,7 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.InputContext; import 
org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -73,7 +74,7 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); int taskIndex = 1; Event dme = createDataMovementEvent(0, taskIndex, null); @@ -82,8 +83,8 @@ public class TestShuffleInputEventHandlerImpl { eventList.add(dme); handler.handleEvents(eventList); - InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0, - PATH_COMPONENT); + CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, + PATH_COMPONENT, 1); verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0)); } @@ -95,7 +96,7 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); int taskIndex = 1; Event dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(0)); @@ -116,7 +117,7 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); int taskIndex = 1; Event 
dme = createDataMovementEvent(0, taskIndex, createEmptyPartitionByteString(1)); @@ -124,7 +125,7 @@ public class TestShuffleInputEventHandlerImpl { eventList.add(dme); handler.handleEvents(eventList); - InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT); + CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1); verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier), eq(0)); } @@ -136,7 +137,7 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); int taskIndex1 = 1; Event dme1 = createDataMovementEvent(0, taskIndex1, createEmptyPartitionByteString(0)); @@ -149,7 +150,7 @@ public class TestShuffleInputEventHandlerImpl { handler.handleEvents(eventList); InputAttemptIdentifier expectedIdentifier1 = new InputAttemptIdentifier(taskIndex1, 0); - InputAttemptIdentifier expectedIdentifier2 = new InputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT); + CompositeInputAttemptIdentifier expectedIdentifier2 = new CompositeInputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT, 1); verify(shuffleManager).addCompletedInputWithNoData(eq(expectedIdentifier1)); verify(shuffleManager).addKnownInput(eq(HOST), eq(PORT), eq(expectedIdentifier2), eq(0)); @@ -209,22 +210,22 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); //0--> 1 with spill id 0 (attemptNum 0) Event dme = createDataMovementEvent(true, 0, 1, 0, false, new 
BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId1 = new InputAttemptIdentifier(1, 0, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + CompositeInputAttemptIdentifier expectedId1 = new CompositeInputAttemptIdentifier(1, 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId1), eq(0)); //0--> 1 with spill id 1 (attemptNum 0) dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + CompositeInputAttemptIdentifier expectedId2 = new CompositeInputAttemptIdentifier(1, 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1); verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); //0--> 1 with spill id 1 (attemptNum 1). This should report exception @@ -246,14 +247,14 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); //0--> 1 with spill id 0 (attemptNum 1). attemptNum 0 is not sent. 
Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + CompositeInputAttemptIdentifier expected = new CompositeInputAttemptIdentifier(1, 1, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); //Now send attemptNum 0. This should throw exception, because attempt #1 is already added @@ -275,7 +276,7 @@ public class TestShuffleInputEventHandlerImpl { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, - shuffleManager, inputAllocator, null, false, 0); + shuffleManager, inputAllocator, null, false, 0, false); //0--> 1 with spill id 0 (attemptNum 0) with empty partitions BitSet bitSet = new BitSet(4); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java index 34ca13f..f026cb2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -88,7 +88,7 @@ public class TestShuffleManager { FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl( - 
inputContext, shuffleManager, inputAllocator, null, false, 0); + inputContext, shuffleManager, inputAllocator, null, false, 0, false); shuffleManager.run(); List<Event> eventList = new LinkedList<Event>(); http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 310f1b2..a6e4c21 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -117,7 +117,7 @@ public class TestFetcher { doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("src vertex").when(inputContext).getSourceVertexName(); - MapHost mapHost = new MapHost(HOST, PORT, 0); + MapHost mapHost = new MapHost(HOST, PORT, 0, 1); InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt"); mapHost.addKnownMap(inputAttemptIdentifier); List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier); @@ -128,7 +128,7 @@ public class TestFetcher { null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); fetcher.call(); verify(scheduler).getMapsForHost(mapHost); @@ -151,13 +151,13 @@ public class TestFetcher { final boolean ENABLE_LOCAL_FETCH = true; final boolean DISABLE_LOCAL_FETCH = false; - MapHost mapHost = new MapHost(HOST, PORT, 0); + MapHost 
mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); // when local mode is enabled and host and port matches use local fetch FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -169,13 +169,13 @@ public class TestFetcher { verify(spyFetcher, never()).copyFromHost(any(MapHost.class)); // if hostname does not match use http - mapHost = new MapHost(HOST + "_OTHER", PORT, 0); + mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false ,true); + false, false, true, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -185,13 +185,13 @@ public class TestFetcher { verify(spyFetcher, times(1)).copyFromHost(mapHost); // if port does not match use http - mapHost = new MapHost(HOST, PORT + 1, 0); + mapHost = new MapHost(HOST, PORT + 1, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -201,12 +201,12 @@ public class TestFetcher { verify(spyFetcher, 
times(1)).copyFromHost(mapHost); //if local fetch is not enabled - mapHost = new MapHost(HOST, PORT, 0); + mapHost = new MapHost(HOST, PORT, 0, 1); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -227,11 +227,11 @@ public class TestFetcher { when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); - MapHost host = new MapHost(HOST, PORT, 1); + MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -372,11 +372,11 @@ public class TestFetcher { when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(HOST, PORT, 1); + final MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); final FetcherOrderedGrouped fetcher = 
spy(mockFetcher); @@ -461,14 +461,14 @@ public class TestFetcher { doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(HOST, PORT, 1); + final MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - true, false, true); + true, false, true, false); final FetcherOrderedGrouped fetcher = spy(mockFetcher); fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>(); final List<InputAttemptIdentifier> srcAttempts = Arrays.asList( @@ -530,13 +530,13 @@ public class TestFetcher { MergeManager merger = mock(MergeManager.class); ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); - MapHost mapHost = new MapHost(HOST, PORT, 0); + MapHost mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size()); Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator(); 
http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 26aa298..1d4afde 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -16,6 +16,7 @@ import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Before; @@ -153,7 +154,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { 0, "src vertex"); scheduler = spy(realScheduler); - handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler); + handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, config); mergeManager = mock(MergeManager.class); } @@ -163,9 +164,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { int attemptNum = 0; int inputIdx = 0; Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0); - InputAttemptIdentifier id1 = - new 
InputAttemptIdentifier(inputIdx, attemptNum, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + CompositeInputAttemptIdentifier id1 = + new CompositeInputAttemptIdentifier(inputIdx, attemptNum, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1); handler.handleEvents(Collections.singletonList(dme1)); int partitionId = attemptNum; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1)); @@ -173,9 +174,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { //Send final_update event. Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); - InputAttemptIdentifier id2 = - new InputAttemptIdentifier(inputIdx, attemptNum, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); + CompositeInputAttemptIdentifier id2 = + new CompositeInputAttemptIdentifier(inputIdx, attemptNum, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1, 1); handler.handleEvents(Collections.singletonList(dme2)); partitionId = attemptNum; assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); @@ -224,9 +225,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); handler.handleEvents(Collections.singletonList(dme1)); - InputAttemptIdentifier id1 = - new InputAttemptIdentifier(inputIdx, attemptNum, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); + CompositeInputAttemptIdentifier id1 = + new CompositeInputAttemptIdentifier(inputIdx, attemptNum, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1); verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1)); assertTrue("Shuffle info events should not be empty for pipelined shuffle", @@ -252,8 +253,8 @@ public class 
TestShuffleInputEventHandlerOrderedGrouped { Event dme = createDataMovementEvent(srcIdx, targetIdx, null, false); events.add(dme); handler.handleEvents(events); - InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0, - PATH_COMPONENT); + CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(targetIdx, 0, + PATH_COMPONENT, 1); int partitionId = srcIdx; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier)); @@ -310,8 +311,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped { events.add(dme); handler.handleEvents(events); int partitionId = srcIdx; - InputAttemptIdentifier expectedIdentifier = - new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT); + CompositeInputAttemptIdentifier expectedIdentifier = + new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1); verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier)); } http://git-wip-us.apache.org/repos/asf/tez/blob/fe6746d7/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 31da4d0..52db21b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -87,8 +88,8 @@ public class TestShuffleScheduler { // Schedule all copies. for (int i = 0; i < numInputs; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -132,8 +133,8 @@ public class TestShuffleScheduler { InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs]; for (int i = 0; i < numInputs; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -189,8 +190,8 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -203,7 +204,7 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, 
i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //99 fails @@ -211,7 +212,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); } @@ -221,7 +222,7 @@ public class TestShuffleScheduler { //Should fail here and report exception as reducer is not healthy scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 % totalProducerNodes), - 10000, 200), false, true, false); + 10000, 200, 1), false, true, false); int minFailurePerHost = conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, @@ -258,8 +259,8 @@ public class TestShuffleScheduler { //Generate 0-200 events for (int i = 0; i < 200; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -283,7 +284,7 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } assertEquals(10, scheduler.remainingMaps.get()); @@ -293,7 +294,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); } //Shuffle has not stalled. 
so no issues. @@ -306,7 +307,7 @@ public class TestShuffleScheduler { new InputAttemptIdentifier(190, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (190 % totalProducerNodes), - 10000, 190), false, true, false); + 10000, 190, 1), false, true, false); //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures verify(scheduler.reporter, times(0)).reportException(any(Throwable.class)); @@ -318,15 +319,15 @@ public class TestShuffleScheduler { inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); } assertEquals(61, scheduler.failedShufflesSinceLastCompletion); @@ -339,11 +340,11 @@ public class TestShuffleScheduler { inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 
false, true, false); + 10000, i, 1), false, true, false); } // Should fail now due to fetcherHealthy. (stall has already happened and @@ -374,8 +375,8 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -388,14 +389,14 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(319, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); //stall the shuffle scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; @@ -405,13 +406,13 @@ public class TestShuffleScheduler { //Retry for 3 more times scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 310), false, true, false); + 10000, 310, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 310), false, true, false); + 10000, 310, 1), false, true, false); // failedShufflesSinceLastCompletion has crossed the limits. 
Throw error verify(shuffle, times(0)).reportException(any(Throwable.class)); @@ -439,8 +440,8 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -451,19 +452,19 @@ public class TestShuffleScheduler { new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % - totalProducerNodes), 10000, i), false, true, false); + totalProducerNodes), 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % - totalProducerNodes), 10000, i), false, true, false); + totalProducerNodes), 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % - totalProducerNodes), 10000, i), false, true, false); + totalProducerNodes), 10000, i, 1), false, true, false); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //319 succeeds @@ -474,14 +475,14 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier 
inputAttemptIdentifier = new InputAttemptIdentifier(319, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); //stall the shuffle (but within limits) scheduler.lastProgressTime = System.currentTimeMillis() - 100000; @@ -491,13 +492,13 @@ public class TestShuffleScheduler { //Retry for 3 more times scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as // well. However, it has failed only in one host. 
So this should proceed @@ -508,7 +509,7 @@ public class TestShuffleScheduler { scheduler.lastProgressTime = System.currentTimeMillis() - 300000; scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), - 10000, 319), false, true, false); + 10000, 319, 1), false, true, false); verify(shuffle, times(1)).reportException(any(Throwable.class)); } @@ -534,8 +535,8 @@ public class TestShuffleScheduler { //Generate 319 events (last event has not arrived) for (int i = 0; i < 319; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -548,14 +549,14 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(318, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes), - 10000, 318), false, true, false); + 10000, 318, 1), false, true, false); //stall the shuffle scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; @@ -565,13 +566,13 @@ public class TestShuffleScheduler { //Retry for 3 more times scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes), - 10000, 318), false, true, false); + 10000, 318, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes), - 10000, 318), false, true, false); + 10000, 
318, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes), - 10000, 318), false, true, false); + 10000, 318, 1), false, true, false); //Shuffle has not received the events completely. So do not bail out yet. verify(shuffle, times(0)).reportException(any(Throwable.class)); @@ -613,8 +614,8 @@ public class TestShuffleScheduler { //Generate 320 events (last event has not arrived) for (int i = 0; i < 320; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -627,7 +628,7 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); + 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } //5 fetches fail once @@ -635,7 +636,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); } assertTrue(scheduler.failureCounts.size() >= 5); @@ -649,9 +650,9 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, true, false); + 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), - 10000, i), false, 
true, false); + 10000, i, 1), false, true, false); } boolean checkFailedFetchSinceLastCompletion = conf.getBoolean @@ -689,8 +690,8 @@ public class TestShuffleScheduler { //Generate 320 events for (int i = 0; i < 320; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, inputAttemptIdentifier); } @@ -703,7 +704,7 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, - new MapHost("host" + (i % totalProducerNodes), 10000, i), + new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), 100, 200, startTime + (i * 100), mapOutput, false); } @@ -712,16 +713,16 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost("host" + (i % totalProducerNodes), 10000, i), + new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost("host" + (i % totalProducerNodes), 10000, i), + new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost("host" + (i % totalProducerNodes), 10000, i), + new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost("host" + (i % totalProducerNodes), 10000, i), + new MapHost("host" + (i % totalProducerNodes), 10000, i, 1), false, true, false); } @@ -752,8 +753,8 @@ public class TestShuffleScheduler { Shuffle shuffle = mock(Shuffle.class); final ShuffleSchedulerForTest scheduler = 
createScheduler(startTime, 1, shuffle); - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(0, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(0, 0, "attempt_", 1); scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier); assertTrue(scheduler.pendingHosts.size() == 1); @@ -800,8 +801,8 @@ public class TestShuffleScheduler { InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs]; for (int i = 0; i < numInputs; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -856,8 +857,8 @@ public class TestShuffleScheduler { InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs]; for (int i = 0; i < numInputs; i++) { - InputAttemptIdentifier inputAttemptIdentifier = - new InputAttemptIdentifier(i, 0, "attempt_"); + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; }
