Repository: tez Updated Branches: refs/heads/branch-0.7 0bb3b1d23 -> 45aa72dbf
TEZ-2882. Consider improving fetch failure handling (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/45aa72db Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/45aa72db Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/45aa72db Branch: refs/heads/branch-0.7 Commit: 45aa72dbf4f7a0a6878ac88c8bd8142aa0addbb9 Parents: 0bb3b1d Author: Rajesh Balamohan <[email protected]> Authored: Thu Oct 22 15:02:56 2015 -0700 Committer: Rajesh Balamohan <[email protected]> Committed: Thu Oct 22 15:02:56 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/api/TezRuntimeConfiguration.java | 100 ++++ .../orderedgrouped/FetcherOrderedGrouped.java | 10 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 9 +- .../orderedgrouped/ShuffleScheduler.java | 484 ++++++++++++++----- .../library/input/OrderedGroupedKVInput.java | 14 + .../shuffle/orderedgrouped/TestFetcher.java | 10 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 22 +- .../TestOrderedGroupedMergedKVInputConfig.java | 30 ++ 9 files changed, 530 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ba5b75d..13a2d0c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES + TEZ-2882. Consider improving fetch failure handling TEZ-2907. NPE in IFile.Reader.getLength during final merge operation TEZ-2850. Tez MergeManager OOM for small Map Outputs TEZ-2886. Ability to merge AM credentials with DAG credentials. 
http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index a818de8..3cfbf8e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -170,6 +170,98 @@ public class TezRuntimeConfiguration { "shuffle.fetch.failures.limit"; public static final int TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT_DEFAULT = 5; + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. Sets + * the number of retries before giving up on downloading from source + * attempt by consumer. Code internally handles the threshold if set to -1. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT = + TEZ_RUNTIME_PREFIX + "shuffle.src-attempt.abort.limit"; + public static final int + TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT = -1; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. Setting + * to determine if failures happened across a percentage of nodes. This + * helps in determining if the consumer has to be restarted on continuous + * failures. Setting it to lower value can make consumer restarts more + * aggressive on failures. 
+ */ + public static final String + TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.acceptable.host-fetch.failure.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT = 0.2f; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. Setting + * to determine if the consumer has to be restarted on continuous + * failures across nodes. Used along with {@link + * #TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION}. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST = + TEZ_RUNTIME_PREFIX + "shuffle.min.failures.per.host"; + public static final int TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT = 4; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. + * Maximum fraction of time (compared to overall progress) the fetcher is + * allowed before concluding that it is stalled. + */ + public static final String TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.max.stall.time.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. + * Fraction to determine whether the shuffle has progressed enough or not. + * If it has not progressed enough, the consumer could qualify for a restart. + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.min.required.progress.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. 
+ * Provides threshold for determining whether fetching has to be marked + * unhealthy based on the ratio of (failures/(failures+completed)) + */ + public static final String + TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION = + TEZ_RUNTIME_PREFIX + "shuffle.max.allowed.failed.fetch.fraction"; + public static final float + TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT = 0.5f; + + @Private + @Unstable + /** + * Expert setting made available only for debugging. Do not change it. + * If true, fetching can also be marked unhealthy based on the ratio of + * (failuresSinceLastNonLocalCompletion/(thoseFailures+remainingInputs)) + */ + public static final String + TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION = + TEZ_RUNTIME_PREFIX + "shuffle.failed.check.since-last.completion"; + public static final boolean + TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT = true; + public static final String TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE = TEZ_RUNTIME_PREFIX + @@ -357,6 +449,14 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM); + tezRuntimeKeys.add + (TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); + tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS); 
http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 0ba37dd..7b666e9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -311,7 +311,7 @@ class FetcherOrderedGrouped extends Thread { } else { LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks)); for (InputAttemptIdentifier left : failedTasks) { - scheduler.copyFailed(left, host, true, false); + scheduler.copyFailed(left, host, true, false, false); } } } @@ -370,7 +370,7 @@ class FetcherOrderedGrouped extends Thread { for(InputAttemptIdentifier left: remaining) { // Need to be handling temporary glitches .. 
// Report read error to the AM to trigger source failure heuristics - scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded); + scheduler.copyFailed(left, host, connectSucceeded, !connectSucceeded, false); } return false; } @@ -516,7 +516,7 @@ class FetcherOrderedGrouped extends Thread { retryStartTime = 0; scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, - endTime - startTime, mapOutput); + endTime - startTime, mapOutput, false); // Note successful shuffle remaining.remove(srcAttemptId); metrics.successFetch(); @@ -681,7 +681,7 @@ class FetcherOrderedGrouped extends Thread { mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); long endTime = System.currentTimeMillis(); scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), - indexRecord.getRawLength(), (endTime - startTime), mapOutput); + indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); iter.remove(); metrics.successFetch(); } catch (IOException e) { @@ -691,7 +691,7 @@ class FetcherOrderedGrouped extends Thread { if (!stopped) { metrics.failedFetch(); ioErrs.increment(1); - scheduler.copyFailed(srcAttemptId, host, true, false); + scheduler.copyFailed(srcAttemptId, host, true, false, true); LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e); } else { http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 8d0c357..2755358 100644 --- 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -50,7 +50,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl private final ShuffleScheduler scheduler; private final InputContext inputContext; - private int maxMapRuntime = 0; private final boolean sslShuffle; private final AtomicInteger nextToLogEventCount = new AtomicInteger(0); @@ -110,12 +109,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl + ", attemptNum: " + dmEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload)); } - // TODO NEWTEZ See if this duration hack can be removed. - int duration = shufflePayload.getRunDuration(); - if (duration > maxMapRuntime) { - maxMapRuntime = duration; - scheduler.informMaxMapRunTime(maxMapRuntime); - } if (shufflePayload.hasEmptyPartitions()) { try { byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions()); @@ -128,7 +121,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl + srcAttemptIdentifier + "]. 
Not fetching."); } numDmeEventsNoData.incrementAndGet(); - scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null); + scheduler.copySucceeded(srcAttemptIdentifier, null, 0, 0, 0, null, true); return; } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index b98bc56..4a71268 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -39,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -67,31 +67,33 @@ class ShuffleScheduler { }; private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class); - private static final long INITIAL_PENALTY = 2000l; // 2 seconds + static final long INITIAL_PENALTY = 2000L; // 2 seconds private static final float PENALTY_GROWTH_RATE = 1.3f; private boolean[] finishedMaps; private final int numInputs; private final String srcNameTrimmed; - private int remainingMaps; private int numFetchedSpills; private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>(); - //TODO Clean this and other maps at some 
point - private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); + @VisibleForTesting + final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<String, InputAttemptIdentifier>(); //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is // enabled in source. @VisibleForTesting - final Map<InputIdentifier, ShuffleEventInfo> shuffleInfoEventsMap; + final Map<InputIdentifier, ShuffleEventInfo> pipelinedShuffleInfoEventsMap; - private Set<MapHost> pendingHosts = new HashSet<MapHost>(); + @VisibleForTesting + final Set<MapHost> pendingHosts = new HashSet<MapHost>(); private Set<InputAttemptIdentifier> obsoleteInputs = new HashSet<InputAttemptIdentifier>(); private final Random random = new Random(System.currentTimeMillis()); private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>(); private final Referee referee; - private final Map<InputAttemptIdentifier, IntWritable> failureCounts = - new HashMap<InputAttemptIdentifier,IntWritable>(); + @VisibleForTesting + final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>(); + final Set<String> uniqueHosts = Sets.newHashSet(); private final Map<String,IntWritable> hostFailures = new HashMap<String,IntWritable>(); private final InputContext inputContext; @@ -100,25 +102,37 @@ class ShuffleScheduler { private final TezCounter skippedInputCounter; private final TezCounter reduceShuffleBytes; private final TezCounter reduceBytesDecompressed; - private final TezCounter failedShuffleCounter; + @VisibleForTesting + final TezCounter failedShuffleCounter; private final TezCounter bytesShuffledToDisk; private final TezCounter bytesShuffledToDiskDirect; private final TezCounter bytesShuffledToMem; private final TezCounter firstEventReceived; private final TezCounter lastEventReceived; + @VisibleForTesting + final 
AtomicInteger remainingMaps; private final long startTime; - private long lastProgressTime; + @VisibleForTesting + long lastProgressTime; + @VisibleForTesting + long failedShufflesSinceLastCompletion; private int maxTaskOutputAtOnce; private int maxFetchFailuresBeforeReporting; private boolean reportReadErrorImmediately = true; private int maxFailedUniqueFetches = 5; private final int abortFailureLimit; - private int maxMapRuntime = 0; + + private final int minFailurePerHost; + private final float hostFailureFraction; + private final float maxStallTimeFraction; + private final float minReqProgressFraction; + private final float maxAllowedFailedFetchFraction; + private final boolean checkFailedFetchSinceLastCompletion; private long totalBytesShuffledTillNow = 0; - private DecimalFormat mbpsFormat = new DecimalFormat("0.00"); + private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); public ShuffleScheduler(InputContext inputContext, Configuration conf, @@ -134,9 +148,58 @@ class ShuffleScheduler { String srcNameTrimmed) { this.inputContext = inputContext; this.numInputs = numberOfInputs; - abortFailureLimit = Math.max(30, numberOfInputs / 10); - remainingMaps = numberOfInputs; - finishedMaps = new boolean[remainingMaps]; // default init to false + int abortFailureLimitConf = conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT_DEFAULT); + if (abortFailureLimitConf <= -1) { + abortFailureLimit = Math.max(15, numberOfInputs / 10); + } else { + //No upper cap, as user is setting this intentionally + abortFailureLimit = abortFailureLimitConf; + } + remainingMaps = new AtomicInteger(numberOfInputs); + finishedMaps = new boolean[remainingMaps.get()]; // default init to false + + this.minFailurePerHost = conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, + 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST_DEFAULT); + Preconditions.checkArgument(minFailurePerHost >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST + + "=" + minFailurePerHost + " should not be negative"); + + this.hostFailureFraction = conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION_DEFAULT); + + this.maxStallTimeFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION_DEFAULT); + Preconditions.checkArgument(maxStallTimeFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION + + "=" + maxStallTimeFraction + " should not be negative"); + + this.minReqProgressFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION_DEFAULT); + Preconditions.checkArgument(minReqProgressFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION + + "=" + minReqProgressFraction + " should not be negative"); + + this.maxAllowedFailedFetchFraction = conf.getFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION_DEFAULT); + Preconditions.checkArgument(maxAllowedFailedFetchFraction >= 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION + + "=" + maxAllowedFailedFetchFraction + " should not be negative"); + + this.checkFailedFetchSinceLastCompletion = conf.getBoolean + (TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, + TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION_DEFAULT); + 
this.srcNameTrimmed = srcNameTrimmed; this.referee = new Referee(); this.shuffle = shuffle; @@ -173,14 +236,19 @@ class ShuffleScheduler { this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); - shuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); + pipelinedShuffleInfoEventsMap = new HashMap<InputIdentifier, ShuffleEventInfo>(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting + ", reportReadErrorImmediately=" + reportReadErrorImmediately + ", maxFailedUniqueFetches=" + maxFailedUniqueFetches + ", abortFailureLimit=" + abortFailureLimit - + ", maxMapRuntime=" + maxMapRuntime + + ", hostFailureFraction=" + hostFailureFraction + + ", minFailurePerHost=" + minFailurePerHost + + ", maxAllowedFailedFetchFraction=" + maxAllowedFailedFetchFraction + + ", maxStallTimeFraction=" + maxStallTimeFraction + + ", minReqProgressFraction=" + minReqProgressFraction + + ", checkFailedFetchSinceLastCompletion=" + checkFailedFetchSinceLastCompletion + ", maxTaskOutputAtOnce=" + maxTaskOutputAtOnce); } @@ -239,10 +307,17 @@ class ShuffleScheduler { long bytesCompressed, long bytesDecompressed, long millis, - MapOutput output + MapOutput output, + boolean isLocalFetch ) throws IOException { if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { + if (!isLocalFetch) { + /** + * Reset it only when it is a non-local-disk copy. + */ + failedShufflesSinceLastCompletion = 0; + } if (output != null) { failureCounts.remove(srcAttemptIdentifier); @@ -273,7 +348,7 @@ class ShuffleScheduler { * we retrieve all spill details to claim success. 
*/ if (!srcAttemptIdentifier.canRetrieveInputInChunks()) { - remainingMaps = remainingMaps - 1; + remainingMaps.decrementAndGet(); setInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex()); numFetchedSpills++; } else { @@ -283,12 +358,12 @@ class ShuffleScheduler { return; } - ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(inputIdentifier); + ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(inputIdentifier); //Possible that Shuffle event handler invoked this, due to empty partitions if (eventInfo == null && output == null) { eventInfo = new ShuffleEventInfo(srcAttemptIdentifier); - shuffleInfoEventsMap.put(inputIdentifier, eventInfo); + pipelinedShuffleInfoEventsMap.put(inputIdentifier, eventInfo); } assert(eventInfo != null); @@ -301,12 +376,12 @@ class ShuffleScheduler { //check if we downloaded all spills pertaining to this InputAttemptIdentifier if (eventInfo.isDone()) { - remainingMaps = remainingMaps - 1; + remainingMaps.decrementAndGet(); setInputFinished(inputIdentifier.getInputIndex()); - shuffleInfoEventsMap.remove(inputIdentifier); + pipelinedShuffleInfoEventsMap.remove(inputIdentifier); if (LOG.isTraceEnabled()) { LOG.trace("Removing : " + srcAttemptIdentifier + ", pending: " + - shuffleInfoEventsMap); + pipelinedShuffleInfoEventsMap); } } @@ -315,7 +390,7 @@ class ShuffleScheduler { } } - if (remainingMaps == 0) { + if (remainingMaps.get() == 0) { LOG.info(srcNameTrimmed + ": " + "All inputs fetched for input vertex : " + inputContext.getSourceVertexName()); notifyAll(); } @@ -334,7 +409,8 @@ class ShuffleScheduler { } } else { // input is already finished. duplicate fetch. - LOG.warn(srcNameTrimmed + ": " + "Duplicate fetch of input no longer needs to be fetched: " + srcAttemptIdentifier); + LOG.warn(srcNameTrimmed + ": Duplicate fetch of input " + + "no longer needs to be fetched: " + srcAttemptIdentifier); // free the resource - specially memory // If the src does not generate data, output will be null. 
@@ -349,7 +425,7 @@ class ShuffleScheduler { //For pipelined shuffle. //TODO: TEZ-2132 for error handling. As of now, fail fast if there is a different attempt if (input.canRetrieveInputInChunks()) { - ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + ShuffleEventInfo eventInfo = pipelinedShuffleInfoEventsMap.get(input.getInputIdentifier()); if (eventInfo != null && input.getAttemptNumber() != eventInfo.attemptNum) { reportExceptionForInput(new IOException("Previous event already got scheduled for " + input + ". Previous attempt's data could have been already merged " @@ -360,7 +436,7 @@ class ShuffleScheduler { } if (eventInfo == null) { - shuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input)); + pipelinedShuffleInfoEventsMap.put(input.getInputIdentifier(), new ShuffleEventInfo(input)); } } return true; @@ -375,7 +451,7 @@ class ShuffleScheduler { private final AtomicInteger nextProgressLineEventCount = new AtomicInteger(0); private void logProgress() { - int inputsDone = numInputs - remainingMaps; + int inputsDone = numInputs - remainingMaps.get(); if (inputsDone > nextProgressLineEventCount.get() || inputsDone == numInputs) { nextProgressLineEventCount.addAndGet(50); double mbs = (double) totalBytesShuffledTillNow / (1024 * 1024); @@ -391,96 +467,266 @@ class ShuffleScheduler { public synchronized void copyFailed(InputAttemptIdentifier srcAttempt, MapHost host, boolean readError, - boolean connectError) { - host.penalize(); - int failures = 1; - if (failureCounts.containsKey(srcAttempt)) { - IntWritable x = failureCounts.get(srcAttempt); - x.set(x.get() + 1); - failures = x.get(); - } else { - failureCounts.put(srcAttempt, new IntWritable(1)); + boolean connectError, + boolean isLocalFetch + ) { + failedShuffleCounter.increment(1); + + int failures = incrementAndGetFailureAttempt(srcAttempt); + + if (!isLocalFetch) { + /** + * Track the number of failures that has happened since last completion. 
+ * This gets reset on a successful copy. + */ + failedShufflesSinceLastCompletion++; } - String hostPort = host.getHostIdentifier(); - // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error - // reporting / potential blacklisting of hosts. - if (hostFailures.containsKey(hostPort)) { - IntWritable x = hostFailures.get(hostPort); - x.set(x.get() + 1); - } else { - hostFailures.put(hostPort, new IntWritable(1)); + + /** + * Inform AM: + * - In case of read/connect error + * - In case attempt failures exceed threshold of + * maxFetchFailuresBeforeReporting (5) + * Bail-out if needed: + * - Check whether individual attempt crossed failure threshold limits + * - Check overall shuffle health. Bail out if needed.* + */ + + //TEZ-2890 + boolean shouldInformAM = + (reportReadErrorImmediately && (readError || connectError)) + || ((failures % maxFetchFailuresBeforeReporting) == 0); + + if (shouldInformAM) { + //Inform AM. In case producer needs to be restarted, it is handled at AM. + informAM(srcAttempt); + } + + //Restart consumer in case shuffle is not healthy + if (!isShuffleHealthy(srcAttempt)) { + return; } - if (failures >= abortFailureLimit) { + + penalizeHost(host, failures); + } + + private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) { + int attemptFailures = getFailureCount(srcAttempt); + if (attemptFailures >= abortFailureLimit) { // This task has seen too many fetch failures - report it as failed. The // AM may retry it if max failures has not been reached. - + // Between the task and the AM - someone needs to determine who is at // fault. If there's enough errors seen on the task, before the AM informs // it about source failure, the task considers itself to have failed and // allows the AM to re-schedule it. 
- IOException ioe = new IOException(failures - + " failures downloading " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber())); - ioe.fillInStackTrace(); + String errorMsg = "Failed " + attemptFailures + " times trying to " + + "download from " + TezRuntimeUtils.getTaskAttemptIdentifier( + inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit; + IOException ioe = new IOException(errorMsg); // Shuffle knows how to deal with failures post shutdown via the onFailure hook shuffle.reportException(ioe); + return true; } + return false; + } - failedShuffleCounter.increment(1); - checkAndInformAM(failures, srcAttempt, readError, connectError); + private void penalizeHost(MapHost host, int failures) { + host.penalize(); + + String hostPort = host.getHostIdentifier(); + // TODO TEZ-922 hostFailures isn't really used for anything apart from + // hasFailedAcrossNodes().Factor it into error + // reporting / potential blacklisting of hosts. + if (hostFailures.containsKey(hostPort)) { + IntWritable x = hostFailures.get(hostPort); + x.set(x.get() + 1); + } else { + hostFailures.put(hostPort, new IntWritable(1)); + } - checkReducerHealth(); - long delay = (long) (INITIAL_PENALTY * Math.pow(PENALTY_GROWTH_RATE, failures)); - - penalties.add(new Penalty(host, delay)); + penalties.add(new Penalty(host, delay)); + } + + private int getFailureCount(InputAttemptIdentifier srcAttempt) { + IntWritable failureCount = failureCounts.get(srcAttempt); + return (failureCount == null) ? 
0 : failureCount.get(); + } + + private int incrementAndGetFailureAttempt(InputAttemptIdentifier srcAttempt) { + int failures = 1; + if (failureCounts.containsKey(srcAttempt)) { + IntWritable x = failureCounts.get(srcAttempt); + x.set(x.get() + 1); + failures = x.get(); + } else { + failureCounts.put(srcAttempt, new IntWritable(1)); + } + return failures; } public void reportLocalError(IOException ioe) { - LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", ioe); + LOG.error(srcNameTrimmed + ": " + "Shuffle failed : caused by local error", + ioe); // Shuffle knows how to deal with failures post shutdown via the onFailure hook shuffle.reportException(ioe); } - // Notify the AM - // after every read error, if 'reportReadErrorImmediately' is true or - // after every 'maxFetchFailuresBeforeReporting' failures - private void checkAndInformAM( - int failures, InputAttemptIdentifier srcAttempt, boolean readError, - boolean connectError) { - if ((reportReadErrorImmediately && (readError || connectError)) - || ((failures % maxFetchFailuresBeforeReporting) == 0)) { - LOG.info(srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " - + srcAttempt + " taskAttemptIdentifier: " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber()) + " to AM."); - List<Event> failedEvents = Lists.newArrayListWithCapacity(1); - failedEvents.add(InputReadErrorEvent.create("Fetch failure for " - + TezRuntimeUtils.getTaskAttemptIdentifier( - inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getInputIndex(), - srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier() - .getInputIndex(), srcAttempt.getAttemptNumber())); + // Notify AM + private void informAM(InputAttemptIdentifier srcAttempt) { + LOG.info( + srcNameTrimmed + ": " + "Reporting fetch failure for InputIdentifier: " + + srcAttempt + " 
taskAttemptIdentifier: " + TezRuntimeUtils + .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + " to AM."); + List<Event> failedEvents = Lists.newArrayListWithCapacity(1); + failedEvents.add(InputReadErrorEvent.create( + "Fetch failure for " + TezRuntimeUtils + .getTaskAttemptIdentifier(inputContext.getSourceVertexName(), + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber()) + " to jobtracker.", + srcAttempt.getInputIdentifier().getInputIndex(), + srcAttempt.getAttemptNumber())); + + inputContext.sendEvents(failedEvents); + } + + /** + * To determine if failures happened across nodes or not. This will help in + * determining whether this task needs to be restarted or source needs to + * be restarted. + * + * @param logContext context info for logging + * @return boolean true indicates this task needs to be restarted + */ + private boolean hasFailedAcrossNodes(String logContext) { + int numUniqueHosts = uniqueHosts.size(); + Preconditions.checkArgument(numUniqueHosts > 0, "No values in unique hosts"); + int threshold = Math.max(3, + (int) Math.ceil(numUniqueHosts * hostFailureFraction)); + int total = 0; + boolean failedAcrossNodes = false; + for(String host : uniqueHosts) { + IntWritable failures = hostFailures.get(host); + if (failures != null && failures.get() > minFailurePerHost) { + total++; + failedAcrossNodes = (total > (threshold * minFailurePerHost)); + if (failedAcrossNodes) { + break; + } + } + } - inputContext.sendEvents(failedEvents); + LOG.info(logContext + ", numUniqueHosts=" + numUniqueHosts + + ", hostFailureThreshold=" + threshold + + ", hostFailuresCount=" + hostFailures.size() + + ", hosts crossing threshold=" + total + + ", reducerFetchIssues=" + failedAcrossNodes + ); + + return failedAcrossNodes; + } + + private boolean allEventsReceived() { + if (!pipelinedShuffleInfoEventsMap.isEmpty()) { + return 
(pipelinedShuffleInfoEventsMap.size() == numInputs); + } else { + //no pipelining + return ((pathToIdentifierMap.size() + skippedInputCounter.getValue()) + == numInputs); } } - private void checkReducerHealth() { - final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f; - final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f; - final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f; + /** + * Check if consumer needs to be restarted based on total failures w.r.t + * completed outputs and based on number of errors that have happened since + * last successful completion. Consider into account whether failures have + * been seen across different nodes. + * + * @return true to indicate fetchers are healthy + */ + private boolean isFetcherHealthy(String logContext) { long totalFailures = failedShuffleCounter.getValue(); - int doneMaps = numInputs - remainingMaps; - - boolean reducerHealthy = - (((float)totalFailures / (totalFailures + doneMaps)) - < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT); - + int doneMaps = numInputs - remainingMaps.get(); + + boolean fetcherHealthy = true; + if (doneMaps > 0) { + fetcherHealthy = (((float) totalFailures / (totalFailures + doneMaps)) + < maxAllowedFailedFetchFraction); + } + + if (fetcherHealthy) { + //Compute this logic only when all events are received + if (allEventsReceived()) { + if (hostFailureFraction > 0) { + boolean failedAcrossNodes = hasFailedAcrossNodes(logContext); + if (failedAcrossNodes) { + return false; //not healthy + } + } + + if (checkFailedFetchSinceLastCompletion) { + /** + * remainingMaps works better instead of pendingHosts in the + * following condition because of the way the fetcher reports failures + */ + if (failedShufflesSinceLastCompletion >= + remainingMaps.get() * minFailurePerHost) { + /** + * Check if lots of errors are seen after last progress time. + * + * E.g totalFailures = 20. doneMaps = 320 - 300; + * fetcherHealthy = (20/(20+300)) < 0.5. So reducer would be marked as healthy. 
+ * Assume 20 errors happen when downloading the last 20 attempts. Host failure & individual + * attempt failures would keep increasing; but at very slow rate 15 * 180 seconds per + * attempt to find out the issue. + * + * Instead consider the new errors with the pending items to be fetched. + * Assume 21 new errors happened after last progress; remainingMaps = (320-300) = 20; + * (21 / (21 + 20)) > 0.5 + * So we reset the reducer to unhealthy here (special case) + * + * In normal conditions (i.e happy path), this wouldn't even cause any issue as + * failedShufflesSinceLastCompletion is reset as soon as we see successful download. + */ + + fetcherHealthy = + (((float) failedShufflesSinceLastCompletion / ( + failedShufflesSinceLastCompletion + remainingMaps.get())) + < maxAllowedFailedFetchFraction); + + LOG.info(logContext + ", fetcherHealthy=" + fetcherHealthy + + ", failedShufflesSinceLastCompletion=" + + failedShufflesSinceLastCompletion + + ", remainingMaps=" + remainingMaps.get() + ); + } + } + } + } + return fetcherHealthy; + } + + boolean isShuffleHealthy(InputAttemptIdentifier srcAttempt) { + + if (isAbortLimitExceeedFor(srcAttempt)) { + return false; + } + + final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction; + final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction; + + int doneMaps = numInputs - remainingMaps.get(); + + String logContext = "srcAttempt=" + srcAttempt.toString(); + boolean fetcherHealthy = isFetcherHealthy(logContext); + // check if the reducer has progressed enough boolean reducerProgressedEnough = (((float)doneMaps / numInputs) @@ -495,31 +741,31 @@ class ShuffleScheduler { int shuffleProgressDuration = (int)(lastProgressTime - startTime); - // min time the reducer should run without getting killed - int minShuffleRunDuration = - (shuffleProgressDuration > maxMapRuntime) - ? 
shuffleProgressDuration - : maxMapRuntime; - - boolean reducerStalled = - (((float)stallDuration / minShuffleRunDuration) - >= MAX_ALLOWED_STALL_TIME_PERCENT); + boolean reducerStalled = (shuffleProgressDuration > 0) && + (((float)stallDuration / shuffleProgressDuration) + >= MAX_ALLOWED_STALL_TIME_PERCENT); // kill if not healthy and has insufficient progress if ((failureCounts.size() >= maxFailedUniqueFetches || failureCounts.size() == (numInputs - doneMaps)) - && !reducerHealthy + && !fetcherHealthy && (!reducerProgressedEnough || reducerStalled)) { - LOG.error(srcNameTrimmed + ": " + "Shuffle failed with too many fetch failures " + - "and insufficient progress!" - + "failureCounts=" + failureCounts.size() + ", pendingInputs=" + (numInputs - doneMaps) - + ", reducerHealthy=" + reducerHealthy + ", reducerProgressedEnough=" - + reducerProgressedEnough + ", reducerStalled=" + reducerStalled); - String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out."; + String errorMsg = (srcNameTrimmed + ": " + + "Shuffle failed with too many fetch failures and insufficient progress!" 
+ + "failureCounts=" + failureCounts.size() + + ", pendingInputs=" + (numInputs - doneMaps) + + ", fetcherHealthy=" + fetcherHealthy + + ", reducerProgressedEnough=" + reducerProgressedEnough + + ", reducerStalled=" + reducerStalled); + LOG.error(errorMsg); + if (LOG.isDebugEnabled()) { + LOG.debug("Host failures=" + hostFailures.keySet()); + } // Shuffle knows how to deal with failures post shutdown via the onFailure hook shuffle.reportException(new IOException(errorMsg)); + return false; } - + return true; } public synchronized void addKnownMapOutput(String inputHostName, @@ -528,6 +774,7 @@ class ShuffleScheduler { String hostUrl, InputAttemptIdentifier srcAttempt) { String hostPort = (inputHostName + ":" + String.valueOf(port)); + uniqueHosts.add(hostPort); String identifier = MapHost.createIdentifier(hostPort, partitionId); @@ -545,7 +792,8 @@ class ShuffleScheduler { host.addKnownMap(srcAttempt); pathToIdentifierMap.put( - getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt); + getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), + partitionId), srcAttempt); // Mark the host as pending if (host.getState() == MapHost.State.PENDING) { @@ -557,8 +805,8 @@ class ShuffleScheduler { public synchronized void obsoleteInput(InputAttemptIdentifier srcAttempt) { // The incoming srcAttempt does not contain a path component. LOG.info(srcNameTrimmed + ": " + "Adding obsolete input: " + srcAttempt); - if (shuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { - //Pipelined shuffle case (where shuffleInfoEventsMap gets populated). + if (pipelinedShuffleInfoEventsMap.containsKey(srcAttempt.getInputIdentifier())) { + //Pipelined shuffle case (where pipelinedShuffleInfoEventsMap gets populated). //Fail fast here. shuffle.reportException(new IOException(srcAttempt + " is marked as obsoleteInput, but it " + "exists in shuffleInfoEventMap. 
Some data could have been already merged " @@ -716,7 +964,7 @@ class ShuffleScheduler { * @return */ public synchronized boolean isDone() { - return remainingMaps == 0; + return remainingMaps.get() == 0; } /** @@ -727,9 +975,9 @@ class ShuffleScheduler { */ public synchronized boolean waitUntilDone(int millis ) throws InterruptedException { - if (remainingMaps > 0) { + if (remainingMaps.get() > 0) { wait(millis); - return remainingMaps == 0; + return remainingMaps.get() == 0; } return true; } @@ -800,12 +1048,6 @@ class ShuffleScheduler { referee.join(); } - public synchronized void informMaxMapRunTime(int duration) { - if (duration > maxMapRuntime) { - maxMapRuntime = duration; - } - } - void setInputFinished(int inputIndex) { synchronized(finishedMaps) { finishedMaps[inputIndex] = true; http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java index fa8630d..02c4176 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java @@ -345,6 +345,20 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION); + 
confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); + confKeys.add(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index c33905f..8affa66 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -208,7 +208,7 @@ public class TestFetcher { }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId())); doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), - anyLong(), anyLong(), anyLong(), any(MapOutput.class)); + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); doNothing().when(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); doNothing().when(scheduler).putBackKnownMapOutput(host, @@ -220,8 +220,8 @@ public class TestFetcher { 
for (int i : sucessfulAttemptsIndexes) { verifyCopySucceeded(scheduler, host, srcAttempts, i); } - verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false); - verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); verify(metrics, times(3)).successFetch(); verify(metrics, times(2)).failedFetch(); @@ -239,7 +239,7 @@ public class TestFetcher { String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent(); ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class); verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100), - eq(p * 1000), anyLong(), captureMapOutput.capture()); + eq(p * 1000), anyLong(), captureMapOutput.capture(), anyBoolean()); // cannot use the equals of MapOutput as it compares id which is private. 
so doing it manually MapOutput m = captureMapOutput.getAllValues().get(0); @@ -344,7 +344,7 @@ public class TestFetcher { verify(fetcher, times(2)).setupConnection(any(MapHost.class), anyList()); //since copyMapOutput consistently fails, it should call copyFailed once verify(scheduler, times(1)).copyFailed(any(InputAttemptIdentifier.class), any(MapHost.class), - anyBoolean(), anyBoolean()); + anyBoolean(), anyBoolean(), anyBoolean()); verify(fetcher, times(1)).putBackRemainingMapOutputs(any(MapHost.class)); verify(scheduler, times(3)).putBackKnownMapOutput(any(MapHost.class), http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 6c000ff..2c6b37f 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@ -175,7 +175,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); int partitionId = attemptNum; verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id1)); - verify(scheduler).shuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); + verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier()); //Send final_update event. 
Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1); @@ -185,9 +185,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(Collections.singletonList(dme2)); baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString(); partitionId = attemptNum; - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); + assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(id2)); - assertTrue(scheduler.shuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); + assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier())); MapHost host = scheduler.getHost(); assertTrue(host != null); @@ -195,10 +195,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped { assertTrue(!list.isEmpty()); //Let the final_update event pass MapOutput output = MapOutput.createMemoryMapOutput(id2, mergeManager, 1000, true); - scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id2, host, 1000, 10000, 10000, output, false); assertTrue(!scheduler.isDone()); //we haven't downloaded id1 yet output = MapOutput.createMemoryMapOutput(id1, mergeManager, 1000, true); - scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id1, host, 1000, 10000, 10000, output, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another source yet //Send events for source 2 @@ -214,11 +214,11 @@ public class TestShuffleInputEventHandlerOrderedGrouped { InputAttemptIdentifier id4 = new InputAttemptIdentifier(new InputIdentifier(inputIdx), attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1); assertTrue(!scheduler.isInputFinished(id4.getInputIdentifier().getInputIndex())); - scheduler.copySucceeded(id4, null, 0, 0, 0, null); + scheduler.copySucceeded(id4, null, 0, 
0, 0, null, false); assertTrue(!scheduler.isDone()); //we haven't downloaded another id yet //Let the incremental event pass output = MapOutput.createMemoryMapOutput(id3, mergeManager, 1000, true); - scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output); + scheduler.copySucceeded(id3, host, 1000, 10000, 10000, output, false); assertTrue(scheduler.isDone()); } @@ -238,7 +238,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri), eq(id1)); assertTrue("Shuffle info events should not be empty for pipelined shuffle", - !scheduler.shuffleInfoEventsMap.isEmpty()); + !scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); //Attempt #0 comes up. When processing this, it should report exception attemptNum = 0; @@ -267,7 +267,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri), eq(expectedIdentifier)); assertTrue("Shuffle info events should be empty for regular shuffle codepath", - scheduler.shuffleInfoEventsMap.isEmpty()); + scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); } @Test(timeout = 5000) @@ -292,7 +292,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), - eq(0l), eq(0l), any(MapOutput.class)); + eq(0l), eq(0l), any(MapOutput.class), eq(true)); } @Test(timeout = 5000) @@ -306,7 +306,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped { handler.handleEvents(events); InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0); verify(scheduler).copySucceeded(eq(expectedIdentifier), any(MapHost.class), eq(0l), - eq(0l), eq(0l), any(MapOutput.class)); + eq(0l), eq(0l), any(MapOutput.class), eq(true)); } @Test(timeout = 5000) 
http://git-wip-us.apache.org/repos/asf/tez/blob/45aa72db/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java index 674d4b4..49b5490 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedGroupedMergedKVInputConfig.java @@ -58,6 +58,23 @@ public class TestOrderedGroupedMergedKVInputConfig { fromConf.set("test.conf.key.1", "confkey1"); fromConf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, 1111); fromConf.set("io.shouldExist", "io"); + fromConf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 3); + fromConf.setFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, + 0.1f); + fromConf.setFloat( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, + 0.6f); + fromConf.setInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, + 10); + fromConf.setFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.2f); + fromConf.setFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f); + fromConf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, false); Map<String, String> additionalConf = new HashMap<String, String>(); additionalConf.put("test.key.2", "key2"); additionalConf.put(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, "3"); @@ -96,6 +113,19 @@ public class TestOrderedGroupedMergedKVInputConfig { 
assertEquals(0.22f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.0f), 0.001f); assertEquals(0.33f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, 0.0f), 0.001f); assertEquals(0.44f, conf.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.00f), 0.001f); + assertEquals(0.1f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION, 0.00f), 0.001f); + assertEquals(3, conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST, 0), 0); + assertEquals(10, conf.getInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT, 0), 0); + assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION, 0.00f), 0.001f); + assertEquals(0.2f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION, 0.00f), 0.001f); + assertEquals(0.6f, conf.getFloat(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION, 0.6f), 0.001f); + assertEquals(false, conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION, true)); // Verify additional configs assertEquals(false, conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
