Merge remote-tracking branch 'origin/master' into TEZ-3334-MERGE1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/651257fc Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/651257fc Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/651257fc Branch: refs/heads/master Commit: 651257fc69f6af5a58a4589b002ce593b7fa1187 Parents: e1a9c28 68fe023 Author: Jonathan Eagles <[email protected]> Authored: Thu May 4 16:40:16 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu May 4 16:40:16 2017 -0500 ---------------------------------------------------------------------- BUILDING.txt | 21 +- docs/src/site/markdown/by-laws.md | 2 +- docs/src/site/markdown/install.md | 8 +- .../hadoop-shim-2.4/findbugs-exclude.xml | 16 - hadoop-shim-impls/hadoop-shim-2.4/pom.xml | 56 - .../hadoop/shim/HadoopShim23_24Provider.java | 33 - .../apache/tez/hadoop/shim/HadoopShim24.java | 45 - ...rg.apache.tez.hadoop.shim.HadoopShimProvider | 14 - .../shim/TestHadoop23_24ShimProvider.java | 82 - .../hadoop-shim-2.6/findbugs-exclude.xml | 16 - hadoop-shim-impls/hadoop-shim-2.6/pom.xml | 56 - .../hadoop/shim/HadoopShim25_26_27Provider.java | 33 - .../apache/tez/hadoop/shim/HadoopShim26.java | 52 - ...rg.apache.tez.hadoop.shim.HadoopShimProvider | 14 - .../shim/TestHadoop25_26_27ShimProvider.java | 81 - .../hadoop-shim-2.7/findbugs-exclude.xml | 16 + hadoop-shim-impls/hadoop-shim-2.7/pom.xml | 56 + .../hadoop/shim/HadoopShim25_26_27Provider.java | 33 + .../apache/tez/hadoop/shim/HadoopShim27.java | 52 + ...rg.apache.tez.hadoop.shim.HadoopShimProvider | 14 + .../shim/TestHadoop25_26_27ShimProvider.java | 81 + hadoop-shim-impls/pom.xml | 13 +- pom.xml | 4 +- tez-api/pom.xml | 30 +- .../org/apache/tez/client/TezClientUtils.java | 26 + .../org/apache/tez/common/ATSConstants.java | 1 + .../main/java/org/apache/tez/dag/api/DAG.java | 70 +- .../tez/dag/api/EdgeManagerPluginContext.java | 4 + .../apache/tez/dag/api/TezConfiguration.java | 18 + .../tez/dag/api/VertexManagerPluginContext.java | 9 + .../apache/tez/dag/api/client/DAGStatus.java | 4 +- .../apache/tez/dag/api/client/VertexStatus.java | 4 +- .../org/apache/tez/runtime/api/TaskContext.java | 17 +- .../org/apache/tez/dag/api/TestDAGVerify.java | 3 +- .../api/client/TestTimelineReaderFactory.java | 29 - .../event/TestCompositeDataMovementEvent.java | 5 +- .../org/apache/tez/common/AsyncDispatcher.java | 17 +- .../tez/common/AsyncDispatcherConcurrent.java | 12 +- .../org/apache/tez/common/TezExecutors.java | 52 + .../apache/tez/common/TezSharedExecutor.java | 348 ++ .../tez/dag/history/logging/EntityTypes.java | 1 + .../tez/dag/utils/RelocalizationUtils.java | 26 - .../tez/util/TezMxBeanResourceCalculator.java | 4 +- .../tez/common/TestTezSharedExecutor.java | 240 + .../util/TestTezMxBeanResourceCalculator.java | 4 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 13 +- .../apache/tez/dag/app/dag/DAGScheduler.java | 27 +- .../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 12 + .../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 +- .../tez/dag/app/dag/impl/VertexManager.java | 14 + .../serviceplugins/api/TaskCommunicator.java | 14 - .../api/TaskCommunicatorContext.java | 14 - .../api/TaskHeartbeatRequest.java | 14 - .../api/TaskHeartbeatResponse.java | 14 - .../tez/dag/app/dag/impl/TestDAGScheduler.java | 60 +- .../apache/tez/dag/app/dag/impl/TestEdge.java | 32 + .../tez/dag/app/dag/impl/TestTaskImpl.java | 42 + .../tez/dag/app/dag/impl/TestVertexImpl.java | 21 +- .../tez/dag/app/dag/impl/TestVertexManager.java | 26 + tez-dist/pom.xml | 22 +- .../tez/service/impl/ContainerRunnerImpl.java | 15 +- .../apache/tez/service/impl/TezTestService.java | 8 +- .../tez/mapreduce/output/TestMROutput.java | 15 +- .../tez/mapreduce/processor/MapUtils.java | 5 +- .../processor/map/TestMapProcessor.java | 29 +- .../processor/reduce/TestReduceProcessor.java | 7 +- tez-plugins/pom.xml | 12 +- .../org/apache/tez/history/ATSImportTool.java | 19 +- .../logging/ats/TimelineCachePluginImpl.java | 4 +- .../ats/TestTimelineCachePluginImpl.java | 2 + .../ats/ATSV15HistoryLoggingService.java | 11 +- .../ats/TestATSV15HistoryLoggingService.java | 10 +- tez-plugins/tez-yarn-timeline-history/pom.xml | 6 +- .../logging/ats/ATSHistoryLoggingService.java | 8 +- .../ats/HistoryEventTimelineConversion.java | 150 +- .../ats/TestATSHistoryLoggingService.java | 8 +- .../ats/TestHistoryEventTimelineConversion.java | 168 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 14 +- .../runtime/api/impl/TezInputContextImpl.java | 7 +- .../runtime/api/impl/TezOutputContextImpl.java | 7 +- .../api/impl/TezProcessorContextImpl.java | 7 +- .../runtime/api/impl/TezTaskContextImpl.java | 13 +- .../tez/runtime/metrics/TaskCounterUpdater.java | 4 +- .../org/apache/tez/runtime/task/TezChild.java | 6 +- .../apache/tez/runtime/task/TezTaskRunner2.java | 31 +- .../TestLogicalIOProcessorRuntimeTask.java | 12 +- .../runtime/api/impl/TestProcessorContext.java | 14 +- .../tez/runtime/task/TestTaskExecution2.java | 14 +- .../tez/runtime/task/TestTezTaskRunner2.java | 8 +- .../CartesianProductCombination.java | 78 +- .../CartesianProductConfig.java | 60 +- .../CartesianProductEdgeManagerConfig.java | 39 +- .../CartesianProductEdgeManagerPartitioned.java | 8 +- .../CartesianProductEdgeManagerReal.java | 1 - ...artesianProductEdgeManagerUnpartitioned.java | 57 +- .../CartesianProductVertexManager.java | 51 +- .../CartesianProductVertexManagerConfig.java | 40 +- ...artesianProductVertexManagerPartitioned.java | 10 +- ...tesianProductVertexManagerUnpartitioned.java | 316 +- .../library/common/shuffle/FetchResult.java | 17 - .../orderedgrouped/FetcherOrderedGrouped.java | 13 +- .../orderedgrouped/ShuffleClientMetrics.java | 92 - .../orderedgrouped/ShuffleScheduler.java | 89 +- .../writers/UnorderedPartitionedKVWriter.java | 111 +- .../runtime/library/input/UnorderedKVInput.java | 2 +- .../main/proto/CartesianProductPayload.proto | 9 +- .../TestShuffleVertexManagerUtils.java | 5 + .../TestCartesianProductCombination.java | 30 +- .../TestCartesianProductConfig.java | 6 +- .../TestCartesianProductEdgeManager.java | 6 +- .../TestCartesianProductEdgeManagerConfig.java | 15 +- ...tCartesianProductEdgeManagerPartitioned.java | 13 +- ...artesianProductEdgeManagerUnpartitioned.java | 408 +- .../TestCartesianProductVertexManager.java | 19 + ...TestCartesianProductVertexManagerConfig.java | 8 +- ...tesianProductVertexManagerUnpartitioned.java | 565 +- .../shuffle/orderedgrouped/TestFetcher.java | 33 +- .../orderedgrouped/TestMergeManager.java | 26 +- ...tShuffleInputEventHandlerOrderedGrouped.java | 48 +- .../output/TestOnFileUnorderedKVOutput.java | 23 +- tez-tests/pom.xml | 4 + tez-tools/analyzers/pom.xml | 11 +- tez-ui/README.md | 26 +- tez-ui/pom.xml | 67 +- tez-ui/src/main/webapp/.bowerrc | 5 +- tez-ui/src/main/webapp/README.md | 26 +- tez-ui/src/main/webapp/app/adapters/dag-info.js | 22 + .../app/components/home-table-controls.js | 40 + .../main/webapp/app/controllers/dag/vertices.js | 2 +- .../main/webapp/app/controllers/home/index.js | 6 +- tez-ui/src/main/webapp/app/controllers/table.js | 2 +- tez-ui/src/main/webapp/app/entities/entity.js | 10 +- tez-ui/src/main/webapp/app/models/dag-info.js | 28 + tez-ui/src/main/webapp/app/models/dag.js | 30 +- .../src/main/webapp/app/routes/dag/counters.js | 3 + .../src/main/webapp/app/routes/dag/graphical.js | 3 + tez-ui/src/main/webapp/app/routes/dag/index.js | 3 + .../src/main/webapp/app/routes/dag/swimlane.js | 3 + tez-ui/src/main/webapp/app/routes/home/index.js | 18 + .../src/main/webapp/app/serializers/dag-info.js | 60 + tez-ui/src/main/webapp/app/serializers/dag.js | 47 +- tez-ui/src/main/webapp/app/styles/app.less | 1 + .../webapp/app/styles/home-table-controls.less | 22 + .../webapp/app/styles/queries-page-search.less | 1 + tez-ui/src/main/webapp/app/styles/shared.less | 6 + .../components/home-table-controls.hbs | 24 + .../main/webapp/app/templates/home/index.hbs | 1 + .../main/webapp/app/utils/download-dag-zip.js | 9 + tez-ui/src/main/webapp/bower-shrinkwrap.json | 71 + .../src/main/webapp/config/default-app-conf.js | 3 +- tez-ui/src/main/webapp/package.json | 6 +- .../components/home-table-controls-test.js | 80 + .../webapp/tests/unit/adapters/dag-info-test.js | 30 + .../tests/unit/controllers/home/index-test.js | 2 +- .../webapp/tests/unit/models/dag-info-test.js | 35 + .../main/webapp/tests/unit/models/dag-test.js | 44 + .../webapp/tests/unit/routes/home/index-test.js | 24 + .../tests/unit/serializers/dag-info-test.js | 114 + .../webapp/tests/unit/serializers/dag-test.js | 26 +- .../tests/unit/serializers/timeline-test.js | 2 + tez-ui/src/main/webapp/yarn.lock | 4945 ++++++++++++++++++ 163 files changed, 8752 insertions(+), 1988 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index 76b83f5,0307a04..7f3ad2e --- a/pom.xml +++ b/pom.xml @@@ -37,9 -37,8 +37,9 @@@ <properties> <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile> <clover.license>${user.home}/clover.license</clover.license> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.7.0</hadoop.version> <jetty.version>6.1.26</jetty.version> + <netty.version>3.6.2.Final</netty.version> <pig.version>0.13.0</pig.version> <javac.version>1.7</javac.version> <slf4j.version>1.7.10</slf4j.version> @@@ -56,10 -55,9 +56,10 @@@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scm.url>scm:git:https://git-wip-us.apache.org/repos/asf/tez.git</scm.url> <build.time>${maven.build.timestamp}</build.time> - <frontend-maven-plugin.version>1.1</frontend-maven-plugin.version> + <frontend-maven-plugin.version>1.2</frontend-maven-plugin.version> <findbugs-maven-plugin.version>3.0.1</findbugs-maven-plugin.version> <javadoc-maven-plugin.version>2.9.1</javadoc-maven-plugin.version> + <shade-maven-plugin.version>2.4.3</shade-maven-plugin.version> </properties> <scm> <connection>${scm.url}</connection> http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java ---------------------------------------------------------------------- diff --cc tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java index 8d9436e,17eb88c..85e9227 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java @@@ -23,7 -23,8 +23,9 @@@ import com.google.common.base.Precondit import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.tez.dag.api.TezConfiguration; + import org.apache.tez.common.TezExecutors; + import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.TezException; import org.apache.tez.service.ContainerRunner; import org.apache.tez.shufflehandler.ShuffleHandler; http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --cc tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 9c115b9,eb30841..bd83dfa --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@@ -171,11 -159,10 +159,12 @@@ public class TestMapProcessor task.initialize(); task.run(); task.close(); - + sharedExecutor.shutdownNow(); + OutputContext outputContext = task.getOutputContexts().iterator().next(); - TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier()); + TezTaskOutput mapOutputs = new TezTaskOutputFiles( + jobConf, outputContext.getUniqueIdentifier(), + outputContext.getDagIdentifier()); // TODO NEWTEZ FIXME OutputCommitter verification http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-plugins/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 4b7c7fd,5cad6fc..b762c75 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@@ -131,15 -125,12 +129,14 @@@ class FetcherOrderedGrouped extends Cal int dagId, boolean asyncHttp, boolean sslShuffle, - boolean verifyDiskChecksum) { + boolean verifyDiskChecksum, + boolean compositeFetch) { this.scheduler = scheduler; this.allocator = allocator; - this.metrics = metrics; this.exceptionReporter = exceptionReporter; this.mapHost = mapHost; - this.currentPartition = this.mapHost.getPartitionId(); + this.minPartition = this.mapHost.getPartitionId(); + this.maxPartition = this.minPartition + this.mapHost.getPartitionCount() - 1; this.id = nextId.incrementAndGet(); this.jobTokenSecretManager = jobTokenSecretMgr; @@@ -533,47 -458,68 +527,45 @@@ } return EMPTY_ATTEMPT_ID_ARRAY; } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + - ", decomp len: " + decompressedLength); - } - // Get the location for the map output - either in-memory or on-disk - try { - mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id); - } catch (IOException e) { - if (!stopped) { - // Kill the reduce attempt - ioErrs.increment(1); - scheduler.reportLocalError(e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. Ignoring error from merger.reserve"); - } + // Check if we can shuffle *now* ... + if (mapOutput.getType() == Type.WAIT) { + LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); + //Not an error but wait to process data. + return EMPTY_ATTEMPT_ID_ARRAY; } - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Check if we can shuffle *now* ... - if (mapOutput.getType() == Type.WAIT) { - LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - //Not an error but wait to process data. - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Go! - if (LOG.isDebugEnabled()) { - LOG.debug("fetcher#" + id + " about to shuffle output of map " + - mapOutput.getAttemptIdentifier() + " decomp: " + - decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); - } - if (mapOutput.getType() == Type.MEMORY) { - ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, - (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, - ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier()); - } else if (mapOutput.getType() == Type.DISK) { - ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, decompressedLength, LOG, - mapOutput.getAttemptIdentifier(), - ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); - } else { - throw new IOException("Unknown mapOutput type while fetching shuffle data:" + - mapOutput.getType()); - } + // Go! + if (LOG.isDebugEnabled()) { + LOG.debug("fetcher#" + id + " about to shuffle output of map " + + mapOutput.getAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); + } - // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); - // Reset retryStartTime as map task make progress if retried before. - retryStartTime = 0; + if (mapOutput.getType() == Type.MEMORY) { + ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, + (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, + ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier()); + } else if (mapOutput.getType() == Type.DISK) { + ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), + input, compressedLength, decompressedLength, LOG, + mapOutput.getAttemptIdentifier(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); + } else { + throw new IOException("Unknown mapOutput type while fetching shuffle data:" + + mapOutput.getType()); + } - scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, - endTime - startTime, mapOutput, false); - // Note successful shuffle - remaining.remove(srcAttemptId.toString()); - return null; - } catch (IOException ioe) { + // Inform the shuffle scheduler + long endTime = System.currentTimeMillis(); + // Reset retryStartTime as map task make progress if retried before. + retryStartTime = 0; + + scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, + endTime - startTime, mapOutput, false); - // Note successful shuffle - metrics.successFetch(); + } + remaining.remove(inputAttemptIdentifier.toString()); + } catch(IOException ioe) { if (stopped) { if (LOG.isDebugEnabled()) { LOG.debug("Not reporting fetch failure for exception during data copy: [" @@@ -609,10 -555,8 +601,9 @@@ // Inform the shuffle-scheduler mapOutput.abort(); - metrics.failedFetch(); - return new InputAttemptIdentifier[]{srcAttemptId}; + return new InputAttemptIdentifier[] {srcAttemptId}; } + return null; } /** @@@ -713,47 -665,35 +704,45 @@@ } InputAttemptIdentifier srcAttemptId = iter.next(); MapOutput mapOutput = null; - try { - long startTime = System.currentTimeMillis(); - Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); - - TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), - currentPartition); - - mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); - long endTime = System.currentTimeMillis(); - scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), - indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); - iter.remove(); - } catch (IOException e) { - if (mapOutput != null) { - mapOutput.abort(); - } - if (!stopped) { - ioErrs.increment(1); - scheduler.copyFailed(srcAttemptId, host, true, false, true); - LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + - host.getHostIdentifier(), e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Ignoring fetch error during local disk copy since fetcher has already been stopped"); + boolean hasFailures = false; + // Fetch partition count number of map outputs (handles auto-reduce case) + for (int curPartition = minPartition; curPartition <= maxPartition; curPartition++) { + try { + long startTime = System.currentTimeMillis(); + + // Partition id is the base partition id plus the relative offset + int reduceId = host.getPartitionId() + curPartition - minPartition; + srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), reduceId); + Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null); + TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), reduceId); + + mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord); + long endTime = System.currentTimeMillis(); + scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), + indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); - metrics.successFetch(); + } catch (IOException e) { + if (mapOutput != null) { + mapOutput.abort(); } - return; + if (!stopped) { + hasFailures = true; - metrics.failedFetch(); + ioErrs.increment(1); + scheduler.copyFailed(srcAttemptId, host, true, false, true); + LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + + host.getHostIdentifier(), e); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Ignoring fetch error during local disk copy since fetcher has already been stopped"); + } + return; + } + } } + if (!hasFailures) { + iter.remove(); + } } } finally { putBackRemainingMapOutputs(host); http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 02bda68,73a6214..33fb0f4 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@@ -54,8 -55,6 +55,7 @@@ import com.google.common.util.concurren import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.io.compress.CompressionCodec; - import org.apache.hadoop.security.UserGroupInformation; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.HttpConnectionParams; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.security.JobTokenSecretManager; @@@ -375,12 -369,9 +374,9 @@@ class ShuffleScheduler TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(), - inputContext.getTaskVertexName(), inputContext.getTaskIndex(), - this.conf, UserGroupInformation.getCurrentUser().getShortUserName()); SecretKey jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext - .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); + .getServiceConsumerMetaData(auxiliaryService)); this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret); ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers, @@@ -413,9 -404,8 +409,9 @@@ this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS); this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED); this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED); + this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf); - pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>(); + pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap(); LOG.info("ShuffleScheduler running for sourceVertex: " + inputContext.getSourceVertexName() + " with configuration: " + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting @@@ -1110,18 -1137,12 +1144,18 @@@ public InputAttemptIdentifier getIdentifierForFetchedOutput( String path, int reduceId) { - return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId)); + return pathToIdentifierMap.get(new PathPartition(path, reduceId)); } - private boolean inputShouldBeConsumed(InputAttemptIdentifier id) { + private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) { - return (!obsoleteInputs.contains(id) && - !isInputFinished(id.getInputIdentifier())); + boolean isInputFinished = false; + if (id instanceof CompositeInputAttemptIdentifier) { + CompositeInputAttemptIdentifier cid = (CompositeInputAttemptIdentifier)id; + isInputFinished = isInputFinished(cid.getInputIdentifier(), cid.getInputIdentifier() + cid.getInputIdentifierCount()); + } else { + isInputFinished = isInputFinished(id.getInputIdentifier()); + } + return !obsoleteInputs.contains(id) && !isInputFinished; } public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) { http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index dfa473b,a9b57a9..ef371c2 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@@ -154,9 -149,9 +152,9 @@@ public class TestFetcher final boolean ENABLE_LOCAL_FETCH = true; final boolean DISABLE_LOCAL_FETCH = false; - MapHost mapHost = new MapHost(HOST, PORT, 0); + MapHost mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@@ -172,9 -167,9 +170,9 @@@ verify(spyFetcher, never()).copyFromHost(any(MapHost.class)); // if hostname does not match use http - mapHost = new MapHost(HOST + "_OTHER", PORT, 0); + mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1); fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@@ -188,9 -183,9 +186,9 @@@ verify(spyFetcher, times(1)).copyFromHost(mapHost); // if port does not match use http - mapHost = new MapHost(HOST, PORT + 1, 0); + mapHost = new MapHost(HOST, PORT + 1, 0, 1); fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@@ -204,8 -199,8 +202,8 @@@ verify(spyFetcher, times(1)).copyFromHost(mapHost); //if local fetch is not enabled - mapHost = new MapHost(HOST, PORT, 0); + mapHost = new MapHost(HOST, PORT, 0, 1); - fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@@ -230,11 -224,11 +227,11 @@@ when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); - MapHost host = new MapHost(HOST, PORT, 1); + MapHost host = new MapHost(HOST, PORT, 1, 1); - FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); FetcherOrderedGrouped spyFetcher = spy(fetcher); @@@ -318,151 -292,21 +315,144 @@@ // should have exactly 3 success and 1 failure. for (int i : sucessfulAttemptsIndexes) { - verifyCopySucceeded(scheduler, host, srcAttempts, i); + for (int j = 0; j < host.getPartitionCount(); j++) { + verifyCopySucceeded(scheduler, host, srcAttempts, i, j); + } } - verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); - verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); - verify(metrics, times(3)).successFetch(); - verify(metrics, times(2)).failedFetch(); - verify(spyFetcher).putBackRemainingMapOutputs(host); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); } + @Test(timeout = 5000) + public void testSetupLocalDiskFetchAutoReduce() throws Exception { + Configuration conf = new TezConfiguration(); + ShuffleScheduler scheduler = mock(ShuffleScheduler.class); + MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + Shuffle shuffle = mock(Shuffle.class); + InputContext inputContext = mock(InputContext.class); + when(inputContext.getCounters()).thenReturn(new TezCounters()); + when(inputContext.getSourceVertexName()).thenReturn(""); + + MapHost host = new MapHost(HOST, PORT, 1, 2); - FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, ++ FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, + wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, + false, false, true, false); + FetcherOrderedGrouped spyFetcher = spy(fetcher); + + + final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList( + new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3", host.getPartitionCount()), + new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4", host.getPartitionCount()) + ); + final int FIRST_FAILED_ATTEMPT_IDX = 2; + final int SECOND_FAILED_ATTEMPT_IDX = 4; + final int[] sucessfulAttemptsIndexes = { 0, 1, 3 }; + + doReturn(srcAttempts).when(scheduler).getMapsForHost(host); + final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap + = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>(); + for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) { + for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) { + ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(), host.getPartitionId() + i); + pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i)); + } + } + + doAnswer(new Answer<InputAttemptIdentifier>() { + @Override + public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String path = (String) args[0]; + int reduceId = (int) args[1]; + return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId)); + } + }).when(scheduler) + .getIdentifierForFetchedOutput(any(String.class), any(int.class)); + + doAnswer(new Answer<MapOutput>() { + @Override + public MapOutput answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + MapOutput mapOutput = mock(MapOutput.class); + doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType(); + doReturn(args[0]).when(mapOutput).getAttemptIdentifier(); + return mapOutput; + } + }).when(spyFetcher) + .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class), + any(TezIndexRecord.class)); + + doAnswer(new Answer<Path>() { + @Override + public Path answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]); + } + }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString()); + + for (int i = 0; i < host.getPartitionCount(); i++) { + doAnswer(new Answer<TezIndexRecord>() { + @Override + public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + String pathComponent = (String) args[0]; + int len = pathComponent.length(); + long p = Long.valueOf(pathComponent.substring(len - 1, len)); + + if (pathComponent.equals(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).getPathComponent()) || + pathComponent.equals(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).getPathComponent())) { + throw new IOException("Thowing exception to simulate failure case"); + } + // match with params for copySucceeded below. + return new TezIndexRecord(p * 10, p * 1000, p * 100); + } + }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i)); + } + + doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class), + anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean()); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0)); + doNothing().when(scheduler).putBackKnownMapOutput(host, + srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1)); + + spyFetcher.setupLocalDiskFetch(host); + + // should have exactly 3 success and 1 failure. + for (int i : sucessfulAttemptsIndexes) { + for (int j = 0; j < host.getPartitionCount(); j++) { + verifyCopySucceeded(scheduler, host, srcAttempts, i, j); + } + } + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host, true, false, true); + verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host, true, false, true); + - verify(metrics, times(6)).successFetch(); - verify(metrics, times(4)).failedFetch(); - + verify(spyFetcher).putBackRemainingMapOutputs(host); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); + verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); + } + private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host, - List<InputAttemptIdentifier> srcAttempts, long p) throws + List<CompositeInputAttemptIdentifier> srcAttempts, long p, int j) throws IOException { // need to verify filename, offsets, sizes wherever they are used. - InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p); + InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j); String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent(); ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class); verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100), @@@ -522,11 -365,11 +511,11 @@@ when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(HOST, PORT, 1); + final MapHost host = new MapHost(HOST, PORT, 1, 1); - FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, - false, false, true); + false, false, true, false); final FetcherOrderedGrouped fetcher = spy(mockFetcher); @@@ -611,9 -453,9 +599,9 @@@ doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class)); HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - final MapHost host = new MapHost(HOST, PORT, 1); + final MapHost host = new MapHost(HOST, PORT, 1, 1); FetcherOrderedGrouped mockFetcher = - new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr, + new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, @@@ -678,11 -520,10 +666,10 @@@ Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); - MapHost mapHost = new MapHost(HOST, PORT, 0); + MapHost mapHost = new MapHost(HOST, PORT, 0, 1); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java index 72cba80,695a307..ff5ceab --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java @@@ -240,10 -245,43 +247,43 @@@ public class TestShuffleInputEventHandl Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); handler.handleEvents(Collections.singletonList(dme2)); - InputAttemptIdentifier id2 = - new InputAttemptIdentifier(inputIdx, attemptNum, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); - verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class)); + // task should issue kill request + verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class)); + } + + @Test (timeout = 5000) + public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException { + //Process attempt #1 first + int attemptNum = 1; + int inputIdx = 1; + + Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum); + handler.handleEvents(Collections.singletonList(dme1)); + - InputAttemptIdentifier id1 = - new InputAttemptIdentifier(inputIdx, attemptNum, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0); ++ CompositeInputAttemptIdentifier id1 = ++ new CompositeInputAttemptIdentifier(inputIdx, attemptNum, ++ PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1); + + verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1)); + assertTrue("Shuffle info events should not be empty for pipelined shuffle", + !scheduler.pipelinedShuffleInfoEventsMap.isEmpty()); + + int valuesInMapLocations = scheduler.mapLocations.values().size(); + assertTrue("Maplocations should have values. current size: " + valuesInMapLocations, + valuesInMapLocations > 0); + + // start scheduling for download. Sets up scheduledForDownload in eventInfo. + scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next()); + + // send input failed event. + List<Event> events = new LinkedList<Event>(); + int targetIdx = 1; + InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0); + events.add(failedEvent); + handler.handleEvents(events); + + // task should issue kill request, as inputs are scheduled for download already. + verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class)); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ----------------------------------------------------------------------
