Repository: tez Updated Branches: refs/heads/master ac0fd8bb3 -> 3f5a7f35d
TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3f5a7f35 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3f5a7f35 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3f5a7f35 Branch: refs/heads/master Commit: 3f5a7f35dd5a8e28fcf56c2e2bdad86f8d7febbf Parents: ac0fd8b Author: Jonathan Eagles <[email protected]> Authored: Wed Mar 2 14:02:04 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Wed Mar 2 14:02:04 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../library/common/shuffle/ShuffleUtils.java | 11 +- .../orderedgrouped/FetcherOrderedGrouped.java | 26 ++- .../common/shuffle/orderedgrouped/MapHost.java | 137 +++++++++--- .../common/shuffle/orderedgrouped/Shuffle.java | 5 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 30 +-- .../orderedgrouped/ShuffleScheduler.java | 88 ++++++-- .../shuffle/orderedgrouped/TestFetcher.java | 45 ++-- ...tShuffleInputEventHandlerOrderedGrouped.java | 18 +- .../orderedgrouped/TestShuffleScheduler.java | 213 ++++++++++--------- 10 files changed, 349 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 47913ab..5a7ae58 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3151. Expose DAG credentials to plugins. TEZ-3149. Tez-tools: Add username in DagInfo. TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier. @@ -395,6 +396,7 @@ INCOMPATIBLE CHANGES TEZ-2949. 
Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3149. Tez-tools: Add username in DagInfo. TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier. TEZ-3141. mapreduce.task.timeout is not translated to container heartbeat timeout http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index e8bf6ae..013a002 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -190,18 +190,13 @@ public class ShuffleUtils { } } - // TODO NEWTEZ handle ssl shuffle public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) { - return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port), - partition, appId, dagIdentifier, sslShuffle); - } - - public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier, - int partition, String appId, int dagIdentifier, boolean sslShuffle) { final String http_protocol = (sslShuffle) ? 
"https://" : "http://"; StringBuilder sb = new StringBuilder(http_protocol); - sb.append(hostIdentifier); + sb.append(host); + sb.append(":"); + sb.append(port); sb.append("/"); sb.append("mapOutput?job="); sb.append(appId.replace("application", "job")); http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 93f083d..51bdf68 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -72,7 +72,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final ExceptionReporter exceptionReporter; private final int id; private final String logIdentifier; - private final String localShuffleHostPort; + private final String localShuffleHost; + private final int localShufflePort; + private final String applicationId; + private final int dagId; private final MapHost mapHost; private final int currentPartition; @@ -82,6 +85,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final JobTokenSecretManager jobTokenSecretManager; final HttpConnectionParams httpConnectionParams; + private final boolean sslShuffle; @VisibleForTesting volatile boolean stopped = false; @@ -118,7 +122,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { TezCounter wrongMapErrsCounter, TezCounter connectionErrsCounter, TezCounter wrongReduceErrsCounter, - boolean asyncHttp) { + String applicationId, + 
int dagId, + boolean asyncHttp, + boolean sslShuffle) { this.scheduler = scheduler; this.allocator = allocator; this.metrics = metrics; @@ -134,6 +141,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { this.wrongMapErrs = wrongMapErrsCounter; this.connectionErrs = connectionErrsCounter; this.wrongReduceErrs = wrongReduceErrsCounter; + this.applicationId = applicationId; + this.dagId = dagId; this.ifileReadAhead = ifileReadAhead; this.ifileReadAheadLength = ifileReadAheadLength; @@ -145,9 +154,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { this.codec = null; } this.conf = conf; - this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort); + this.localShuffleHost = localHostname; + this.localShufflePort = shufflePort; this.localDiskFetchEnabled = localDiskFetchEnabled; + this.sslShuffle = sslShuffle; this.logIdentifier = "fetcher [" + srcNameTrimmed + "] #" + id; } @@ -157,8 +168,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { try { metrics.threadBusy(); - String hostPort = mapHost.getHostIdentifier(); - if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) { + if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { setupLocalDiskFetch(mapHost); } else { // Shuffle @@ -319,8 +329,9 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { throws IOException { boolean connectSucceeded = false; try { - URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts, - httpConnectionParams.isKeepAlive()); + StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(), + host.getPort(), host.getPartitionId(), applicationId, dagId, sslShuffle); + URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive()); httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams, logIdentifier, jobTokenSecretManager); connectSucceeded = 
httpConnection.connect(); @@ -734,6 +745,5 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { remaining.put(id.toString(), id); } } - } http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java index 3116568..486d8c5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java @@ -25,51 +25,138 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier; @Private class MapHost { - + public static enum State { IDLE, // No map outputs available BUSY, // Map outputs are being fetched PENDING, // Known map outputs which need to be fetched PENALIZED // Host penalized due to shuffle failures } - + + public static class HostPort { + + final String host; + final int port; + + HostPort(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 
0 : host.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostPort other = (HostPort) obj; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + if (port != other.port) + return false; + return true; + } + + @Override + public String toString() { + return "HostPort [host=" + host + ", port=" + port + "]"; + } + } + + public static class HostPortPartition { + + final String host; + final int port; + final int partition; + + HostPortPartition(String host, int port, int partition) { + this.host = host; + this.port = port; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 0 : host.hashCode()); + result = prime * result + partition; + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostPortPartition other = (HostPortPartition) obj; + if (partition != other.partition) + return false; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + if (port != other.port) + return false; + return true; + } + + @Override + public String toString() { + return "HostPortPartition [host=" + host + ", port=" + port + ", partition=" + partition + "]"; + } + } + private State state = State.IDLE; - private final String hostIdentifier; - private final int partitionId; - private final String baseUrl; - private final String identifier; + private final String host; + private final int port; + private final int partition; // Tracks attempt IDs private List<InputAttemptIdentifier> maps = 
new ArrayList<InputAttemptIdentifier>(); - public MapHost(int partitionId, String hostPort, String baseUrl) { - this.partitionId = partitionId; - this.hostIdentifier = hostPort; - this.baseUrl = baseUrl; - this.identifier = createIdentifier(hostPort, partitionId); - } - - public static String createIdentifier(String hostName, int partitionId) { - return hostName + ":" + Integer.toString(partitionId); + public MapHost(String host, int port, int partition) { + this.host = host; + this.port = port; + this.partition = partition; } - public String getIdentifier() { - return identifier; - } - public int getPartitionId() { - return partitionId; + return partition; } public State getState() { return state; } - public String getHostIdentifier() { - return hostIdentifier; + public String getHost() { + return host; + } + + public int getPort() { + return port; } - public String getBaseUrl() { - return baseUrl; + public String getHostIdentifier() { + return host + ":" + port; } public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) { @@ -112,7 +199,7 @@ class MapHost { @Override public String toString() { - return hostIdentifier; + return getHostIdentifier(); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index db43651..fa66b7e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -150,8 +150,6 @@ public class Shuffle implements ExceptionReporter { + (codec == 
null ? "None" : codec.getClass().getName()) + ", ifileReadAhead: " + ifileReadAhead); - boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); startTime = System.currentTimeMillis(); merger = new MergeManager( this.conf, @@ -188,8 +186,7 @@ public class Shuffle implements ExceptionReporter { eventHandler= new ShuffleInputEventHandlerOrderedGrouped( inputContext, - scheduler, - sslShuffle); + scheduler); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 6e6d967..7991485 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -19,16 +19,15 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; -import java.net.URI; import java.util.BitSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezUncheckedException; @@ -49,7 +48,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; - private final boolean sslShuffle; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); private final AtomicInteger numDmeEvents = new AtomicInteger(0); @@ -57,10 +55,9 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0); public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, - ShuffleScheduler scheduler, boolean sslShuffle) { + ShuffleScheduler scheduler) { this.inputContext = inputContext; this.scheduler = scheduler; - this.sslShuffle = sslShuffle; } @Override @@ -103,17 +100,19 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int partitionId = dmEvent.getSourceIndex(); + InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); + if (LOG.isDebugEnabled()) { LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex() + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); } + if (shufflePayload.hasEmptyPartitions()) { try { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartitionsBitSet.get(partitionId)) { - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); if 
(LOG.isDebugEnabled()) { LOG.debug( "Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" @@ -129,13 +128,10 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl } } - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); - - URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId); - scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), - partitionId, baseUri.toString(), srcAttemptIdentifier); + scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(), + partitionId, srcAttemptIdentifier); } - + private void processTaskFailedEvent(InputFailedEvent ifEvent) { InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion()); scheduler.obsoleteInput(taIdentifier); @@ -144,14 +140,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl } } - @VisibleForTesting - URI getBaseURI(String host, int port, int partitionId) { - StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, - partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle); - URI u = URI.create(sb.toString()); - return u; - } - /** * Helper method to create InputAttemptIdentifier * @@ -161,7 +149,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl */ private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload) { - String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null; + String pathComponent = (shufflePayload.hasPathComponent()) ? 
StringInterner.weakIntern(shufflePayload.getPathComponent()) : null; int spillEventId = shufflePayload.getSpillId(); InputAttemptIdentifier srcAttemptIdentifier = null; if (shufflePayload.hasSpillId()) { http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 8cba2a6..2f6e490 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -73,12 +73,58 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; import com.google.common.collect.Lists; class ShuffleScheduler { + public static class PathPartition { + + final String path; + final int partition; + + PathPartition(String path, int partition) { + this.path = path; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((path == null) ? 
0 : path.hashCode()); + result = prime * result + partition; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PathPartition other = (PathPartition) obj; + if (path == null) { + if (other.path != null) + return false; + } else if (!path.equals(other.path)) + return false; + if (partition != other.partition) + return false; + return true; + } + + @Override + public String toString() { + return "PathPartition [path=" + path + ", partition=" + partition + "]"; + } + } + @VisibleForTesting enum ShuffleErrors { IO_ERROR, @@ -101,11 +147,11 @@ class ShuffleScheduler { private final int numInputs; private int numFetchedSpills; @VisibleForTesting - final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>(); + final Map<HostPortPartition, MapHost> mapLocations = new HashMap<HostPortPartition, MapHost>(); //TODO Clean this and other maps at some point @VisibleForTesting - final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap - = new ConcurrentHashMap<String, InputAttemptIdentifier>(); + final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>(); //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. 
@@ -122,9 +168,8 @@ class ShuffleScheduler { private final Referee referee; @VisibleForTesting final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>(); - final Set<String> uniqueHosts = Sets.newHashSet(); - private final Map<String,IntWritable> hostFailures = - new HashMap<String,IntWritable>(); + final Set<HostPort> uniqueHosts = Sets.newHashSet(); + private final Map<HostPort,IntWritable> hostFailures = new HashMap<HostPort,IntWritable>(); private final InputContext inputContext; private final TezCounter shuffledInputsCounter; private final TezCounter skippedInputCounter; @@ -166,7 +211,10 @@ class ShuffleScheduler { private final boolean localDiskFetchEnabled; private final String localHostname; private final int shufflePort; + private final String applicationId; + private final int dagId; private final boolean asyncHttp; + private final boolean sslShuffle; private final TezCounter ioErrsCounter; private final TezCounter wrongLengthErrsCounter; @@ -275,6 +323,8 @@ class ShuffleScheduler { TezRuntimeConfiguration .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); + this.applicationId = inputContext.getApplicationId().toString(); + this.dagId = inputContext.getDagIdentifier(); this.localHostname = inputContext.getExecutionContext().getHostName(); final ByteBuffer shuffleMetadata = inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); @@ -311,6 +361,8 @@ class ShuffleScheduler { this.startTime = startTime; this.lastProgressTime = startTime; + this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(), @@ -477,7 +529,7 @@ class 
ShuffleScheduler { failureCounts.remove(srcAttemptIdentifier); if (host != null) { - hostFailures.remove(host.getHostIdentifier()); + hostFailures.remove(new HostPort(host.getHost(), host.getPort())); } output.commit(); @@ -691,7 +743,7 @@ class ShuffleScheduler { private void penalizeHost(MapHost host, int failures) { host.penalize(); - String hostPort = host.getHostIdentifier(); + HostPort hostPort = new HostPort(host.getHost(), host.getPort()); // TODO TEZ-922 hostFailures isn't really used for anything apart from // hasFailedAcrossNodes().Factor it into error // reporting / potential blacklisting of hosts. @@ -766,7 +818,7 @@ class ShuffleScheduler { (int) Math.ceil(numUniqueHosts * hostFailureFraction)); int total = 0; boolean failedAcrossNodes = false; - for(String host : uniqueHosts) { + for(HostPort host : uniqueHosts) { IntWritable failures = hostFailures.get(host); if (failures != null && failures.get() > minFailurePerHost) { total++; @@ -926,17 +978,13 @@ class ShuffleScheduler { public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, - String hostUrl, InputAttemptIdentifier srcAttempt) { - String hostPort = (inputHostName + ":" + String.valueOf(port)); - uniqueHosts.add(hostPort); - String identifier = MapHost.createIdentifier(hostPort, partitionId); - + uniqueHosts.add(new HostPort(inputHostName, port)); + HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId); MapHost host = mapLocations.get(identifier); if (host == null) { - host = new MapHost(partitionId, hostPort, hostUrl); - assert identifier.equals(host.getIdentifier()); + host = new MapHost(inputHostName, port, partitionId); mapLocations.put(identifier, host); } @@ -1150,8 +1198,8 @@ class ShuffleScheduler { } - private String getIdentifierFromPathAndReduceId(String path, int reduceId) { - return path + "_" + reduceId; + private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) { + return new 
PathPartition(path, reduceId); } /** @@ -1274,7 +1322,7 @@ class ShuffleScheduler { count++; if (LOG.isDebugEnabled()) { LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}", - mapHost.getIdentifier()); + mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId()); } FetcherOrderedGrouped fetcherOrderedGrouped = constructFetcherForHost(mapHost); runningFetchers.add(fetcherOrderedGrouped); @@ -1299,7 +1347,7 @@ class ShuffleScheduler { shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, - connectionErrsCounter, wrongReduceErrsCounter, asyncHttp); + connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle); } private class FetchFutureCallback implements FutureCallback<Void> { http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 20fb9a9..89d35f4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -57,6 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; import 
org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.dag.api.TezConfiguration; @@ -79,6 +80,8 @@ public class TestFetcher { public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_"; public static final String HOST = "localhost"; public static final int PORT = 65; + public static final int DAG_ID = 1; + public static final String APP_ID = "application_1234_1"; private TezCounters tezCounters = new TezCounters(); private TezCounter ioErrsCounter = tezCounters.findCounter(ShuffleScheduler.SHUFFLE_ERR_GRP_NAME, @@ -114,7 +117,7 @@ public class TestFetcher { doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("src vertex").when(inputContext).getSourceVertexName(); - MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + MapHost mapHost = new MapHost(HOST, PORT, 0); InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt"); mapHost.addKnownMap(inputAttemptIdentifier); List<InputAttemptIdentifier> mapsForHost = Lists.newArrayList(inputAttemptIdentifier); @@ -124,7 +127,7 @@ public class TestFetcher { new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); fetcher.call(); verify(scheduler).getMapsForHost(mapHost); @@ -147,12 +150,12 @@ public class TestFetcher { final boolean ENABLE_LOCAL_FETCH = true; final boolean DISABLE_LOCAL_FETCH = false; - MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, 
wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); // when local mode is enabled and host and port matches use local fetch FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -164,12 +167,12 @@ public class TestFetcher { verify(spyFetcher, never()).copyFromHost(any(MapHost.class)); // if hostname does not match use http - mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl"); + mapHost = new MapHost(HOST + "_OTHER", PORT, 0); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -179,12 +182,12 @@ public class TestFetcher { verify(spyFetcher, times(1)).copyFromHost(mapHost); // if port does not match use http - mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl"); + mapHost = new MapHost(HOST, PORT + 1, 0); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -194,11 +197,11 @@ public class TestFetcher { verify(spyFetcher, times(1)).copyFromHost(mapHost); //if local fetch is not enabled - mapHost = new MapHost(0, HOST + ":" + 
PORT, "baseurl"); + mapHost = new MapHost(HOST, PORT, 0); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); spyFetcher = spy(fetcher); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); @@ -219,11 +222,10 @@ public class TestFetcher { when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); - MapHost host = new MapHost(1, HOST + ":" + PORT, - "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map="); + MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@ -361,13 +363,13 @@ public class TestFetcher { InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); + when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(1, HOST + ":" + PORT, - "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map="); + final MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, 
merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); final FetcherOrderedGrouped fetcher = spy(mockFetcher); @@ -379,7 +381,7 @@ public class TestFetcher { doReturn(srcAttempts).when(scheduler).getMapsForHost(host); doReturn(true).when(fetcher).setupConnection(any(MapHost.class), any(Collection.class)); - URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false); + URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=", srcAttempts, false); fetcher.httpConnection = new FakeHttpConnection(url, null, "", null); doAnswer(new Answer<MapOutput>() { @@ -452,14 +454,13 @@ public class TestFetcher { doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(1, HOST + ":" + PORT, - "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map="); + final MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, true, false); final FetcherOrderedGrouped fetcher = spy(mockFetcher); fetcher.remaining = new LinkedHashMap<String, InputAttemptIdentifier>(); final List<InputAttemptIdentifier> srcAttempts = Arrays.asList( @@ -521,12 +522,12 @@ public class TestFetcher { MergeManager merger = 
mock(MergeManager.class); ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); - MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, - wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false); + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false); fetcher.populateRemainingMap(new LinkedList<InputAttemptIdentifier>(Arrays.asList(srcAttempts))); Assert.assertEquals(expectedSrcAttempts.length, fetcher.remaining.size()); Iterator<Entry<String, InputAttemptIdentifier>> iterator = fetcher.remaining.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index de066fe..26aa298 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -153,7 +153,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { 0, "src vertex"); scheduler = spy(realScheduler); - handler = new 
ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false); + handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler); mergeManager = mock(MergeManager.class); } @@ -167,9 +167,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme1)); - String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); int partitionId = attemptNum; - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1)); verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); //Send final_update event. @@ -178,10 +177,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); handler.handleEvents(Collections.singletonList(dme2)); - baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); partitionId = attemptNum; assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id2)); assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); MapHost host = scheduler.getHost(); @@ -222,7 +220,6 @@ public class TestShuffleInputEventHandlerOrderedGrouped { //Process attempt #1 first int attemptNum = 1; int inputIdx = 1; - String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); handler.handleEvents(Collections.singletonList(dme1)); @@ -231,7 
+228,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); - verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); + verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1)); assertTrue("Shuffle info events should not be empty for pipelined shuffle", !scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); @@ -257,10 +254,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0, PATH_COMPONENT); - String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); int partitionId = srcIdx; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), - eq(baseUri), eq(expectedIdentifier)); + eq(expectedIdentifier)); assertTrue("Shuffle info events should be empty for regular shuffle codepath", scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); } @@ -313,12 +309,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped { false); events.add(dme); handler.handleEvents(events); - String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); int partitionId = srcIdx; InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT); - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), - eq(expectedIdentifier)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier)); } private ByteString createEmptyPartitionByteString(int... 
emptyPartitions) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/3f5a7f35/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index f7ef309..15cfa48 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -89,7 +89,7 @@ public class TestShuffleScheduler { for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); + scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -134,7 +134,7 @@ public class TestShuffleScheduler { for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); + scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; } @@ -192,7 +192,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } //100 succeeds @@ -202,16 +202,16 @@ public class TestShuffleScheduler { 
MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //99 fails for (int i = 100; i < 199; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } @@ -219,9 +219,9 @@ public class TestShuffleScheduler { new InputAttemptIdentifier(200, 0, "attempt_"); //Should fail here and report exception as reducer is not healthy - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (200 % + totalProducerNodes), + 10000, 200), false, true, false); int minFailurePerHost = conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, @@ -261,7 +261,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } assertEquals(320, scheduler.remainingMaps.get()); @@ -282,8 +282,8 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - 
scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } assertEquals(10, scheduler.remainingMaps.get()); @@ -292,8 +292,8 @@ public class TestShuffleScheduler { for (int i = 190; i < 200; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } //Shuffle has not stalled. so no issues. @@ -304,9 +304,9 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(190, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" + - (190 % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + + (190 % totalProducerNodes), + 10000, 190), false, true, false); //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures verify(scheduler.reporter, times(0)).reportException(any(Throwable.class)); @@ -317,16 +317,16 @@ public class TestShuffleScheduler { for (int i = 190; i < 200; i++) { inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 
10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } assertEquals(61, scheduler.failedShufflesSinceLastCompletion); @@ -338,12 +338,12 @@ public class TestShuffleScheduler { for (int i = 110; i < 120; i++) { inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } // Should fail 
now due to fetcherHealthy. (stall has already happened and @@ -377,7 +377,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } //319 succeeds @@ -387,15 +387,15 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(319, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), + 10000, 319), false, true, false); //stall the shuffle scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; @@ -403,15 +403,15 @@ public class TestShuffleScheduler { assertEquals(scheduler.remainingMaps.get(), 1); //Retry for 3 more times - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); + 
scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); // failedShufflesSinceLastCompletion has crossed the limits. Throw error verify(shuffle, times(0)).reportException(any(Throwable.class)); @@ -442,7 +442,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } //Tasks fail in 20% of nodes 3 times, but are able to proceed further @@ -450,20 +450,20 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % - totalProducerNodes) + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % + totalProducerNodes), 10000, i), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % - totalProducerNodes) + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % + totalProducerNodes), 10000, i), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % - totalProducerNodes) + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % + totalProducerNodes), 10000, i), false, true, false); MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100,
false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //319 succeeds @@ -473,15 +473,15 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(319, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % totalProducerNodes), + 10000, 319), false, true, false); //stall the shuffle (but within limits) scheduler.lastProgressTime = System.currentTimeMillis() - 100000; @@ -489,15 +489,15 @@ public class TestShuffleScheduler { assertEquals(scheduler.remainingMaps.get(), 1); //Retry for 3 more times - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); + 
scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as // well. However, it has failed only in one host. So this should proceed @@ -506,9 +506,9 @@ public class TestShuffleScheduler { //stall the shuffle (but within limits) scheduler.lastProgressTime = System.currentTimeMillis() - 300000; - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (319 % + totalProducerNodes), + 10000, 319), false, true, false); verify(shuffle, times(1)).reportException(any(Throwable.class)); } @@ -537,7 +537,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } //318 succeeds @@ -547,15 +547,15 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //1 fails (last fetch) InputAttemptIdentifier inputAttemptIdentifier = 
new InputAttemptIdentifier(318, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % totalProducerNodes), + 10000, 318), false, true, false); //stall the shuffle scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; @@ -563,15 +563,15 @@ public class TestShuffleScheduler { assertEquals(scheduler.remainingMaps.get(), 1); //Retry for 3 more times - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % - totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % + totalProducerNodes), + 10000, 318), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % + totalProducerNodes), + 10000, 318), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (318 % + totalProducerNodes), + 10000, 318), false, true, false); //Shuffle has not received the events completely. So do not bail out yet. 
verify(shuffle, times(0)).reportException(any(Throwable.class)); @@ -616,7 +616,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), - 10000, i, "hostUrl", inputAttemptIdentifier); + 10000, i, inputAttemptIdentifier); } //10 succeeds @@ -626,16 +626,16 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } //5 fetches fail once for (int i = 10; i < 15; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } assertTrue(scheduler.failureCounts.size() >= 5); @@ -648,10 +648,10 @@ public class TestShuffleScheduler { for (int i = 10; i < 15; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); - scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) - + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); + 
scheduler.copyFailed(inputAttemptIdentifier, new MapHost("host" + (i % totalProducerNodes), + 10000, i), false, true, false); } boolean checkFailedFetchSinceLastCompletion = conf.getBoolean @@ -692,7 +692,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, - "hostUrl", inputAttemptIdentifier); + inputAttemptIdentifier); } //100 succeeds @@ -703,7 +703,7 @@ public class TestShuffleScheduler { .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); scheduler.copySucceeded(inputAttemptIdentifier, - new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + new MapHost("host" + (i % totalProducerNodes), 10000, i), 100, 200, startTime + (i * 100), mapOutput, false); } @@ -712,16 +712,16 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + new MapHost("host" + (i % totalProducerNodes), 10000, i), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + new MapHost("host" + (i % totalProducerNodes), 10000, i), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + new MapHost("host" + (i % totalProducerNodes), 10000, i), false, true, false); scheduler.copyFailed(inputAttemptIdentifier, - new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + new MapHost("host" + (i % totalProducerNodes), 10000, i), false, true, false); } @@ -754,7 +754,7 @@ public class TestShuffleScheduler { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, "attempt_"); - 
scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier); + scheduler.addKnownMapOutput("host0", 10000, 0, inputAttemptIdentifier); assertTrue(scheduler.pendingHosts.size() == 1); assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING); @@ -765,12 +765,13 @@ public class TestShuffleScheduler { //Should not get host, as it is added to penalty loop MapHost host = scheduler.getHost(); - assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000")); + assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000")); + //Refree thread would release it after INITIAL_PENALTY timeout Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000); host = scheduler.getHost(); - assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000")); + assertFalse("Host identifier mismatch", (host.getHost() + ":" + host.getPort() + ":" + host.getPartitionId()).equalsIgnoreCase("host0:10000")); } @Test(timeout = 5000) @@ -801,7 +802,7 @@ public class TestShuffleScheduler { for (int i = 0; i < numInputs; i++) { InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(i, 0, "attempt_"); - scheduler.addKnownMapOutput("host" + i, 10000, 1, "hostUrl", inputAttemptIdentifier); + scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); identifiers[i] = inputAttemptIdentifier; }
