Repository: tez Updated Branches: refs/heads/master 2f1738888 -> a83b1e9d4
TEZ-3990. The number of shuffle penalties for a host/inputAttemptIdentifier should be capped (Kuhu Shukla via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a83b1e9d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a83b1e9d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a83b1e9d Branch: refs/heads/master Commit: a83b1e9d467f5a74ed72da76b1de6e725cc33ebe Parents: 2f17388 Author: Jonathan Eagles <[email protected]> Authored: Wed Oct 10 16:21:21 2018 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Wed Oct 10 16:21:21 2018 -0500 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 10 ++++ .../orderedgrouped/ShuffleScheduler.java | 16 +++++-- .../orderedgrouped/TestShuffleScheduler.java | 50 ++++++++++++++++++++ 3 files changed, 72 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 23f1f9b..85c53a5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -238,6 +238,15 @@ public class TezRuntimeConfiguration { "shuffle.fetch.failures.limit"; public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5; + /** + * Specifies in milliseconds the maximum delay a penalized host can have before being retried, + * defaults to 10 minutes. 
+ */ + @ConfigurationProperty(type = "integer") + public static final String TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS = TEZ_RUNTIME_PREFIX + + "shuffle.host.penalty.time.limit"; + public static final int TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT = 600000; + @Private @Unstable @ConfigurationProperty(type = "integer") @@ -609,6 +618,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS); defaultConf.addResource("core-default.xml"); defaultConf.addResource("core-site.xml"); http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 981e224..d847932 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -60,7 +60,6 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.TezConstants; import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -245,6 +244,7 @@ class ShuffleScheduler { private final 
boolean compositeFetch; private volatile Thread shuffleSchedulerThread = null; + private final int maxPenaltyTime; private long totalBytesShuffledTillNow = 0; private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); @@ -417,6 +417,8 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); + this.maxPenaltyTime = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS_DEFAULT); /* NOTE(review): the configured limit is not validated here; a zero or negative value would make the Math.min cap on penalty delays yield a non-positive delay. Confirm whether a lower bound is wanted. */ pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap(); LOG.info("ShuffleScheduler running for sourceVertex: " @@ -831,7 +833,8 @@ class ShuffleScheduler { long delay = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures)); - penalties.add(new Penalty(host, delay)); + long penaltyDelay = Math.min(delay, maxPenaltyTime); /* cap the delay, which grows exponentially with the failure count, at the configured maxPenaltyTime */ + penalties.add(new Penalty(host, penaltyDelay)); } private int getFailureCount(InputAttemptIdentifier srcAttempt) { @@ -1149,7 +1152,12 @@ class ShuffleScheduler { String path, int reduceId) { return pathToIdentifierMap.get(new PathPartition(path, reduceId)); } - + + @VisibleForTesting + DelayQueue<Penalty> getPenalties() { /* returns the live penalties queue itself, not a copy */ + return penalties; + } + private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) { boolean isInputFinished = false; if (id instanceof CompositeInputAttemptIdentifier) { @@ -1281,7 +1289,7 @@ class ShuffleScheduler { /** * A structure that 
records the penalty for a host. 
*/ - private static class Penalty implements Delayed { + static class Penalty implements Delayed { /* package-private (previously private) so the test can reference ShuffleScheduler.Penalty */ MapHost host; private long endTime; http://git-wip-us.apache.org/repos/asf/tez/blob/a83b1e9d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index 381ad85..7a7b1ee 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -34,6 +34,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; @@ -55,6 +56,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -953,6 +955,54 @@ public class TestShuffleScheduler { } } + @Test (timeout = 120000) + public void testPenalties() throws Exception { + InputContext inputContext = createTezInputContext(); + Configuration conf = new TezConfiguration(); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS, 20000); + 
int numInputs = 10; + Shuffle shuffle = 
mock(Shuffle.class); + MergeManager mergeManager = mock(MergeManager.class); + + final ShuffleSchedulerForTest scheduler = + new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager, + mergeManager, + System.currentTimeMillis(), null, false, 0, "srcName"); + + ExecutorService executor = Executors.newFixedThreadPool(1); /* NOTE(review): this test never shuts down this executor or closes the started scheduler, and never reads executorFuture */ + + Future<Void> executorFuture = executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + scheduler.start(); + return null; + } + }); + + InputAttemptIdentifier[] identifiers = new InputAttemptIdentifier[numInputs]; + + for (int i = 0; i < numInputs; i++) { + CompositeInputAttemptIdentifier inputAttemptIdentifier = + new CompositeInputAttemptIdentifier(i, 0, "attempt_", 1); + scheduler.addKnownMapOutput("host" + i, 10000, 1, inputAttemptIdentifier); + identifiers[i] = inputAttemptIdentifier; + } + + MapHost[] mapHosts = new MapHost[numInputs]; + int count = 0; + for (MapHost mh : scheduler.mapLocations.values()) { + mapHosts[count++] = mh; + } + + for (int i = 0; i < 10; i++) { + scheduler.copyFailed(identifiers[0], mapHosts[0], false, false, false); + } + ShuffleScheduler.Penalty[] penaltyArray = new ShuffleScheduler.Penalty[scheduler.getPenalties().size()]; + scheduler.getPenalties().toArray(penaltyArray); /* NOTE(review): if no penalty was queued, the loop below asserts nothing and the test passes vacuously; consider also asserting penaltyArray.length > 0 */ + for (int i = 0; i < penaltyArray.length; i++) { + Assert.assertTrue(penaltyArray[i].getDelay(TimeUnit.MILLISECONDS) <= 20000); + } + } private InputContext createTezInputContext() throws IOException { ApplicationId applicationId = ApplicationId.newInstance(1, 1);
