Repository: tez
Updated Branches:
  refs/heads/master a9cfeb914 -> 46f2454ac
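
The diff below adds several expert-only shuffle failure-handling properties to TezRuntimeConfiguration. As a rough illustration only — the class and method in this sketch are hypothetical and not part of the commit, while the configuration constants and the values shown are the defaults taken from the diff — overriding these knobs from client code might look like this:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

  // Hypothetical helper (not in the commit): sets the new expert-only shuffle
  // failure-handling knobs to their defaults from this diff. These properties
  // are marked @Private/@Unstable and are intended for debugging only.
  public class ShuffleFailureTuningSketch {
    public static Configuration applyDefaults(Configuration conf) {
      // -1 lets the scheduler pick max(15, numInputs / 10) as the abort limit
      conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, -1);
      // fraction of unique hosts that must see failures before the consumer is considered unhealthy
      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.2f);
      // minimum failures per host before that host counts toward the fraction above
      conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 4);
      // fraction of shuffle progress time the fetcher may stall before being flagged
      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.5f);
      // minimum fraction of inputs that must be fetched to count as "progressed enough"
      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.5f);
      // threshold on failures / (failures + completed) for marking fetching unhealthy
      conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.5f);
      // whether to also check failures accumulated since the last successful copy
      conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true);
      return conf;
    }
  }

The diff registers the same keys in tezRuntimeKeys (TezRuntimeConfiguration) and in OrderedGroupedKVInput's confKeys, so they are recognized as valid per-input runtime configuration.
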
TEZ-2882. Consider improving fetch failure handling (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/46f2454a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/46f2454a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/46f2454a Branch: refs/heads/master Commit: 46f2454acfb22eebd7937280854b2a8d6b5003b9 Parents: a9cfeb9 Author: Rajesh Balamohan <[email protected]> Authored: Wed Oct 21 13:34:14 2015 -0700 Committer: Rajesh Balamohan <[email protected]> Committed: Wed Oct 21 13:34:14 2015 -0700 ---------------------------------------------------------------------- .../library/api/TezRuntimeConfiguration.java | 108 ++++ .../orderedgrouped/FetcherOrderedGrouped.java | 10 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 9 +- .../orderedgrouped/ShuffleScheduler.java | 462 ++++++++++---- .../library/input/OrderedGroupedKVInput.java | 14 + .../shuffle/orderedgrouped/TestFetcher.java | 10 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 22 +- .../orderedgrouped/TestShuffleScheduler.java | 623 ++++++++++++++++++- .../TestOrderedGroupedMergedKVInputConfig.java | 30 + 9 files changed, 1149 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index a84448f..cf05546 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -198,6 +198,106 @@ public class TezRuntimeConfiguration { "shuffle.fetch.failures.limit"; public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5; + @Private + @Unstable + @ConfigurationProperty(type = "integer") + /** + * Expert setting made available only for debugging. Do not change it. Sets + * the number of retries before giving up on downloading from source + * attempt by consumer. Code internally handles the threshold if set to -1. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT = + TEZ_RUNTIME_PREFIX + "shuffle.src-attempt.abort.limit"; + public static final int + TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT = -1; + + @Private + @Unstable + @ConfigurationProperty(type = "float") + /** + * Expert setting made available only for debugging. Do not change it. Setting + * to determine if failures happened across a percentage of nodes. This + * helps in determining if the consumer has to be restarted on continuous + * failures. Setting it to lower value can make consumer restarts more + * aggressive on failures. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.acceptable.host-fetch.failure.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT = 0.2f; + + @Private + @Unstable + @ConfigurationProperty(type = "integer") + /** + * Expert setting made available only for debugging. Do not change it. 
Setting + * to determine if the consumer has to be restarted on continuous + * failures across nodes. Used along with {@link + * TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION}. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST = + TEZ_RUNTIME_PREFIX + "shuffle.min.failures.per.host"; + public static final int TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT = 4; + + @Private + @Unstable + @ConfigurationProperty(type = "float") + /** + * Expert setting made available only for debugging. Do not change it. + * Maximum percentage of time (compared to overall progress), the fetcher is + * allowed before concluding that it is stalled. + */ + public static final String TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.max.stall.time.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + @ConfigurationProperty(type = "float") + /** + * Expert setting made available only for debugging. Do not change it. + * Fraction to determine whether the shuffle has progressed enough or not + * If it has not progressed enough, it could be qualified for the consumer. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.min.required.progress.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + @ConfigurationProperty(type = "float") + /** + * Expert setting made available only for debugging. Do not change it. + * Provides threshold for determining whether fetching has to be marked + * unhealthy based on the ratio of (failures/(failures+completed)) + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.max.allowed.failed.fetch.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + @ConfigurationProperty(type = "boolean") + /** + * Expert setting made available only for debugging. Do not change it. 
+ * Provides threshold for determining whether fetching has to be marked + * unhealthy based on the ratio of (failures/(failures+completed)) + */ + public static final String + TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION = + TEZ_RUNTIME_PREFIX + "shuffle.failed.check.since-last.completion"; + public static final boolean + TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT = true; + + @ConfigurationProperty(type = "integer") public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE = TEZ_RUNTIME_PREFIX + @@ -418,6 +518,14 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM); + tezRuntimeKeys.add + (TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS); tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 1b4031d..93f083d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -297,7 +297,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } else { LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks)); for (InputAttemptIdentifier left : failedTasks) { - scheduler.copyFailed(left, host, true, false); + scheduler.copyFailed(left, host, true, false, false); } } } @@ -359,7 +359,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { for (InputAttemptIdentifier left : remaining.values()) { // Need to be handling temporary glitches .. 
// Report read error to the AM to trigger source failure heuristics - scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded); + scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false); } return false; } @@ -505,7 +505,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { retryStartTime = 0; scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, - endTime - startTime, mapOutput); + endTime - startTime, mapOutput, false); // Note successful shuffle remaining.remove(srcAttemptId.toString()); metrics.successFetch(); @@ -667,7 +667,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); long endTime = System.currentTimeMillis(); scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), - indexRecord.getRawLength(), (endTime - startTime), mapOutput); + indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); iter.remove(); metrics.successFetch(); } catch (IOException e) { @@ -677,7 +677,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { if (!stopped) { metrics.failedFetch(); ioErrs.increment(1); - scheduler.copyFailed(srcAttemptId, host, true, false); + scheduler.copyFailed(srcAttemptId, host, true, false, true); LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index e0473b3..f8c9553 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -50,7 +50,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; - private int maxMapRuntime = 0; private final boolean sslShuffle; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); @@ -110,12 +109,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); } - // TODO NEWTEZ See if this duration hack can be removed. - int duration = shufflePayload.getRunDuration(); - if (duration > maxMapRuntime) { - maxMapRuntime = duration; - scheduler.informMaxMapRunTime(maxMapRuntime); - } if (shufflePayload.hasEmptyPartitions()) { try { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); @@ -128,7 +121,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl + srcAttemptIdentifier + "]. 
Not fetching."); } numDmeEventsNoData.incrementAndGet(); - scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null); + scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true); return; } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index f45ca35..22da46c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -94,7 +95,7 @@ class ShuffleScheduler { private final AtomicLong shuffleStart = new AtomicLong(0); private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class); - private static final long INITIAL_PENALTY = 2000l; // 2 seconds + static final long INITIAL_PENALTY = 2000L; // 2 seconds private static final float PENALTY_GROWTH_RATE = 1.3f; private final BitSet finishedMaps; @@ -103,22 +104,26 @@ class ShuffleScheduler { @VisibleForTesting final Map<String, MapHost> mapLocations = new HashMap<String, MapHost>(); //TODO Clean this and other maps at some point - private final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); + @VisibleForTesting + final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<String, InputAttemptIdentifier>(); //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. 
@VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap; + final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; - private final Set<MapHost> pendingHosts = new HashSet<MapHost>(); + @VisibleForTesting + final Set<MapHost> pendingHosts = new HashSet<MapHost>(); private final Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>(); private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final Random random = new Random(System.currentTimeMillis()); private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>(); private final Referee referee; - private final Map<InputAttemptIdentifier, IntWritable> failureCounts = - new HashMap<InputAttemptIdentifier,IntWritable>(); + @VisibleForTesting + final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>(); + final Set<String> uniqueHosts = Sets.newHashSet(); private final Map<String,IntWritable> hostFailures = new HashMap<String,IntWritable>(); private final InputContext inputContext; @@ -126,7 +131,8 @@ class ShuffleScheduler { private final TezCounter skippedInputCounter; private final TezCounter reduceShuffleBytes; private final TezCounter reduceBytesDecompressed; - private final TezCounter failedShuffleCounter; + @VisibleForTesting + final TezCounter failedShuffleCounter; private final TezCounter bytesShuffledToDisk; private final TezCounter bytesShuffledToDiskDirect; private final TezCounter bytesShuffledToMem; @@ -134,9 +140,13 @@ class ShuffleScheduler { private final TezCounter lastEventReceived; private final String srcNameTrimmed; - private final AtomicInteger remainingMaps; + @VisibleForTesting + final AtomicInteger remainingMaps; private final long startTime; - private long lastProgressTime; + @VisibleForTesting + long lastProgressTime; + @VisibleForTesting + long failedShufflesSinceLastCompletion; private final int numFetchers; private final Set<FetcherOrderedGrouped> runningFetchers = @@ -171,12 +181,18 @@ class ShuffleScheduler { private final boolean reportReadErrorImmediately; private final int maxFailedUniqueFetches; private final int abortFailureLimit; - private int maxMapRuntime = 0; + + private final int minFailurePerHost; + private final float hostFailureFraction; + private final float maxStallTimeFraction; + private final float minReqProgressFraction; + private final float maxAllowedFailedFetchFraction; + private final boolean checkFailedFetchSinceLastCompletion; private volatile Thread shuffleSchedulerThread = null; private long totalBytesShuffledTillNow = 0; - private DecimalFormat mbpsFormat = new DecimalFormat("0.00"); + private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); public ShuffleScheduler(InputContext inputContext, Configuration conf, @@ -195,7 +211,15 @@ class ShuffleScheduler { this.allocator = allocator; this.mergeManager = mergeManager; this.numInputs = numberOfInputs; - abortFailureLimit = Math.max(30, numberOfInputs / 10); + int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT); + if (abortFailureLimitConf <= -1) { + abortFailureLimit = Math.max(15, numberOfInputs / 10); + } else { + //No upper cap, as user is setting this intentionally + abortFailureLimit = abortFailureLimitConf; + } remainingMaps = new AtomicInteger(numberOfInputs); finishedMaps = new BitSet(numberOfInputs); this.ifileReadAhead 
= ifileReadAhead; @@ -211,6 +235,47 @@ class ShuffleScheduler { localDiskFetchEnabled = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH_DEFAULT); + + this.minFailurePerHost = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT); + Preconditions.checkArgument(minFailurePerHost >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST + + "=" + minFailurePerHost + " should not be negative"); + + this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT); + + this.maxStallTimeFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT); + Preconditions.checkArgument(maxStallTimeFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION + + "=" + maxStallTimeFraction + " should not be negative"); + + this.minReqProgressFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT); + Preconditions.checkArgument(minReqProgressFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION + + "=" + minReqProgressFraction + " should not be negative"); + + this.maxAllowedFailedFetchFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT); + Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION + + "=" + maxAllowedFailedFetchFraction + " should not be negative"); + + this.checkFailedFetchSinceLastCompletion = conf.getBoolean + (TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); + this.localHostname = inputContext.getExecutionContext().getHostName(); final ByteBuffer shuffleMetadata = inputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); @@ -284,16 +349,22 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); - shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); + pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce - + ", numFetchers=" + numFetchers); + + ", numFetchers=" + numFetchers + + ", hostFailureFraction=" + hostFailureFraction + + ", minFailurePerHost=" + minFailurePerHost + + ", 
maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction + + ", maxStallTimeFraction=" + maxStallTimeFraction + + ", minReqProgressFraction=" + minReqProgressFraction + + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion + ); } public void start() throws Exception { @@ -391,10 +462,17 @@ class ShuffleScheduler { long bytesCompressed, long bytesDecompressed, long millis, - MapOutput output + MapOutput output, + boolean isLocalFetch ) throws IOException { if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { + if (!isLocalFetch) { + /** + * Reset it only when it is a non-local-disk copy. + */ + failedShufflesSinceLastCompletion = 0; + } if (output != null) { failureCounts.remove(srcAttemptIdentifier); @@ -435,12 +513,12 @@ class ShuffleScheduler { return; } - ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); + ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(inputIdentifier); //Possible that Shuffle event handler invoked this, due to empty partitions if (eventInfo == null && output == null) { eventInfo = new ShuffleEventInfo(srcAttemptIdentifier); - shuffleInfoEventsMap.put(inputIdentifier, eventInfo); + pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo); } assert(eventInfo != null); @@ -455,10 +533,10 @@ class ShuffleScheduler { if (eventInfo.isDone()) { remainingMaps.decrementAndGet(); setInputFinished(inputIdentifier.getInputIndex()); - shuffleInfoEventsMap.remove(inputIdentifier); + pipelinedShuffleInfoEventsMap.remove(inputIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + - shuffleInfoEventsMap); + pipelinedShuffleInfoEventsMap); } } @@ -486,7 +564,8 @@ class ShuffleScheduler { } } else { // input is already finished. duplicate fetch. - LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier); + LOG.warn("Duplicate fetch of input no longer needs to be fetched: " + + srcAttemptIdentifier); // free the resource - specially memory // If the src does not generate data, output will be null. @@ -501,7 +580,7 @@ class ShuffleScheduler { //For pipelined shuffle. //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt if (input.canRetrieveInputInChunks()) { - ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier()); if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) { reportExceptionForInput(new IOException("Previous event already got scheduled for " + input + ". 
Previous attempt's data could have been already merged " @@ -512,7 +591,8 @@ class ShuffleScheduler { } if (eventInfo == null) { - shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input)); + pipelinedShuffleInfoEventsMap.put(input.getInputIdentifier(), + new ShuffleEventInfo(input)); } } return true; @@ -543,95 +623,264 @@ class ShuffleScheduler { public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, MapHost host, boolean readError, - boolean connectError) { - host.penalize(); - int failures = 1; - if (failureCounts.containsKey(srcAttempt)) { - IntWritable x = failureCounts.get(srcAttempt); - x.set(x.get() + 1); - failures = x.get(); - } else { - failureCounts.put(srcAttempt, new IntWritable(1)); + boolean connectError, + boolean isLocalFetch) { + failedShuffleCounter.increment(1); + + int failures = incrementAndGetFailureAttempt(srcAttempt); + + if (!isLocalFetch) { + /** + * Track the number of failures that has happened since last completion. + * This gets reset on a successful copy. + */ + failedShufflesSinceLastCompletion++; } - String hostPort = host.getHostIdentifier(); - // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error - // reporting / potential blacklisting of hosts. - if (hostFailures.containsKey(hostPort)) { - IntWritable x = hostFailures.get(hostPort); - x.set(x.get() + 1); - } else { - hostFailures.put(hostPort, new IntWritable(1)); + + /** + * Inform AM: + * - In case of read/connect error + * - In case attempt failures exceed threshold of + * maxFetchFailuresBeforeReporting (5) + * Bail-out if needed: + * - Check whether individual attempt crossed failure threshold limits + * - Check overall shuffle health. Bail out if needed.* + */ + + //TEZ-2890 + boolean shouldInformAM = + (reportReadErrorImmediately && (readError || connectError)) + || ((failures % maxFetchFailuresBeforeReporting) == 0); + + if (shouldInformAM) { + //Inform AM. In case producer needs to be restarted, it is handled at AM. + informAM(srcAttempt); + } + + //Restart consumer in case shuffle is not healthy + if (!isShuffleHealthy(srcAttempt)) { + return; } - if (failures >= abortFailureLimit) { + + penalizeHost(host, failures); + } + + private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) { + int attemptFailures = getFailureCount(srcAttempt); + if (attemptFailures >= abortFailureLimit) { // This task has seen too many fetch failures - report it as failed. The // AM may retry it if max failures has not been reached. - + // Between the task and the AM - someone needs to determine who is at // fault. If there's enough errors seen on the task, before the AM informs // it about source failure, the task considers itself to have failed and // allows the AM to re-schedule it. - IOException ioe = new IOException(failures - + " failures downloading " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber())); - ioe.fillInStackTrace(); + String errorMsg = "Failed " + attemptFailures + " times trying to " + + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier( + inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + ". 
threshold=" + abortFailureLimit; + IOException ioe = new IOException(errorMsg); // Shuffle knows how to deal with failures post shutdown via the onFailure hook exceptionReporter.reportException(ioe); + return true; } + return false; + } - failedShuffleCounter.increment(1); - checkAndInformAM(failures, srcAttempt, readError, connectError); + private void penalizeHost(MapHost host, int failures) { + host.penalize(); + + String hostPort = host.getHostIdentifier(); + // TODO TEZ-922 hostFailures isn't really used for anything apart from + // hasFailedAcrossNodes().Factor it into error + // reporting / potential blacklisting of hosts. + if (hostFailures.containsKey(hostPort)) { + IntWritable x = hostFailures.get(hostPort); + x.set(x.get() + 1); + } else { + hostFailures.put(hostPort, new IntWritable(1)); + } - checkReducerHealth(); - long delay = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures)); - penalties.add(new Penalty(host, delay)); } + private int getFailureCount(InputAttemptIdentifier srcAttempt) { + IntWritable failureCount = failureCounts.get(srcAttempt); + return (failureCount == null) ? 0 : failureCount.get(); + } + + private int incrementAndGetFailureAttempt(InputAttemptIdentifier srcAttempt) { + int failures = 1; + if (failureCounts.containsKey(srcAttempt)) { + IntWritable x = failureCounts.get(srcAttempt); + x.set(x.get() + 1); + failures = x.get(); + } else { + failureCounts.put(srcAttempt, new IntWritable(1)); + } + return failures; + } + public void reportLocalError(IOException ioe) { - LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe); + LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", + ioe); // Shuffle knows how to deal with failures post shutdown via the onFailure hook exceptionReporter.reportException(ioe); } - // Notify the AM - // after every read error, if 'reportReadErrorImmediately' is true or - // after every 'maxFetchFailuresBeforeReporting' failures - private void checkAndInformAM( - int failures, InputAttemptIdentifier srcAttempt, boolean readError, - boolean connectError) { - if ((reportReadErrorImmediately && (readError || connectError)) - || ((failures % maxFetchFailuresBeforeReporting) == 0)) { - LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " - + srcAttempt + " taskAttemptIdentifier: " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber()) + " to AM."); - List<Event> failedEvents = Lists.newArrayListWithCapacity(1); - failedEvents.add(InputReadErrorEvent.create("Fetch failure for " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier() - .getInputIndex(), srcAttempt.getAttemptNumber())); - - inputContext.sendEvents(failedEvents); + // Notify AM + private void informAM(InputAttemptIdentifier srcAttempt) { + LOG.info( + srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + + srcAttempt + " taskAttemptIdentifier: " + TezRuntimeUtils + .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + " to AM."); + List<Event> failedEvents = Lists.newArrayListWithCapacity(1); + failedEvents.add(InputReadErrorEvent.create( + "Fetch failure for " + TezRuntimeUtils + 
.getTaskAttemptIdentifier(inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + " to jobtracker.", + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber())); + + inputContext.sendEvents(failedEvents); + } + + /** + * To determine if failures happened across nodes or not. This will help in + * determining whether this task needs to be restarted or source needs to + * be restarted. + * + * @param logContext context info for logging + * @return boolean true indicates this task needs to be restarted + */ + private boolean hasFailedAcrossNodes(String logContext) { + int numUniqueHosts = uniqueHosts.size(); + Preconditions.checkArgument(numUniqueHosts > 0, "No values in unique hosts"); + int threshold = Math.max(3, + (int) Math.ceil(numUniqueHosts * hostFailureFraction)); + int total = 0; + boolean failedAcrossNodes = false; + for(String host : uniqueHosts) { + IntWritable failures = hostFailures.get(host); + if (failures != null && failures.get() > minFailurePerHost) { + total++; + failedAcrossNodes = (total > (threshold * minFailurePerHost)); + if (failedAcrossNodes) { + break; + } + } + } + + LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts + + ", hostFailureThreshold=" + threshold + + ", hostFailuresCount=" + hostFailures.size() + + ", hosts crossing threshold=" + total + + ", reducerFetchIssues=" + failedAcrossNodes + ); + + return failedAcrossNodes; + } + + private boolean allEventsReceived() { + if (!pipelinedShuffleInfoEventsMap.isEmpty()) { + return (pipelinedShuffleInfoEventsMap.size() == numInputs); + } else { + //no pipelining + return ((pathToIdentifierMap.size() + skippedInputCounter.getValue()) + == numInputs); } } - private void checkReducerHealth() { - final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f; - final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f; - final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f; + /** + * Check if consumer needs to be restarted based on total failures w.r.t + * completed outputs and based on number of errors that have happened since + * last successful completion. Consider into account whether failures have + * been seen across different nodes. + * + * @return true to indicate fetchers are healthy + */ + private boolean isFetcherHealthy(String logContext) { long totalFailures = failedShuffleCounter.getValue(); int doneMaps = numInputs - remainingMaps.get(); - - boolean reducerHealthy = - (((float)totalFailures / (totalFailures + doneMaps)) - < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT); + + boolean fetcherHealthy = true; + if (doneMaps > 0) { + fetcherHealthy = (((float) totalFailures / (totalFailures + doneMaps)) + < maxAllowedFailedFetchFraction); + } + + if (fetcherHealthy) { + //Compute this logic only when all events are received + if (allEventsReceived()) { + if (hostFailureFraction > 0) { + boolean failedAcrossNodes = hasFailedAcrossNodes(logContext); + if (failedAcrossNodes) { + return false; //not healthy + } + } + + if (checkFailedFetchSinceLastCompletion) { + /** + * remainingMaps works better instead of pendingHosts in the + * following condition because of the way the fetcher reports failures + */ + if (failedShufflesSinceLastCompletion >= + remainingMaps.get() * minFailurePerHost) { + /** + * Check if lots of errors are seen after last progress time. + * + * E.g totalFailures = 20. doneMaps = 320 - 300; + * fetcherHealthy = (20/(20+300)) < 0.5. So reducer would be marked as healthy. 
+ * Assume 20 errors happen when downloading the last 20 attempts. Host failure & individual + * attempt failures would keep increasing; but at very slow rate 15 * 180 seconds per + * attempt to find out the issue. + * + * Instead consider the new errors with the pending items to be fetched. + * Assume 21 new errors happened after last progress; remainingMaps = (320-300) = 20; + * (21 / (21 + 20)) > 0.5 + * So we reset the reducer to unhealthy here (special case) + * + * In normal conditions (i.e happy path), this wouldn't even cause any issue as + * failedShufflesSinceLastCompletion is reset as soon as we see successful download. + */ + + fetcherHealthy = + (((float) failedShufflesSinceLastCompletion / ( + failedShufflesSinceLastCompletion + remainingMaps.get())) + < maxAllowedFailedFetchFraction); + + LOG.info(logContext + ", fetcherHealthy=" + fetcherHealthy + + ", failedShufflesSinceLastCompletion=" + + failedShufflesSinceLastCompletion + + ", remainingMaps=" + remainingMaps.get() + ); + } + } + } + } + return fetcherHealthy; + } + + boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) { + + if (isAbortLimitExceeedFor(srcAttempt)) { + return false; + } + + final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction; + final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction; + + int doneMaps = numInputs - remainingMaps.get(); + + String logContext = "srcAttempt=" + srcAttempt.toString(); + boolean fetcherHealthy = isFetcherHealthy(logContext); // check if the reducer has progressed enough boolean reducerProgressedEnough = @@ -647,31 +896,31 @@ class ShuffleScheduler { int shuffleProgressDuration = (int)(lastProgressTime - startTime); - // min time the reducer should run without getting killed - int minShuffleRunDuration = - (shuffleProgressDuration > maxMapRuntime) - ? shuffleProgressDuration - : maxMapRuntime; - - boolean reducerStalled = - (((float)stallDuration / minShuffleRunDuration) + boolean reducerStalled = (shuffleProgressDuration > 0) && + (((float)stallDuration / shuffleProgressDuration) >= MAX_ALLOWED_STALL_TIME_PERCENT); // kill if not healthy and has insufficient progress if ((failureCounts.size() >= maxFailedUniqueFetches || failureCounts.size() == (numInputs - doneMaps)) - && !reducerHealthy + && !fetcherHealthy && (!reducerProgressedEnough || reducerStalled)) { - LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " + - "and insufficient progress!" - + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps) - + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough=" - + reducerProgressedEnough + ", reducerStalled=" + reducerStalled); - String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."; + String errorMsg = (srcNameTrimmed + ": " + + "Shuffle failed with too many fetch failures and insufficient progress!" 
+ + "failureCounts=" + failureCounts.size() + + ", pendingInputs=" + (numInputs - doneMaps) + + ", fetcherHealthy=" + fetcherHealthy + + ", reducerProgressedEnough=" + reducerProgressedEnough + + ", reducerStalled=" + reducerStalled); + LOG.error(errorMsg); + if (LOG.isDebugEnabled()) { + LOG.debug("Host failures=" + hostFailures.keySet()); + } // Shuffle knows how to deal with failures post shutdown via the onFailure hook exceptionReporter.reportException(new IOException(errorMsg)); + return false; } - + return true; } public synchronized void addKnownMapOutput(String inputHostName, @@ -680,6 +929,7 @@ class ShuffleScheduler { String hostUrl, InputAttemptIdentifier srcAttempt) { String hostPort = (inputHostName + ":" + String.valueOf(port)); + uniqueHosts.add(hostPort); String identifier = MapHost.createIdentifier(hostPort, partitionId); @@ -697,7 +947,8 @@ class ShuffleScheduler { host.addKnownMap(srcAttempt); pathToIdentifierMap.put( - getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt); + getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), + partitionId), srcAttempt); // Mark the host as pending if (host.getState() == MapHost.State.PENDING) { @@ -709,8 +960,8 @@ class ShuffleScheduler { public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) { // The incoming srcAttempt does not contain a path component. LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt); - if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { - //Pipelined shuffle case (where shuffleInfoEventsMap gets populated). + if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { + //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated). //Fail fast here. exceptionReporter.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. 
Some data could have been already merged " @@ -935,12 +1186,7 @@ class ShuffleScheduler { } } - public synchronized void informMaxMapRunTime(int duration) { - if (duration > maxMapRuntime) { - maxMapRuntime = duration; - } - } - + void setInputFinished(int inputIndex) { synchronized(finishedMaps) { finishedMaps.set(inputIndex, true); http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index 880dc2f..39cc471 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -355,6 +355,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index ceb0266..faa2d31 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -278,7 +278,7 @@ public class TestFetcher { }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId())); doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), - anyLong(), anyLong(), anyLong(), any(MapOutput.class)); + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); doNothing().when(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); doNothing().when(scheduler).putBackKnownMapOutput(host, @@ -290,8 +290,8 @@ public class TestFetcher { for (int i : 
sucessfulAttemptsIndexes) { verifyCopySucceeded(scheduler, host, srcAttempts, i); } - verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false); - verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); verify(metrics, times(3)).successFetch(); verify(metrics, times(2)).failedFetch(); @@ -309,7 +309,7 @@ public class TestFetcher { String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent(); ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class); verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100), - eq(p * 1000), anyLong(), captureMapOutput.capture()); + eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean()); // cannot use the equals of MapOutput as it compares id which is private. so doing it manually MapOutput m = captureMapOutput.getAllValues().get(0); @@ -414,7 +414,7 @@ public class TestFetcher { verify(fetcher, times(2)).setupConnection(any(MapHost.class), any(Collection.class)); //since copyMapOutput consistently fails, it should call copyFailed once verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class), - anyBoolean(), anyBoolean()); + anyBoolean(), anyBoolean(), anyBoolean()); verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class)); verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class), http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 78d214c..88a1d20 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -171,7 +171,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); int partitionId = attemptNum; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1)); - verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); + verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); //Send final_update event. 
Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); @@ -181,9 +181,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme2)); baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); partitionId = attemptNum; - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); + assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2)); - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); + assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); MapHost host = scheduler.getHost(); assertTrue(host != null); @@ -191,10 +191,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped { assertTrue(!list.isEmpty()); //Let the final_update event pass MapOutput output = MapOutput.createMemoryMapOutput(id2, mergeManager, 1000, true); - scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output, false); assertTrue(!scheduler.isDone()); //we haven't downloaded id1 yet output = MapOutput.createMemoryMapOutput(id1, mergeManager, 1000, true); - scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another source yet //Send events for source 2 @@ -210,11 +210,11 @@ public class TestShuffleInputEventHandlerOrderedGrouped { InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex())); - scheduler.copySucceeded(id4, null, 0, 0, 0, null); + scheduler.copySucceeded(id4, null, 0, 0, 0, null, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet //Let the incremental event pass output = MapOutput.createMemoryMapOutput(id3, mergeManager, 1000, true); - scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output, false); assertTrue(scheduler.isDone()); } @@ -234,7 +234,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); assertTrue("Shuffle info events should not be empty for pipelined shuffle", - !scheduler.shuffleInfoEventsMap.isEmpty()); + !scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); //Attempt #0 comes up. 
When processing this, it should report exception attemptNum = 0; @@ -263,7 +263,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(expectedIdentifier)); assertTrue("Shuffle info events should be empty for regular shuffle codepath", - scheduler.shuffleInfoEventsMap.isEmpty()); + scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); } @Test(timeout = 5000) @@ -288,7 +288,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), - eq(0l), eq(0l), any(MapOutput.class)); + eq(0l), eq(0l), any(MapOutput.class), eq(true)); } @Test(timeout = 5000) @@ -302,7 +302,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), - eq(0l), eq(0l), any(MapOutput.class)); + eq(0l), eq(0l), any(MapOutput.class), eq(true)); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java index ac6c6c0..3fe540c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleScheduler.java @@ -14,10 +14,16 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; @@ -143,7 +149,7 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput); + scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false); scheduler.freeHost(mapHosts[i]); } @@ -155,6 +161,617 @@ public class TestShuffleScheduler { } } + @Test(timeout = 60000) + /** + * Scenario + * - reducer has not progressed enough + * - reducer becomes unhealthy after some failures + * - no of attempts failing exceeds maxFailedUniqueFetches (5) + * Expected result + * - fail the reducer + */ + public void testReducerHealth_1() throws IOException { + Configuration conf = new TezConfiguration(); + _testReducerHealth_1(conf); + conf.setInt( + 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 4000); + _testReducerHealth_1(conf); + } + + public void _testReducerHealth_1(Configuration conf) throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, + shuffle, conf); + + int totalProducerNodes = 20; + + //Generate 320 events + for (int i = 0; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + + //100 succeeds + for (int i = 0; i < 100; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //99 fails + for (int i = 100; i < 199; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(200), 0, "attempt_"); + + //Should fail here and report exception as reducer is not healthy + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(200, "host" + (200 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + int minFailurePerHost = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT); + + if (minFailurePerHost <= 4) { + //As per test threshold. 
Should fail & retrigger shuffle + verify(shuffle, atLeast(0)).reportException(any(Throwable.class)); + } else if (minFailurePerHost > 100) { + //host failure is so high that this would not retrigger shuffle re-execution + verify(shuffle, atLeast(1)).reportException(any(Throwable.class)); + } + } + + @Test(timeout = 60000) + /** + * Scenario + * - reducer has progressed enough + * - failures start happening after that + * - no of attempts failing exceeds maxFailedUniqueFetches (5) + * - Has not stalled + * Expected result + * - Since reducer is not stalled, it should continue without error + * + * When reducer stalls, wait until enough retries are done and throw exception + * + */ + public void testReducerHealth_2() throws IOException, InterruptedException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle); + + int totalProducerNodes = 20; + + //Generate 0-200 events + for (int i = 0; i < 200; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + assertEquals(320, scheduler.remainingMaps.get()); + + //Generate 200-320 events with empty partitions + for (int i = 200; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copySucceeded(inputAttemptIdentifier, null, 0, 0, 0, null, true); + } + //120 are successful. so remaining is 200 + assertEquals(200, scheduler.remainingMaps.get()); + + + //200 pending to be downloaded. Download 190. + for (int i = 0; i < 190; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + assertEquals(10, scheduler.remainingMaps.get()); + + //10 fails + for (int i = 190; i < 200; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + //Shuffle has not stalled. so no issues. 
+ verify(scheduler.reporter, times(0)).reportException(any(Throwable.class)); + + //stall shuffle + scheduler.lastProgressTime = System.currentTimeMillis() - 250000; + + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(190), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(190, "host" + + (190 % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + //Even when it is stalled, need (320 - 300 = 20) * 3 = 60 failures + verify(scheduler.reporter, times(0)).reportException(any(Throwable.class)); + + assertEquals(11, scheduler.failedShufflesSinceLastCompletion); + + //fail to download 50 more times across attempts + for (int i = 190; i < 200; i++) { + inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + assertEquals(61, scheduler.failedShufflesSinceLastCompletion); + assertEquals(10, scheduler.remainingMaps.get()); + + verify(shuffle, atLeast(0)).reportException(any(Throwable.class)); + + //fail another 30 + for (int i = 110; i < 120; i++) { + inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + // Should fail now due to fetcherHealthy. (stall has already happened and + // these are the only pending tasks) + verify(shuffle, atLeast(1)).reportException(any(Throwable.class)); + } + + + + @Test(timeout = 60000) + /** + * Scenario + * - reducer has progressed enough + * - failures start happening after that in last fetch + * - no of attempts failing does not exceed maxFailedUniqueFetches (5) + * - Stalled + * Expected result + * - Since reducer is stalled and if failures haven't happened across nodes, + * it should be fine to proceed. AM would restart source task eventually. 
+ * + */ + public void testReducerHealth_3() throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle); + + int totalProducerNodes = 20; + + //Generate 320 events + for (int i = 0; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + + //319 succeeds + for (int i = 0; i < 319; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //1 fails (last fetch) + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + //stall the shuffle + scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; + + assertEquals(scheduler.remainingMaps.get(), 1); + + //Retry for 3 more times + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + // failedShufflesSinceLastCompletion has crossed the limits, but failures are confined to a + // single host. Do not throw; the AM will eventually restart the source task. + verify(shuffle, times(0)).reportException(any(Throwable.class)); + } + + @Test(timeout = 60000) + /** + * Scenario + * - reducer has progressed enough + * - failures have happened randomly in nodes, but tasks are completed + * - failures start happening after that in last fetch + * - no of attempts failing does not exceed maxFailedUniqueFetches (5) + * - Stalled + * Expected result + * - reducer is stalled. But since errors are not seen across multiple + * nodes, it is left to the AM to restart producer. Do not kill consumer. 
+ * + */ + public void testReducerHealth_4() throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle); + + int totalProducerNodes = 20; + + //Generate 320 events + for (int i = 0; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + + //Tasks fail in 20% of nodes 3 times, but are able to proceed further + for (int i = 0; i < 64; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % + totalProducerNodes) + ":" + 10000, ""), false, true, false); + + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % + totalProducerNodes) + ":" + 10000, ""), false, true, false); + + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % + totalProducerNodes) + ":" + 10000, ""), false, true, false); + + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //319 succeeds + for (int i = 64; i < 319; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //1 fails (last fetch) + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(319), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + //stall the shuffle (but within limits) + scheduler.lastProgressTime = System.currentTimeMillis() - 100000; + + assertEquals(scheduler.remainingMaps.get(), 1); + + //Retry for 3 more times + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + // failedShufflesSinceLastCompletion has crossed the limits. 20% of other nodes had failures as + // well. However, it has failed only in one host. So this should proceed + // until AM decides to restart the producer. 
+ verify(shuffle, times(0)).reportException(any(Throwable.class)); + + //stall the shuffle (but within limits) + scheduler.lastProgressTime = System.currentTimeMillis() - 300000; + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(319, "host" + (319 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + verify(shuffle, times(1)).reportException(any(Throwable.class)); + + } + + @Test(timeout = 60000) + /** + * Scenario + * - Shuffle has progressed enough + * - Last event is yet to arrive + * - Failures start happening after Shuffle has progressed enough + * - no of attempts failing does not exceed maxFailedUniqueFetches (5) + * - Stalled + * Expected result + * - Do not throw errors, as Shuffle is yet to receive inputs + * + */ + public void testReducerHealth_5() throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle); + + int totalProducerNodes = 20; + + //Generate 319 events (last event has not arrived) + for (int i = 0; i < 319; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + + //318 succeeds + for (int i = 0; i < 319; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //1 fails (last fetch) + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(318), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + //stall the shuffle + scheduler.lastProgressTime = System.currentTimeMillis() - 1000000; + + assertEquals(scheduler.remainingMaps.get(), 1); + + //Retry for 3 more times + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(318, "host" + (318 % + totalProducerNodes) + + ":" + 10000, ""), false, true, false); + + //Shuffle has not received the events completely. So do not bail out yet. 
+ verify(shuffle, times(0)).reportException(any(Throwable.class)); + } + + + @Test(timeout = 60000) + /** + * Scenario + * - Shuffle has NOT progressed enough + * - Failures start happening + * - no of attempts failing exceed maxFailedUniqueFetches (5) + * - Not stalled + * Expected result + * - Bail out + * + */ + public void testReducerHealth_6() throws IOException { + Configuration conf = new TezConfiguration(); + conf.setBoolean + (TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true); + _testReducerHealth_6(conf); + + conf.setBoolean + (TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false); + _testReducerHealth_6(conf); + + } + + public void _testReducerHealth_6(Configuration conf) throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, + shuffle, conf); + + int totalProducerNodes = 20; + + //Generate 320 events + for (int i = 0; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), + 10000, i, "hostUrl", inputAttemptIdentifier); + } + + //10 succeeds + for (int i = 0; i < 10; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), 100, 200, startTime + (i * 100), mapOutput, false); + } + + //5 fetches fail once + for (int i = 10; i < 15; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + assertTrue(scheduler.failureCounts.size() >= 5); + assertEquals(scheduler.remainingMaps.get(), 310); + + //Do not bail out (number of failures is just 5) + verify(scheduler.reporter, times(0)).reportException(any(Throwable.class)); + + //5 fetches fail repeatedly + for (int i = 10; i < 15; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, new MapHost(i, "host" + (i % totalProducerNodes) + + ":" + 10000, ""), false, true, false); + } + + boolean checkFailedFetchSinceLastCompletion = conf.getBoolean + (TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); + if (checkFailedFetchSinceLastCompletion) { + // Now bail out, as Shuffle has crossed the + // failedShufflesSinceLastCompletion limits (even + // though the reducer is otherwise healthy) + verify(shuffle, atLeast(1)).reportException(any(Throwable.class)); + } else { + //Do not bail out yet. 
+ verify(shuffle, atLeast(0)).reportException(any(Throwable.class)); + } + + } + + @Test(timeout = 60000) + /** + * Scenario + * - reducer has not progressed enough + * - fetch fails > + * TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION + * Expected result + * - fail the reducer + */ + public void testReducerHealth_7() throws IOException { + long startTime = System.currentTimeMillis() - 500000; + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 320, shuffle); + + int totalProducerNodes = 20; + + //Generate 320 events + for (int i = 0; i < 320; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.addKnownMapOutput("host" + (i % totalProducerNodes), 10000, i, + "hostUrl", inputAttemptIdentifier); + } + + //100 succeeds + for (int i = 0; i < 100; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + MapOutput mapOutput = MapOutput + .createMemoryMapOutput(inputAttemptIdentifier, mock(FetchedInputAllocatorOrderedGrouped.class), + 100, false); + scheduler.copySucceeded(inputAttemptIdentifier, + new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + 100, 200, startTime + (i * 100), mapOutput, false); + } + + //99 fails + for (int i = 100; i < 199; i++) { + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(i), 0, "attempt_"); + scheduler.copyFailed(inputAttemptIdentifier, + new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, + new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, + new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + false, true, false); + scheduler.copyFailed(inputAttemptIdentifier, + new MapHost(i, "host" + (i % totalProducerNodes) + ":" + 10000, ""), + false, true, false); + } + + verify(shuffle, atLeast(1)).reportException(any(Throwable.class)); + } + + private ShuffleSchedulerForTest createScheduler(long startTime, int + numInputs, Shuffle shuffle, Configuration conf) + throws IOException { + InputContext inputContext = createTezInputContext(); + MergeManager mergeManager = mock(MergeManager.class); + + final ShuffleSchedulerForTest scheduler = + new ShuffleSchedulerForTest(inputContext, conf, numInputs, shuffle, mergeManager, + mergeManager,startTime, null, false, 0, "srcName"); + return scheduler; + } + + private ShuffleSchedulerForTest createScheduler(long startTime, int numInputs, Shuffle shuffle) + throws IOException { + return createScheduler(startTime, numInputs, shuffle, new + TezConfiguration()); + } + + @Test(timeout = 60000) + public void testPenalty() throws IOException, InterruptedException { + long startTime = System.currentTimeMillis(); + Shuffle shuffle = mock(Shuffle.class); + final ShuffleSchedulerForTest scheduler = createScheduler(startTime, 1, shuffle); + + InputAttemptIdentifier inputAttemptIdentifier = + new InputAttemptIdentifier(new InputIdentifier(0), 0, "attempt_"); + scheduler.addKnownMapOutput("host0", 10000, 0, "hostUrl", inputAttemptIdentifier); + + assertTrue(scheduler.pendingHosts.size() == 1); + assertTrue(scheduler.pendingHosts.iterator().next().getState() == MapHost.State.PENDING); + MapHost mapHost = scheduler.pendingHosts.iterator().next(); + + //Fails to pull 
from host0. host0 should be added to penalties + scheduler.copyFailed(inputAttemptIdentifier, mapHost, false, true, false); + + //Should not get host, as it is added to penalty loop + MapHost host = scheduler.getHost(); + assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000")); + + //Referee thread would release it after INITIAL_PENALTY timeout + Thread.sleep(ShuffleScheduler.INITIAL_PENALTY + 1000); + host = scheduler.getHost(); + assertFalse(host.getIdentifier(), host.getIdentifier().equalsIgnoreCase("host0:10000")); + } + @Test(timeout = 5000) public void testShutdown() throws Exception { InputContext inputContext = createTezInputContext(); @@ -198,7 +815,7 @@ public class TestShuffleScheduler { MapOutput mapOutput = MapOutput .createMemoryMapOutput(identifiers[i], mock(FetchedInputAllocatorOrderedGrouped.class), 100, false); - scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput); + scheduler.copySucceeded(identifiers[i], mapHosts[i], 20, 25, 100, mapOutput, false); scheduler.freeHost(mapHosts[i]); } @@ -234,6 +851,7 @@ public class TestShuffleScheduler { private final AtomicInteger numFetchersCreated = new AtomicInteger(0); private final boolean fetcherShouldWait; + private final ExceptionReporter reporter; public ShuffleSchedulerForTest(InputContext inputContext, Configuration conf, int numberOfInputs, @@ -258,6 +876,7 @@ public class TestShuffleScheduler { super(inputContext, conf, numberOfInputs, shuffle, mergeManager, allocator, startTime, codec, ifileReadAhead, ifileReadAheadLength, srcNameTrimmed); this.fetcherShouldWait = fetcherShouldWait; + this.reporter = shuffle; } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/46f2454a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java index 674d4b4..49b5490 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java @@ -58,6 +58,23 @@ public class TestOrderedGroupedMergedKVInputConfig { fromConf.set("test.conf.key.1", "confkey1"); fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111); fromConf.set("io.shouldExist", "io"); + fromConf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 3); + fromConf.setFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, + 0.1f); + fromConf.setFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, + 0.6f); + fromConf.setInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, + 10); + fromConf.setFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.2f); + fromConf.setFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f); + fromConf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false); Map<String, String> additionalConf = new HashMap<String, String>(); additionalConf.put("test.key.2", "key2"); 
additionalConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, "3"); @@ -96,6 +113,19 @@ public class TestOrderedGroupedMergedKVInputConfig { assertEquals(0.22f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f); assertEquals(0.33f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.0f), 0.001f); assertEquals(0.44f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.00f), 0.001f); + assertEquals(0.1f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.00f), 0.001f); + assertEquals(3, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 0), 0); + assertEquals(10, conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, 0), 0); + assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.00f), 0.001f); + assertEquals(0.2f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.00f), 0.001f); + assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.00f), 0.001f); + assertEquals(false, conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true)); // Verify additional configs assertEquals(false, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,

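For anyone experimenting with the new shuffle failure-handling knobs exercised above, the following is a minimal illustrative sketch, not part of this commit: it only sets the properties on a Tez runtime Configuration with arbitrary example values (the same ones used in TestOrderedGroupedMergedKVInputConfig). The class name ShuffleFailureTuningSketch is made up for illustration; the property semantics are documented on the corresponding constants in TezRuntimeConfiguration.

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;

public class ShuffleFailureTuningSketch {
  // Builds a Configuration carrying example values for the expert-level
  // shuffle failure-handling properties added in this patch. Values are
  // placeholders, not recommendations; consult the Javadoc on each constant
  // in TezRuntimeConfiguration before tuning them.
  public static Configuration exampleConf() {
    Configuration conf = new TezConfiguration();
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 3);
    conf.setFloat(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.1f);
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, 10);
    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.6f);
    conf.setFloat(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.2f);
    conf.setFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f);
    conf.setBoolean(
        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true);
    return conf;
  }
}

The setters above only populate the Configuration; how those values actually influence fetch-failure handling is determined by the ShuffleScheduler and OrderedGroupedKVInput changes in this commit.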