Repository: tez Updated Branches: refs/heads/master 4389ce8cf -> f8e014876
TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput. Contributed by Ming Ma. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f8e01487 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f8e01487 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f8e01487 Branch: refs/heads/master Commit: f8e014876dc57fbc27ed0f8280e5d5ba01a7c1e4 Parents: 4389ce8 Author: Siddharth Seth <[email protected]> Authored: Thu Apr 28 09:05:03 2016 +0530 Committer: Siddharth Seth <[email protected]> Committed: Thu Apr 28 09:05:03 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../runtime/library/common/shuffle/Fetcher.java | 13 +- .../library/common/shuffle/HostPort.java | 77 ++++++ .../library/common/shuffle/InputHost.java | 144 +++++----- .../common/shuffle/impl/ShuffleManager.java | 76 +++--- .../common/shuffle/orderedgrouped/MapHost.java | 44 ---- .../orderedgrouped/ShuffleScheduler.java | 2 +- .../impl/TestShuffleInputEventHandlerImpl.java | 3 +- .../common/shuffle/impl/TestShuffleManager.java | 261 +++++++++++++++++++ 9 files changed, 464 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a1fa718..324ca38 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.9.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput. TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests. TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. TEZ-3224. User payload is not initialized before creating vertex manager plugin. 
http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 261f2e7..d445587 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -102,8 +102,19 @@ public class Fetcher extends CallableWithNdc<FetchResult> { // Parameters to track work. private List<InputAttemptIdentifier> srcAttempts; @VisibleForTesting + public List<InputAttemptIdentifier> getSrcAttempts() { + return srcAttempts; + } + + @VisibleForTesting Map<String, InputAttemptIdentifier> srcAttemptsRemaining; + private String host; + @VisibleForTesting + public String getHost() { + return host; + } + private int port; private int partition; @@ -182,7 +193,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { } @Override - protected FetchResult callInternal() throws Exception { + public FetchResult callInternal() throws Exception { boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled); if (srcAttempts.size() == 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java new file mode 100644 index 0000000..cac9d9a --- /dev/null +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.shuffle; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +@Private +public class HostPort { + + private final String host; + private final int port; + + public HostPort(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((host == null) ? 
0 : host.hashCode()); + result = prime * result + port; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HostPort other = (HostPort) obj; + if (host == null) { + if (other.host != null) + return false; + } else if (!host.equals(other.host)) + return false; + if (port != other.port) + return false; + return true; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getIdentifier() { + return host + ":" + port; + } + + @Override + public String toString() { + return host + ":" + port; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java index b3382ea..a447d83 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java @@ -19,50 +19,30 @@ package org.apache.tez.runtime.library.common.shuffle; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; /** * Represents a Host with respect to the MapReduce ShuffleHandler. * - * srcPhysicalIndex / partition is part of this since that only knows how to - * serve ine partition at a time. 
*/ -public class InputHost { +public class InputHost extends HostPort { - private final String host; - private final int port; - private final int srcPhysicalIndex; - private final String identifier; private String additionalInfo; - private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>(); + // partition -> pending inputs (a partition can have several for pipelined shuffle). + // Mutated only in synchronized methods so get-or-create and drain+remove can't race. + private final Map<Integer, BlockingQueue<InputAttemptIdentifier>> + partitionToInputs = new ConcurrentHashMap<>(); - public static String createIdentifier(String host, int port) { - return (host + ":" + String.valueOf(port)); - } - - public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex) { - this.host = hostName; - this.port = port; - this.srcPhysicalIndex = srcPhysicalIndex; - this.identifier = createIdentifier(hostName, port); - } - - public String getHost() { - return this.host; - } - - public int getPort() { - return this.port; - } - - public String getIdentifier() { - return this.identifier; + public InputHost(HostPort hostPort) { + super(hostPort.getHost(), hostPort.getPort()); } public void setAdditionalInfo(String additionalInfo) { @@ -73,70 +53,76 @@ public class InputHost { return (additionalInfo == null) ? 
"" : additionalInfo; } - public int getSrcPhysicalIndex() { - return this.srcPhysicalIndex; + public int getNumPendingPartitions() { + return partitionToInputs.size(); } - public int getNumPendingInputs() { - return inputs.size(); - } - - public void addKnownInput(InputAttemptIdentifier srcAttempt) { + public synchronized void addKnownInput(Integer partition, + InputAttemptIdentifier srcAttempt) { + BlockingQueue<InputAttemptIdentifier> inputs = + partitionToInputs.get(partition); + if (inputs == null) { + inputs = new LinkedBlockingQueue<InputAttemptIdentifier>(); + partitionToInputs.put(partition, inputs); + } inputs.add(srcAttempt); } - public List<InputAttemptIdentifier> clearAndGetPendingInputs() { - List<InputAttemptIdentifier> inputsCopy = new ArrayList<InputAttemptIdentifier>( - inputs.size()); - inputs.drainTo(inputsCopy); - return inputsCopy; + public synchronized PartitionToInputs clearAndGetOnePartition() { + for (Map.Entry<Integer, BlockingQueue<InputAttemptIdentifier>> entry : + partitionToInputs.entrySet()) { + List<InputAttemptIdentifier> inputs = + new ArrayList<InputAttemptIdentifier>(entry.getValue().size()); + entry.getValue().drainTo(inputs); + PartitionToInputs ret = new PartitionToInputs(entry.getKey(), inputs); + partitionToInputs.remove(entry.getKey()); + return ret; + } + return null; + } + + public String toDetailedString() { + return "HostPort=" + super.toString() + ", InputDetails=" + + partitionToInputs; + } + + @Override + public String toString() { + return "HostPort=" + super.toString() + ", PartitionIds=" + + partitionToInputs.keySet(); } @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((host == null) ? 
0 : host.hashCode()); - result = prime * result + port; - result = prime * result + srcPhysicalIndex; - return result; + return super.hashCode(); } @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; + public boolean equals(Object to) { + return super.equals(to); + } + + public static class PartitionToInputs { + private int partition; + private List<InputAttemptIdentifier> inputs; + + public PartitionToInputs(int partition, + List<InputAttemptIdentifier> input) { + this.partition = partition; + this.inputs = input; } - InputHost other = (InputHost) obj; - if (host == null) { - if (other.host != null) { - return false; - } - } else if (!host.equals(other.host)) - return false; - if (port != other.port) { - return false; + + public int getPartition() { + return partition; } - if (srcPhysicalIndex != other.srcPhysicalIndex) { - return false; + + public List<InputAttemptIdentifier> getInputs() { + return inputs; } - return true; - } - public String toDetailedString() { - return "InputHost [host=" + host + ", port=" + port + ",srcPhysicalIndex=" + srcPhysicalIndex - + ", inputs=" + inputs + "]"; - } - - @Override - public String toString() { - return "InputHost [host=" + host + ", port=" + port + ", srcPhysicalIndex=" + srcPhysicalIndex - + "]"; + @Override + public String toString() { + return "partition=" + partition + ", inputs=" + inputs; + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index b82098e..7ca9a1f 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -71,13 +71,15 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.FetchResult; import org.apache.tez.runtime.library.common.shuffle.FetchedInput; +import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type; import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; import org.apache.tez.runtime.library.common.shuffle.Fetcher; +import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder; import org.apache.tez.runtime.library.common.shuffle.FetcherCallback; +import org.apache.tez.runtime.library.common.shuffle.HostPort; import org.apache.tez.runtime.library.common.shuffle.InputHost; +import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; -import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type; -import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -104,15 +106,17 @@ public class ShuffleManager implements FetcherCallback { private final FetchedInputAllocator inputManager; - private final ListeningExecutorService fetcherExecutor; + @VisibleForTesting + final ListeningExecutorService fetcherExecutor; private final ListeningExecutorService schedulerExecutor; private final RunShuffleCallable schedulerCallable; private final BlockingQueue<FetchedInput> completedInputs; private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false); - private final Set<Integer> completedInputSet; - private final ConcurrentMap<String, InputHost> knownSrcHosts; + 
@VisibleForTesting + final Set<Integer> completedInputSet; + private final ConcurrentMap<HostPort, InputHost> knownSrcHosts; private final BlockingQueue<InputHost> pendingHosts; private final Set<InputAttemptIdentifier> obsoletedInputs; private Set<Fetcher> runningFetchers; @@ -211,7 +215,7 @@ public class ShuffleManager implements FetcherCallback { * We do not know upfront the number of spills from source. */ completedInputs = new LinkedBlockingDeque<FetchedInput>(); - knownSrcHosts = new ConcurrentHashMap<String, InputHost>(); + knownSrcHosts = new ConcurrentHashMap<HostPort, InputHost>(); pendingHosts = new LinkedBlockingQueue<InputHost>(); obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>()); runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>()); @@ -336,13 +340,15 @@ public class ShuffleManager implements FetcherCallback { } } if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString()); + LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + + inputHost.toDetailedString()); } - if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) { + if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get()) { Fetcher fetcher = constructFetcherForHost(inputHost, conf); runningFetchers.add(fetcher); if (isShutdown.get()) { - LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, Breaking out of ShuffleScheduler Loop"); + LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, " + + "Breaking out of ShuffleScheduler Loop"); break; } ListenableFuture<FetchResult> future = fetcherExecutor @@ -353,8 +359,9 @@ public class ShuffleManager implements FetcherCallback { } } else { if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier() - + " since it has no inputs to process"); + LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + + inputHost.getIdentifier() + + " since it
has no inputs to process"); } } } @@ -389,8 +396,9 @@ public class ShuffleManager implements FetcherCallback { } return true; } - - private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { + + @VisibleForTesting + Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { Path lockDisk = null; @@ -413,11 +421,12 @@ public class ShuffleManager implements FetcherCallback { // Remove obsolete inputs from the list being given to the fetcher. Also // remove from the obsolete list. - List<InputAttemptIdentifier> pendingInputsForHost = inputHost - .clearAndGetPendingInputs(); + PartitionToInputs pendingInputsOfOnePartition = inputHost + .clearAndGetOnePartition(); int includedMaps = 0; - for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost - .iterator(); inputIter.hasNext();) { + for (Iterator<InputAttemptIdentifier> inputIter = + pendingInputsOfOnePartition.getInputs().iterator(); + inputIter.hasNext();) { InputAttemptIdentifier input = inputIter.next(); //For pipelined shuffle. 
@@ -439,20 +448,23 @@ public class ShuffleManager implements FetcherCallback { // Check if max threshold is met if (includedMaps >= maxTaskOutputAtOnce) { inputIter.remove(); - inputHost.addKnownInput(input); //add to inputHost + //add to inputHost + inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), + input); } else { includedMaps++; } } - if (inputHost.getNumPendingInputs() > 0) { + if (inputHost.getNumPendingPartitions() > 0) { pendingHosts.add(inputHost); //add it to queue } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), - inputHost.getSrcPhysicalIndex(), pendingInputsForHost); + pendingInputsOfOnePartition.getPartition(), + pendingInputsOfOnePartition.getInputs()); if (LOG.isDebugEnabled()) { LOG.debug("Created Fetcher for host: " + inputHost.getHost() + ", info: " + inputHost.getAdditionalInfo() - + ", with inputs: " + pendingInputsForHost); + + ", with inputs: " + pendingInputsOfOnePartition); } return fetcherBuilder.build(); } @@ -461,18 +473,18 @@ public class ShuffleManager implements FetcherCallback { public void addKnownInput(String hostName, int port, InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) { - String identifier = InputHost.createIdentifier(hostName, port); + HostPort identifier = new HostPort(hostName, port); InputHost host = knownSrcHosts.get(identifier); if (host == null) { - host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex); - assert identifier.equals(host.getIdentifier()); + host = new InputHost(identifier); InputHost old = knownSrcHosts.putIfAbsent(identifier, host); if (old != null) { host = old; } } if (LOG.isDebugEnabled()) { - LOG.debug(srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host: " + host); + LOG.debug(srcNameTrimmed + ": " + "Adding input: " + + srcAttemptIdentifier + ", to host: " + host); } if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) { @@ -484,12 +496,13 @@ public class 
ShuffleManager implements FetcherCallback { shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier)); } - host.addKnownInput(srcAttemptIdentifier); + host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier); lock.lock(); try { boolean added = pendingHosts.offer(host); if (!added) { - String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue"; + String errorMessage = "Unable to add host: " + + host.getIdentifier() + " to pending queue"; LOG.error(errorMessage); throw new TezUncheckedException(errorMessage); } @@ -865,7 +878,8 @@ public class ShuffleManager implements FetcherCallback { * Fake input that is added to the completed input list in case an input does not have any data. * */ - private static class NullFetchedInput extends FetchedInput { + @VisibleForTesting + static class NullFetchedInput extends FetchedInput { public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) { super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null); @@ -966,10 +980,12 @@ public class ShuffleManager implements FetcherCallback { } else { Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs(); if (pendingInputs != null && pendingInputs.iterator().hasNext()) { - InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort())); + HostPort identifier = new HostPort(result.getHost(), + result.getPort()); + InputHost inputHost = knownSrcHosts.get(identifier); assert inputHost != null; for (InputAttemptIdentifier input : pendingInputs) { - inputHost.addKnownInput(input); + inputHost.addKnownInput(result.getPartition(), input); } inputHost.setAdditionalInfo(result.getAdditionalInfo()); pendingHosts.add(inputHost); http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java index 486d8c5..c2cfd06 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java @@ -33,50 +33,6 @@ class MapHost { PENALIZED // Host penalized due to shuffle failures } - public static class HostPort { - - final String host; - final int port; - - HostPort(String host, int port) { - this.host = host; - this.port = port; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((host == null) ? 0 : host.hashCode()); - result = prime * result + port; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HostPort other = (HostPort) obj; - if (host == null) { - if (other.host != null) - return false; - } else if (!host.equals(other.host)) - return false; - if (port != other.port) - return false; - return true; - } - - @Override - public String toString() { - return "HostPort [host=" + host + ", port=" + port + "]"; - } - } - public static class HostPortPartition { final String host; http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 
2f6e490..0a2b730 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -73,7 +73,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; -import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort; +import org.apache.tez.runtime.library.common.shuffle.HostPort; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index 0294bd3..6bcbeb6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@ -18,7 +18,6 @@ package org.apache.tez.runtime.library.common.shuffle.impl; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -334,7 +333,7 @@ public class TestShuffleInputEventHandlerImpl { 
DataMovementEventPayloadProto.Builder builder = DataMovementEventPayloadProto.newBuilder(); builder.setHost(HOST); builder.setPort(PORT); - builder.setPathComponent("attempttmp"); + builder.setPathComponent(PATH_COMPONENT); if (emptyPartitionByteString != null) { builder.setEmptyPartitions(emptyPartitionByteString); } http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java new file mode 100644 index 0000000..a5608ef --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.library.common.shuffle.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.ExecutionContext; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.FetchedInput; +import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator; +import org.apache.tez.runtime.library.common.shuffle.Fetcher; +import org.apache.tez.runtime.library.common.shuffle.FetchResult; +import org.apache.tez.runtime.library.common.shuffle.InputHost; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import 
org.mockito.stubbing.Answer; + + +public class TestShuffleManager { + + private static final String FETCHER_HOST = "localhost"; + private static final int PORT = 8080; + private static final String PATH_COMPONENT = "attempttmp"; + private final Configuration conf = new Configuration(); + + /** + * One reducer fetches multiple partitions from each mapper. + * For a given mapper, the reducer sends DataMovementEvents for several + * partitions, wait for some time and then send DataMovementEvents for the + * rest of the partitions. Then do the same thing for the next mapper. + * Verify ShuffleManager is able to get all the events. + */ + @Test(timeout = 50000) + public void testMultiplePartitions() throws Exception { + final int numOfMappers = 3; + final int numOfPartitions = 5; + final int firstPart = 2; + InputContext inputContext = createInputContext(); + ShuffleManagerForTest shuffleManager = createShuffleManager(inputContext, + numOfMappers * numOfPartitions); + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + + ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl( + inputContext, shuffleManager, inputAllocator, null, false, 0); + shuffleManager.run(); + + List<Event> eventList = new LinkedList<Event>(); + + int targetIndex = 0; // The physical input index within the reduce task + + for (int i = 0; i < numOfMappers; i++) { + String mapperHost = "host" + i; + int srcIndex = 20; // The physical output index within the map task + // Send the first batch of DataMovementEvents + eventList.clear(); + for (int j = 0; j < firstPart; j++) { + Event dme = createDataMovementEvent(mapperHost, srcIndex++, + targetIndex++); + eventList.add(dme); + } + handler.handleEvents(eventList); + + Thread.sleep(500); + + + // Send the second batch of DataMovementEvents + eventList.clear(); + for (int j = 0; j < numOfPartitions - firstPart; j++) { + Event dme = createDataMovementEvent(mapperHost, srcIndex++, + targetIndex++); + 
eventList.add(dme); + } + handler.handleEvents(eventList); + } + + int waitCount = 100; + while (waitCount-- > 0 && + !(shuffleManager.isFetcherExecutorShutdown() && + numOfMappers * numOfPartitions == + shuffleManager.getNumOfCompletedInputs())) { + Thread.sleep(100); + } + assertTrue(shuffleManager.isFetcherExecutorShutdown()); + assertEquals(numOfMappers * numOfPartitions, + shuffleManager.getNumOfCompletedInputs()); + } + + private InputContext createInputContext() throws IOException { + DataOutputBuffer port_dob = new DataOutputBuffer(); + port_dob.writeInt(PORT); + final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, + port_dob.getLength()); + + ExecutionContext executionContext = mock(ExecutionContext.class); + doReturn(FETCHER_HOST).when(executionContext).getHostName(); + + InputContext inputContext = mock(InputContext.class); + doReturn(new TezCounters()).when(inputContext).getCounters(); + doReturn("sourceVertex").when(inputContext).getSourceVertexName(); + doReturn(shuffleMetaData).when(inputContext) + .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); + doReturn(executionContext).when(inputContext).getExecutionContext(); + return inputContext; + } + + @SuppressWarnings("unchecked") + private ShuffleManagerForTest createShuffleManager( + InputContext inputContext, int expectedNumOfPhysicalInputs) + throws IOException { + Path outDirBase = new Path(".", "outDir"); + String[] outDirs = new String[] { outDirBase.toString() }; + doReturn(outDirs).when(inputContext).getWorkDirs(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, + inputContext.getWorkDirs()); + + DataOutputBuffer out = new DataOutputBuffer(); + Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(), + new JobTokenSecretManager(null)); + token.write(out); + doReturn(ByteBuffer.wrap(out.getData())).when(inputContext). 
+ getServiceConsumerMetaData( + TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID); + + FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class); + return new ShuffleManagerForTest(inputContext, conf, + expectedNumOfPhysicalInputs, 1024, false, -1, null, inputAllocator); + } + + private Event createDataMovementEvent(String host, int srcIndex, int targetIndex) { + DataMovementEventPayloadProto.Builder builder = + DataMovementEventPayloadProto.newBuilder(); + builder.setHost(host); + builder.setPort(PORT); + builder.setPathComponent(PATH_COMPONENT); + Event dme = DataMovementEvent + .create(srcIndex, targetIndex, 0, + builder.build().toByteString().asReadOnlyByteBuffer()); + return dme; + } + + private static class ShuffleManagerForTest extends ShuffleManager { + public ShuffleManagerForTest(InputContext inputContext, Configuration conf, + int numInputs, int bufferSize, boolean ifileReadAheadEnabled, + int ifileReadAheadLength, CompressionCodec codec, + FetchedInputAllocator inputAllocator) throws IOException { + super(inputContext, conf, numInputs, bufferSize, ifileReadAheadEnabled, + ifileReadAheadLength, codec, inputAllocator); + } + + @Override + Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) { + final Fetcher fetcher = spy(super.constructFetcherForHost(inputHost, + conf)); + final FetchResult mockFetcherResult = mock(FetchResult.class); + try { + doAnswer(new Answer<FetchResult>() { + @Override + public FetchResult answer(InvocationOnMock invocation) throws Throwable { + for (InputAttemptIdentifier input : fetcher.getSrcAttempts()) { + ShuffleManagerForTest.this.fetchSucceeded( + fetcher.getHost(), input, new TestFetchedInput(input), 0, 0, + 0); + } + return mockFetcherResult; + } + }).when(fetcher).callInternal(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return fetcher; + } + + public int getNumOfCompletedInputs() { + return completedInputSet.size(); + } + + boolean isFetcherExecutorShutdown() { + 
return fetcherExecutor.isShutdown(); + } + }
+ + /** + * Data-less in-memory FetchedInput that the mocked Fetcher reports through + * fetchSucceeded() for every source attempt it is asked to fetch. + */ + @VisibleForTesting + static class TestFetchedInput extends FetchedInput { + + public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) { + super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return null; + } + + @Override + public InputStream getInputStream() throws IOException { + return null; + } + + @Override + public void commit() throws IOException { + } + + @Override + public void abort() throws IOException { + } + + @Override + public void free() { + } + } +}
