Repository: tez Updated Branches: refs/heads/branch-0.7 f167c8575 -> 84922f834
TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/84922f83 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/84922f83 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/84922f83 Branch: refs/heads/branch-0.7 Commit: 84922f834248e31e681e5ad207b63c578af0be91 Parents: f167c85 Author: Jonathan Eagles <[email protected]> Authored: Wed Mar 2 14:05:27 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Wed Mar 2 14:05:27 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/common/shuffle/ShuffleUtils.java | 11 +- .../orderedgrouped/FetcherOrderedGrouped.java | 20 ++- .../common/shuffle/orderedgrouped/MapHost.java | 135 +++++++++++++++---- .../common/shuffle/orderedgrouped/Shuffle.java | 5 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 28 ++-- .../orderedgrouped/ShuffleScheduler.java | 86 +++++++++--- .../shuffle/orderedgrouped/TestFetcher.java | 20 +-- ...tShuffleInputEventHandlerOrderedGrouped.java | 18 +-- 9 files changed, 219 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c9b13b0..b73fd72 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3149. Tez-tools: Add username in DagInfo TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier. TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 5c9ff77..a4e95f7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -184,18 +184,13 @@ public class ShuffleUtils { } } - // TODO NEWTEZ handle ssl shuffle public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, String appId, boolean sslShuffle) { - return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port), - partition, appId, sslShuffle); - } - - public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier, - int partition, String appId, boolean sslShuffle) { final String http_protocol = (sslShuffle) ? "https://" : "http://"; StringBuilder sb = new StringBuilder(http_protocol); - sb.append(hostIdentifier); + sb.append(host); + sb.append(":"); + sb.append(port); sb.append("/"); sb.append("mapOutput?job="); sb.append(appId.replace("application", "job")); http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 7b666e9..385ed48 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -38,6 +38,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -72,7 +73,9 @@ class FetcherOrderedGrouped extends Thread { private final Shuffle shuffle; private final int id; private final String logIdentifier; - private final String localShuffleHostPort; + private final String localShuffleHost; + private final int localShufflePort; + private final String applicationId; private static int nextId = 0; private int currentPartition = -1; @@ -94,6 +97,7 @@ class FetcherOrderedGrouped extends Thread { HttpConnection httpConnection; HttpConnectionParams httpConnectionParams; + private final boolean sslShuffle; // Initiative value is 0, which means it hasn't retried yet. private long retryStartTime = 0; @@ -127,6 +131,7 @@ class FetcherOrderedGrouped extends Thread { ShuffleErrors.CONNECTION.toString()); wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString()); + applicationId = inputContext.getApplicationId().toString(); this.ifileReadAhead = ifileReadAhead; this.ifileReadAheadLength = ifileReadAheadLength; @@ -137,9 +142,12 @@ class FetcherOrderedGrouped extends Thread { this.codec = null; } this.conf = conf; - this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort); + this.localShuffleHost = localHostname; + this.localShufflePort = shufflePort; this.localDiskFetchEnabled = localDiskFetchEnabled; + this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); this.logIdentifier = "fetcher {" + TezUtilsInternal .cleanVertexName(inputContext.getSourceVertexName()) + "} #" + id; @@ -161,8 +169,7 @@ class FetcherOrderedGrouped extends Thread { assignedHost = scheduler.getHost(); metrics.threadBusy(); - String hostPort = assignedHost.getHostIdentifier(); - if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) { + if (localDiskFetchEnabled && assignedHost.getHost().equals(localShuffleHost) && assignedHost.getPort() == localShufflePort) { setupLocalDiskFetch(assignedHost); } else { // Shuffle @@ -333,8 +340,9 @@ class FetcherOrderedGrouped extends Thread { throws IOException { boolean connectSucceeded = false; try { - URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts, - httpConnectionParams.getKeepAlive()); + StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(), + host.getPort(), host.getPartitionId(), applicationId, sslShuffle); + URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.getKeepAlive()); httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, jobTokenSecretManager); connectSucceeded = httpConnection.connect(); http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java index 3116568..7f8a23c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java @@ -32,44 +32,131 @@ class MapHost { PENDING, // Known map outputs which need to be fetched PENALIZED // Host penalized due to shuffle failures } - + + public static class HostPort { + + final String host; + final int port; + + HostPort(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 0 : host.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostPort other = (HostPort) obj; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + if (port != other.port) + return false; + return true; + } + + @Override + public String toString() { + return "HostPort [host=" + host + ", port=" + port + "]"; + } + } + + public static class HostPortPartition { + + final String host; + final int port; + final int partition; + + HostPortPartition(String host, int port, int partition) { + this.host = host; + this.port = port; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 0 : host.hashCode()); + result = prime * result + partition; + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostPortPartition other = (HostPortPartition) obj; + if (partition != other.partition) + return false; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + if (port != other.port) + return false; + return true; + } + + @Override + public String toString() { + return "HostPortPartition [host=" + host + ", port=" + port + ", partition=" + partition + "]"; + } + } + private State state = State.IDLE; - private final String hostIdentifier; - private final int partitionId; - private final String baseUrl; - private final String identifier; + private final String host; + private final int port; + private final int partition; // Tracks attempt IDs private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>(); - public MapHost(int partitionId, String hostPort, String baseUrl) { - this.partitionId = partitionId; - this.hostIdentifier = hostPort; - this.baseUrl = baseUrl; - this.identifier = createIdentifier(hostPort, partitionId); - } - - public static String createIdentifier(String hostName, int partitionId) { - return hostName + ":" + Integer.toString(partitionId); + public MapHost(String host, int port, int partition) { + this.host = host; + this.port = port; + this.partition = partition; } - public String getIdentifier() { - return identifier; - } - public int getPartitionId() { - return partitionId; + return partition; } public State getState() { return state; } - public String getHostIdentifier() { - return hostIdentifier; + public String getHost() { + return host; + } + + public int getPort() { + return port; } - public String getBaseUrl() { - return baseUrl; + public String getHostIdentifier() { + return host + ":" + port; } public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) { @@ -112,7 +199,7 @@ class MapHost { @Override public String toString() { - return hostIdentifier; + return getHostIdentifier(); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java index ca3c05d..efce6d2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java @@ -194,8 +194,6 @@ public class Shuffle implements ExceptionReporter { + (codec == null ? "None" : codec.getClass().getName()) + ", ifileReadAhead: " + ifileReadAhead); - boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); startTime = System.currentTimeMillis(); scheduler = new ShuffleScheduler( this.inputContext, @@ -231,8 +229,7 @@ public class Shuffle implements ExceptionReporter { eventHandler= new ShuffleInputEventHandlerOrderedGrouped( inputContext, - scheduler, - sslShuffle); + scheduler); ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build()); http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 7c4eb98..0b3f4fc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -19,16 +19,15 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import java.io.IOException; -import java.net.URI; import java.util.BitSet; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.StringInterner; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezUncheckedException; @@ -49,7 +48,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; - private final boolean sslShuffle; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); private final AtomicInteger numDmeEvents = new AtomicInteger(0); @@ -57,10 +55,9 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0); public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, - ShuffleScheduler scheduler, boolean sslShuffle) { + ShuffleScheduler scheduler) { this.inputContext = inputContext; this.scheduler = scheduler; - this.sslShuffle = sslShuffle; } @Override @@ -103,17 +100,19 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e); } int partitionId = dmEvent.getSourceIndex(); + InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); + if (LOG.isDebugEnabled()) { LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex() + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); } + if (shufflePayload.hasEmptyPartitions()) { try { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions); if (emptyPartitionsBitSet.get(partitionId)) { - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); if (LOG.isDebugEnabled()) { LOG.debug( "Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" @@ -129,11 +128,8 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl } } - InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent, shufflePayload); - - URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId); - scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), - partitionId, baseUri.toString(), srcAttemptIdentifier); + scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(), + partitionId, srcAttemptIdentifier); } private void processTaskFailedEvent(InputFailedEvent ifEvent) { @@ -144,14 +140,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl } } - @VisibleForTesting - URI getBaseURI(String host, int port, int partitionId) { - StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, - partitionId, inputContext.getApplicationId().toString(), sslShuffle); - URI u = URI.create(sb.toString()); - return u; - } - /** * Helper method to create InputAttemptIdentifier * @@ -161,7 +149,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl */ private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent, DataMovementEventPayloadProto shufflePayload) { - String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent() : null; + String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null; int spillEventId = shufflePayload.getSpillId(); InputAttemptIdentifier srcAttemptIdentifier = null; if (shufflePayload.hasSpillId()) { http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index b8b6cf2..65b195a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -54,6 +54,8 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; import com.google.common.collect.Lists; @@ -65,18 +67,62 @@ class ShuffleScheduler { } }; + public static class PathPartition { + + final String path; + final int partition; + + PathPartition(String path, int partition) { + this.path = path; + this.partition = partition; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((path == null) ? 0 : path.hashCode()); + result = prime * result + partition; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PathPartition other = (PathPartition) obj; + if (path == null) { + if (other.path != null) + return false; + } else if (!path.equals(other.path)) + return false; + if (partition != other.partition) + return false; + return true; + } + + @Override + public String toString() { + return "PathPartition [path=" + path + ", partition=" + partition + "]"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class); static final long INITIAL_PENALTY = 2000L; // 2 seconds private static final float PENALTY_GROWTH_RATE = 1.3f; - private boolean[] finishedMaps; + private final BitSet finishedMaps; private final int numInputs; private final String srcNameTrimmed; private int numFetchedSpills; - private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>(); + private Map<HostPortPartition, MapHost> mapLocations = new HashMap<HostPortPartition, MapHost>(); @VisibleForTesting - final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap - = new ConcurrentHashMap<String, InputAttemptIdentifier>(); + final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>(); //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. @@ -92,9 +138,9 @@ class ShuffleScheduler { private final Referee referee; @VisibleForTesting final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>(); - final Set<String> uniqueHosts = Sets.newHashSet(); - private final Map<String,IntWritable> hostFailures = - new HashMap<String,IntWritable>(); + final Set<HostPort> uniqueHosts = Sets.newHashSet(); + private final Map<HostPort,IntWritable> hostFailures = + new HashMap<HostPort,IntWritable>(); private final InputContext inputContext; private final Shuffle shuffle; private final TezCounter shuffledInputsCounter; @@ -157,7 +203,7 @@ class ShuffleScheduler { abortFailureLimit = abortFailureLimitConf; } remainingMaps = new AtomicInteger(numberOfInputs); - finishedMaps = new boolean[remainingMaps.get()]; // default init to false + finishedMaps = new BitSet(remainingMaps.get()); // default init to false this.minFailurePerHost = conf.getInt( TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, @@ -322,7 +368,7 @@ class ShuffleScheduler { failureCounts.remove(srcAttemptIdentifier); if (host != null) { - hostFailures.remove(host.getHostIdentifier()); + hostFailures.remove(new HostPort(host.getHost(), host.getPort())); } output.commit(); @@ -536,7 +582,7 @@ class ShuffleScheduler { private void penalizeHost(MapHost host, int failures) { host.penalize(); - String hostPort = host.getHostIdentifier(); + HostPort hostPort = new HostPort(host.getHost(), host.getPort()); // TODO TEZ-922 hostFailures isn't really used for anything apart from // hasFailedAcrossNodes().Factor it into error // reporting / potential blacklisting of hosts. @@ -611,7 +657,7 @@ class ShuffleScheduler { (int) Math.ceil(numUniqueHosts * hostFailureFraction)); int total = 0; boolean failedAcrossNodes = false; - for(String host : uniqueHosts) { + for(HostPort host : uniqueHosts) { IntWritable failures = hostFailures.get(host); if (failures != null && failures.get() > minFailurePerHost) { total++; @@ -771,17 +817,13 @@ class ShuffleScheduler { public synchronized void addKnownMapOutput(String inputHostName, int port, int partitionId, - String hostUrl, InputAttemptIdentifier srcAttempt) { - String hostPort = (inputHostName + ":" + String.valueOf(port)); - uniqueHosts.add(hostPort); - String identifier = MapHost.createIdentifier(hostPort, partitionId); - + uniqueHosts.add(new HostPort(inputHostName, port)); + HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId); MapHost host = mapLocations.get(identifier); if (host == null) { - host = new MapHost(partitionId, hostPort, hostUrl); - assert identifier.equals(host.getIdentifier()); + host = new MapHost(inputHostName, port, partitionId); mapLocations.put(identifier, host); } @@ -1006,8 +1048,8 @@ class ShuffleScheduler { } - private String getIdentifierFromPathAndReduceId(String path, int reduceId) { - return path + "_" + reduceId; + private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) { + return new PathPartition(path, reduceId); } /** @@ -1050,13 +1092,13 @@ class ShuffleScheduler { void setInputFinished(int inputIndex) { synchronized(finishedMaps) { - finishedMaps[inputIndex] = true; + finishedMaps.set(inputIndex); } } boolean isInputFinished(int inputIndex) { synchronized (finishedMaps) { - return finishedMaps[inputIndex]; + return finishedMaps.get(inputIndex); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 8affa66..531c18b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -45,6 +45,7 @@ import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -86,10 +87,11 @@ public class TestFetcher { InputContext inputContext = mock(InputContext.class); doReturn(new TezCounters()).when(inputContext).getCounters(); doReturn("src vertex").when(inputContext).getSourceVertexName(); + when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); final boolean ENABLE_LOCAL_FETCH = true; final boolean DISABLE_LOCAL_FETCH = false; - MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT); @@ -106,7 +108,7 @@ public class TestFetcher { // if hostname does not match use http spyFetcher = spy(fetcher); - mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl"); + mapHost = new MapHost(HOST + "_OTHER", PORT, 0); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); doReturn(mapHost).when(scheduler).getHost(); @@ -117,7 +119,7 @@ public class TestFetcher { // if port does not match use http spyFetcher = spy(fetcher); - mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl"); + mapHost = new MapHost(HOST, PORT + 1, 0); doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost); doReturn(mapHost).when(scheduler).getHost(); @@ -127,7 +129,7 @@ public class TestFetcher { verify(spyFetcher, times(1)).copyFromHost(mapHost); //if local fetch is not enabled - mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl"); + mapHost = new MapHost(HOST, PORT, 0); fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT); spyFetcher = spy(fetcher); @@ -150,14 +152,14 @@ public class TestFetcher { InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); + when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, inputContext, conf, true, HOST, PORT); FetcherOrderedGrouped spyFetcher = spy(fetcher); - MapHost host = new MapHost(1, HOST + ":" + PORT, - "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map="); + MapHost host = new MapHost(HOST, PORT, 1); List<InputAttemptIdentifier> srcAttempts = Arrays.asList( new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"), new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"), @@ -291,6 +293,7 @@ public class TestFetcher { InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); + when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); HttpConnection.HttpConnectionParams httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(conf); @@ -299,8 +302,7 @@ public class TestFetcher { false, 0, null, inputContext, conf, false, HOST, PORT); final FetcherOrderedGrouped fetcher = spy(mockFetcher); - final MapHost host = new MapHost(1, HOST + ":" + PORT, - "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map="); + final MapHost host = new MapHost(HOST, PORT, 1); final List<InputAttemptIdentifier> srcAttempts = Arrays.asList( new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"), new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"), @@ -309,7 +311,7 @@ public class TestFetcher { doReturn(srcAttempts).when(scheduler).getMapsForHost(host); doReturn(true).when(fetcher).setupConnection(host, srcAttempts); - URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false); + URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=", srcAttempts, false); fetcher.httpConnection = new FakeHttpConnection(url, null, "", null); doAnswer(new Answer<MapOutput>() { http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 90fbf2f..0d268aa 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -157,7 +157,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { System.currentTimeMillis(), "srcNameTrimmed"); scheduler = spy(realScheduler); - handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false); + handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler); mergeManager = mock(MergeManager.class); } @@ -171,9 +171,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); handler.handleEvents(Collections.singletonList(dme1)); - String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); int partitionId = attemptNum; - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1)); verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); //Send final_update event. @@ -182,10 +181,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); handler.handleEvents(Collections.singletonList(dme2)); - baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); partitionId = attemptNum; assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id2)); assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); MapHost host = scheduler.getHost(); @@ -226,7 +224,6 @@ public class TestShuffleInputEventHandlerOrderedGrouped { //Process attempt #1 first int attemptNum = 1; int inputIdx = 1; - String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); handler.handleEvents(Collections.singletonList(dme1)); @@ -235,7 +232,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); - verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); + verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1)); assertTrue("Shuffle info events should not be empty for pipelined shuffle", !scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); @@ -261,10 +258,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0, PATH_COMPONENT); - String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); int partitionId = srcIdx; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), - eq(baseUri), eq(expectedIdentifier)); + eq(expectedIdentifier)); assertTrue("Shuffle info events should be empty for regular shuffle codepath", scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); } @@ -317,12 +313,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped { false); events.add(dme); handler.handleEvents(events); - String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString(); int partitionId = srcIdx; InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT); - verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), - eq(expectedIdentifier)); + verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier)); } private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException {
