Merge branch 'master' into TEZ-3334-MERGE1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be0d01f6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be0d01f6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be0d01f6 Branch: refs/heads/master Commit: be0d01f6add2bbb20af73ae44909f2510e37eff9 Parents: c3a7c21 e375b9d Author: Jonathan Eagles <[email protected]> Authored: Mon Mar 27 17:30:53 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Mon Mar 27 17:30:53 2017 -0500 ---------------------------------------------------------------------- BUILDING.txt | 5 + CHANGES.txt | 2199 ------------------ Tez_DOAP.rdf | 7 + docs/pom.xml | 18 + docs/src/site/markdown/install.md | 2 +- .../site/markdown/releases/apache-tez-0-8-5.md | 30 + docs/src/site/markdown/releases/index.md | 1 + docs/src/site/site.xml | 2 +- .../org/apache/tez/client/FrameworkClient.java | 2 + .../java/org/apache/tez/client/TezClient.java | 34 +- .../org/apache/tez/client/TezClientUtils.java | 18 +- .../org/apache/tez/client/TezYarnClient.java | 9 + .../org/apache/tez/common/ATSConstants.java | 1 + .../org/apache/tez/common/TezYARNUtils.java | 32 +- .../apache/tez/common/security/TokenCache.java | 16 +- .../main/java/org/apache/tez/dag/api/DAG.java | 15 + .../org/apache/tez/dag/api/HistoryLogLevel.java | 1 + .../apache/tez/dag/api/TezConfiguration.java | 82 +- .../org/apache/tez/dag/api/TezConstants.java | 6 + .../apache/tez/dag/api/client/DAGClient.java | 13 + .../tez/dag/api/client/DAGClientImpl.java | 33 +- .../tez/dag/api/client/DAGClientInternal.java | 14 +- .../dag/api/client/DAGClientTimelineImpl.java | 11 +- .../dag/api/client/rpc/DAGClientRPCImpl.java | 23 +- .../api/TaskSchedulerContext.java | 3 +- .../org/apache/tez/client/TestTezClient.java | 53 +- .../apache/tez/client/TestTezClientUtils.java | 41 + .../org/apache/tez/common/TestTezYARNUtils.java | 52 + .../tez/common/security/TestTokenCache.java | 57 +- .../org/apache/tez/dag/api/TestDAGVerify.java | 25 + .../tez/dag/api/client/rpc/TestDAGClient.java | 25 +- .../org/apache/tez/common/TezUtilsInternal.java | 40 + .../org/apache/tez/dag/records/TezDAGID.java | 64 +- .../java/org/apache/tez/dag/records/TezID.java | 21 + .../tez/dag/records/TezTaskAttemptID.java | 57 +- .../org/apache/tez/dag/records/TezTaskID.java | 51 +- .../org/apache/tez/dag/records/TezVertexID.java | 48 +- .../org/apache/tez/util/FastNumberFormat.java | 55 + .../org/apache/tez/util/TestNumberFormat.java | 39 + .../java/org/apache/tez/client/LocalClient.java | 7 +- .../java/org/apache/tez/dag/app/AppContext.java | 5 + .../org/apache/tez/dag/app/DAGAppMaster.java | 62 +- .../dag/app/TaskCommunicatorContextImpl.java | 25 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 3 +- .../dag/app/rm/TaskSchedulerContextImpl.java | 7 +- .../app/rm/TaskSchedulerContextImplWrapper.java | 11 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 18 +- .../dag/app/rm/YarnTaskSchedulerService.java | 3 +- .../tez/dag/app/rm/container/AMContainer.java | 1 + .../app/rm/container/AMContainerHelpers.java | 18 +- .../dag/app/rm/container/AMContainerImpl.java | 67 +- .../dag/app/rm/container/AMContainerMap.java | 40 +- .../org/apache/tez/dag/app/rm/node/AMNode.java | 3 + .../rm/node/AMNodeEventContainerCompleted.java | 37 + .../tez/dag/app/rm/node/AMNodeEventType.java | 5 +- .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 67 +- .../tez/dag/app/rm/node/AMNodeTracker.java | 5 +- .../dag/app/rm/node/PerSourceNodeTracker.java | 11 +- .../apache/tez/dag/app/web/AMWebController.java | 32 +- .../tez/dag/history/HistoryEventHandler.java | 137 +- .../tez/dag/history/HistoryEventType.java | 4 +- .../dag/history/events/DAGSubmittedEvent.java | 17 +- .../impl/HistoryEventJsonConversion.java | 6 + tez-dag/src/main/proto/HistoryEvents.proto | 1 + .../resources/tez-container-log4j.properties | 8 + .../apache/tez/dag/app/MockDAGAppMaster.java | 2 +- .../apache/tez/dag/app/TestDAGAppMaster.java | 8 +- .../apache/tez/dag/app/TestRecoveryParser.java | 26 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 76 + .../tez/dag/app/dag/impl/TestVertexImpl.java | 65 + .../tez/dag/app/rm/TestTaskScheduler.java | 3 +- .../dag/app/rm/TestTaskSchedulerHelpers.java | 4 +- .../dag/app/rm/TestTaskSchedulerManager.java | 42 +- .../dag/app/rm/container/TestAMContainer.java | 136 +- .../app/rm/container/TestAMContainerMap.java | 126 +- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 73 + .../tez/dag/app/web/TestAMWebController.java | 38 + .../dag/history/TestHistoryEventHandler.java | 81 +- .../TestHistoryEventsProtoConversion.java | 4 +- .../impl/TestHistoryEventJsonConversion.java | 14 +- .../history/recovery/TestRecoveryService.java | 2 +- .../org/apache/tez/examples/ExampleDriver.java | 2 + tez-ext-service-tests/pom.xml | 5 + .../apache/tez/dag/api/client/MRDAGClient.java | 10 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 2 + .../apache/tez/mapreduce/processor/MRTask.java | 22 - .../common/TestMRInputAMSplitGenerator.java | 58 +- .../logging/ats/TimelineCachePluginImpl.java | 1 - .../ats/TestTimelineCachePluginImpl.java | 41 +- .../ats/acls/TestATSHistoryWithACLs.java | 4 +- .../ats/TestATSV15HistoryLoggingService.java | 146 +- .../ats/HistoryEventTimelineConversion.java | 6 + .../ats/TestATSHistoryLoggingService.java | 2 +- .../ats/TestHistoryEventTimelineConversion.java | 15 +- .../common/resources/MemoryDistributor.java | 12 +- .../apache/tez/runtime/task/TaskReporter.java | 15 +- .../org/apache/tez/runtime/task/TezChild.java | 2 + .../tez/runtime/task/TestTaskExecution2.java | 42 +- .../org/apache/tez/http/HttpConnection.java | 18 +- .../java/org/apache/tez/http/SSLFactory.java | 2 +- .../http/async/netty/AsyncHttpConnection.java | 1 + .../CartesianProductCombination.java | 3 + .../CartesianProductConfig.java | 10 + .../CartesianProductEdgeManagerConfig.java | 12 +- ...artesianProductEdgeManagerUnpartitioned.java | 60 +- .../CartesianProductVertexManager.java | 28 +- .../CartesianProductVertexManagerConfig.java | 20 +- ...artesianProductVertexManagerPartitioned.java | 4 + .../CartesianProductVertexManagerReal.java | 3 +- ...tesianProductVertexManagerUnpartitioned.java | 117 +- .../runtime/library/common/shuffle/Fetcher.java | 4 +- .../library/common/shuffle/ShuffleUtils.java | 149 +- .../common/shuffle/impl/ShuffleManager.java | 7 +- .../orderedgrouped/FetcherOrderedGrouped.java | 4 +- .../shuffle/orderedgrouped/MergeManager.java | 213 +- .../orderedgrouped/ShuffleScheduler.java | 8 +- .../runtime/library/common/sort/impl/IFile.java | 6 +- .../common/sort/impl/PipelinedSorter.java | 29 +- .../library/common/sort/impl/TezMerger.java | 56 +- .../WeightedScalingMemoryDistributor.java | 62 +- .../tez/runtime/library/utils/Grouper.java | 89 + .../main/proto/CartesianProductPayload.proto | 5 +- .../TestWeightedScalingMemoryDistributor.java | 165 ++ .../TestCartesianProductCombination.java | 9 + .../TestCartesianProductConfig.java | 34 +- .../TestCartesianProductEdgeManager.java | 13 +- .../TestCartesianProductEdgeManagerConfig.java | 50 + ...tCartesianProductEdgeManagerPartitioned.java | 6 +- ...artesianProductEdgeManagerUnpartitioned.java | 125 +- ...TestCartesianProductVertexManagerConfig.java | 53 + ...artesianProductVertexManagerPartitioned.java | 9 +- ...tesianProductVertexManagerUnpartitioned.java | 141 +- .../library/cartesianproduct/TestGrouper.java | 80 + .../common/shuffle/TestShuffleUtils.java | 40 +- .../orderedgrouped/TestMergeManager.java | 98 +- .../common/sort/impl/TestPipelinedSorter.java | 18 +- .../org/apache/tez/mapreduce/TestMRRJobs.java | 2 + .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 1 + .../org/apache/tez/test/TestAMRecovery.java | 1 + .../org/apache/tez/test/TestDAGRecovery.java | 1 + .../org/apache/tez/test/TestDAGRecovery2.java | 1 + .../tez/test/TestExceptionPropagation.java | 1 + .../org/apache/tez/test/TestFaultTolerance.java | 50 +- .../apache/tez/test/TestPipelinedShuffle.java | 2 + .../java/org/apache/tez/test/TestRecovery.java | 1 + .../org/apache/tez/test/TestSecureShuffle.java | 3 + .../tez/test/TestTaskErrorsUsingLocalMode.java | 3 +- .../java/org/apache/tez/test/TestTezJobs.java | 3 +- tez-ui/pom.xml | 10 + tez-ui/src/main/webapp/app/adapters/timeline.js | 11 +- .../main/webapp/app/components/caller-info.js | 2 + .../webapp/app/components/dags-page-search.js | 2 + .../webapp/app/components/dags-pagination-ui.js | 106 - .../webapp/app/components/date-formatter.js | 8 + .../components/em-table-tasks-log-link-cell.js | 33 + .../main/webapp/app/components/em-tooltip.js | 8 +- .../main/webapp/app/components/pagination-ui.js | 106 + .../app/components/queries-page-search.js | 61 + .../webapp/app/components/query-timeline.js | 79 + .../webapp/app/components/zip-download-modal.js | 2 +- .../main/webapp/app/controllers/application.js | 4 +- .../main/webapp/app/controllers/dag/attempts.js | 20 + .../webapp/app/controllers/dag/graphical.js | 4 + .../main/webapp/app/controllers/dag/index.js | 5 + .../main/webapp/app/controllers/dag/swimlane.js | 16 +- .../main/webapp/app/controllers/dag/tasks.js | 13 + tez-ui/src/main/webapp/app/controllers/dags.js | 166 -- tez-ui/src/main/webapp/app/controllers/home.js | 31 + .../main/webapp/app/controllers/home/index.js | 205 ++ .../main/webapp/app/controllers/home/queries.js | 206 ++ tez-ui/src/main/webapp/app/controllers/query.js | 44 + .../webapp/app/controllers/query/configs.js | 60 + .../main/webapp/app/controllers/query/index.js | 22 + .../webapp/app/controllers/query/timeline.js | 57 + tez-ui/src/main/webapp/app/controllers/table.js | 4 +- .../webapp/app/controllers/task/attempts.js | 20 + .../webapp/app/controllers/vertex/attempts.js | 20 + .../main/webapp/app/controllers/vertex/tasks.js | 13 + tez-ui/src/main/webapp/app/entities/entity.js | 71 +- tez-ui/src/main/webapp/app/initializers/env.js | 1 + .../src/main/webapp/app/initializers/jquery.js | 1 - tez-ui/src/main/webapp/app/models/abstract.js | 1 + tez-ui/src/main/webapp/app/models/ahs-app.js | 13 +- tez-ui/src/main/webapp/app/models/attempt.js | 19 +- tez-ui/src/main/webapp/app/models/dag.js | 10 +- tez-ui/src/main/webapp/app/models/hive-query.js | 51 + tez-ui/src/main/webapp/app/models/task.js | 3 + tez-ui/src/main/webapp/app/models/timed.js | 43 + tez-ui/src/main/webapp/app/models/timeline.js | 11 +- tez-ui/src/main/webapp/app/models/vertex.js | 7 + tez-ui/src/main/webapp/app/router.js | 16 +- .../src/main/webapp/app/routes/application.js | 4 +- tez-ui/src/main/webapp/app/routes/dag/tasks.js | 20 + tez-ui/src/main/webapp/app/routes/dags.js | 172 -- tez-ui/src/main/webapp/app/routes/home.js | 29 + tez-ui/src/main/webapp/app/routes/home/index.js | 108 + .../src/main/webapp/app/routes/home/queries.js | 105 + .../main/webapp/app/routes/multi-am-pollster.js | 7 +- tez-ui/src/main/webapp/app/routes/query.js | 38 + .../src/main/webapp/app/routes/query/configs.js | 38 + .../src/main/webapp/app/routes/query/index.js | 35 + .../main/webapp/app/routes/query/timeline.js | 35 + .../main/webapp/app/routes/server-side-ops.js | 107 + .../webapp/app/routes/single-am-pollster.js | 7 +- .../src/main/webapp/app/routes/vertex/tasks.js | 20 + .../src/main/webapp/app/serializers/attempt.js | 12 +- tez-ui/src/main/webapp/app/serializers/dag.js | 4 +- .../main/webapp/app/serializers/hive-query.js | 85 +- tez-ui/src/main/webapp/app/serializers/task.js | 5 +- tez-ui/src/main/webapp/app/styles/app.less | 2 + .../webapp/app/styles/dags-page-search.less | 4 +- .../main/webapp/app/styles/details-page.less | 6 + .../webapp/app/styles/queries-page-search.less | 78 + .../main/webapp/app/styles/query-timeline.less | 159 ++ .../webapp/app/styles/zip-download-modal.less | 13 +- .../main/webapp/app/templates/attempt/index.hbs | 16 + .../app/templates/components/caller-info.hbs | 6 +- .../templates/components/dags-page-search.hbs | 8 + .../templates/components/dags-pagination-ui.hbs | 48 - .../app/templates/components/date-formatter.hbs | 2 +- .../components/em-table-tasks-log-link-cell.hbs | 25 + .../app/templates/components/em-tooltip.hbs | 18 +- .../app/templates/components/pagination-ui.hbs | 48 + .../components/queries-page-search.hbs | 88 + .../app/templates/components/query-timeline.hbs | 124 + .../templates/components/zip-download-modal.hbs | 12 +- .../src/main/webapp/app/templates/dag/index.hbs | 6 +- tez-ui/src/main/webapp/app/templates/dags.hbs | 42 - tez-ui/src/main/webapp/app/templates/home.hbs | 20 + .../main/webapp/app/templates/home/index.hbs | 46 + .../main/webapp/app/templates/home/queries.hbs | 41 + tez-ui/src/main/webapp/app/templates/query.hbs | 20 + .../main/webapp/app/templates/query/configs.hbs | 34 + .../main/webapp/app/templates/query/index.hbs | 128 + .../webapp/app/templates/query/timeline.hbs | 43 + .../main/webapp/app/templates/task/index.hbs | 25 + .../main/webapp/app/templates/vertex/index.hbs | 11 + .../main/webapp/app/utils/download-dag-zip.js | 75 +- .../src/main/webapp/app/utils/vertex-process.js | 13 +- .../src/main/webapp/app/utils/virtual-anchor.js | 32 + tez-ui/src/main/webapp/package.json | 6 +- .../components/dags-page-search-test.js | 4 +- .../components/dags-pagination-ui-test.js | 158 -- .../components/date-formatter-test.js | 11 + .../em-table-tasks-log-link-cell-test.js | 53 + .../components/pagination-ui-test.js | 158 ++ .../components/queries-page-search-test.js | 76 + .../components/query-timeline-test.js | 182 ++ .../components/zip-download-modal-test.js | 52 +- .../webapp/tests/unit/adapters/timeline-test.js | 5 + .../tests/unit/controllers/application-test.js | 10 +- .../tests/unit/controllers/dag/attempts-test.js | 2 +- .../unit/controllers/dag/graphical-test.js | 1 + .../tests/unit/controllers/dag/index-test.js | 1 + .../tests/unit/controllers/dag/swimlane-test.js | 1 + .../tests/unit/controllers/dag/tasks-test.js | 28 + .../webapp/tests/unit/controllers/dags-test.js | 52 - .../webapp/tests/unit/controllers/home-test.js | 46 + .../tests/unit/controllers/home/index-test.js | 143 ++ .../tests/unit/controllers/home/queries-test.js | 148 ++ .../webapp/tests/unit/controllers/query-test.js | 51 + .../unit/controllers/query/configs-test.js | 55 + .../tests/unit/controllers/query/index-test.js | 33 + .../unit/controllers/query/timeline-test.js | 63 + .../webapp/tests/unit/controllers/table-test.js | 39 + .../unit/controllers/task/attempts-test.js | 2 +- .../unit/controllers/vertex/attempts-test.js | 2 +- .../tests/unit/controllers/vertex/tasks-test.js | 28 + .../webapp/tests/unit/entities/entity-test.js | 131 +- .../tests/unit/initializers/jquery-test.js | 5 +- .../webapp/tests/unit/models/abstract-test.js | 27 + .../webapp/tests/unit/models/ahs-app-test.js | 15 +- .../webapp/tests/unit/models/attempt-test.js | 31 + .../main/webapp/tests/unit/models/dag-test.js | 37 + .../webapp/tests/unit/models/hive-query-test.js | 33 + .../main/webapp/tests/unit/models/task-test.js | 5 + .../main/webapp/tests/unit/models/timed-test.js | 86 + .../webapp/tests/unit/models/timeline-test.js | 14 - .../webapp/tests/unit/models/vertex-test.js | 27 + .../webapp/tests/unit/routes/dag/tasks-test.js | 43 + .../main/webapp/tests/unit/routes/dags-test.js | 175 -- .../main/webapp/tests/unit/routes/home-test.js | 32 + .../webapp/tests/unit/routes/home/index-test.js | 164 ++ .../tests/unit/routes/home/queries-test.js | 120 + .../tests/unit/routes/multi-am-pollster-test.js | 44 + .../main/webapp/tests/unit/routes/query-test.js | 35 + .../tests/unit/routes/query/configs-test.js | 33 + .../tests/unit/routes/query/index-test.js | 35 + .../tests/unit/routes/query/timeline-test.js | 35 + .../tests/unit/routes/server-side-ops-test.js | 176 ++ .../unit/routes/single-am-pollster-test.js | 23 +- .../tests/unit/routes/vertex/tasks-test.js | 41 + .../tests/unit/serializers/attempt-test.js | 9 +- .../webapp/tests/unit/serializers/dag-test.js | 2 + .../tests/unit/serializers/hive-query-test.js | 84 +- .../webapp/tests/unit/serializers/task-test.js | 2 + .../tests/unit/utils/vertex-process-test.js | 31 +- .../tests/unit/utils/virtual-anchor-test.js | 40 + 299 files changed, 9074 insertions(+), 4192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/docs/src/site/site.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 51e954d,3bac7b5..173b458 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@@ -95,7 -94,7 +95,11 @@@ public class AMContainerHelpers */ private static ContainerLaunchContext createCommonContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, ++<<<<<<< HEAD + Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) { ++======= + Credentials credentials) { ++>>>>>>> master // Application environment Map<String, String> environment = new HashMap<String, String>(); http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 3afbb23,9d1f42a..f31425e --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@@ -811,165 -726,125 +811,165 @@@ public class Fetcher extends CallableWi } } + private static class MapOutputStat { + final InputAttemptIdentifier srcAttemptId; + final long decompressedLength; + final long compressedLength; + final int forReduce; + + MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) { + this.srcAttemptId = srcAttemptId; + this.decompressedLength = decompressedLength; + this.compressedLength = compressedLength; + this.forReduce = forReduce; + } + + @Override + public String toString() { + return new String("id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce); + } + } private InputAttemptIdentifier[] fetchInputs(DataInputStream input, - CachingCallBack callback) throws FetcherReadTimeoutException { + CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException { FetchedInput fetchedInput = null; InputAttemptIdentifier srcAttemptId = null; - long decompressedLength = -1; - long compressedLength = -1; - + long decompressedLength = 0; + long compressedLength = 0; try { long startTime = System.currentTimeMillis(); - int responsePartition = -1; - // Read the shuffle header - String pathComponent = null; - try { - ShuffleHeader header = new ShuffleHeader(); - header.readFields(input); - pathComponent = header.getMapId(); - - srcAttemptId = pathToAttemptMap.get(pathComponent); - compressedLength = header.getCompressedLength(); - decompressedLength = header.getUncompressedLength(); - responsePartition = header.getPartition(); - } catch (IllegalArgumentException e) { - // badIdErrs.increment(1); - if (!isShutDown.get()) { - LOG.warn("Invalid src id ", e); - // Don't know which one was bad, so consider all of them as bad - return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); - } else { - if (isDebugEnabled) { - LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage()); - } - return null; - } + int partitionCount = 1; + + if (this.compositeFetch) { + // Multiple partitions are fetched + partitionCount = WritableUtils.readVInt(input); } + ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount); + for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) { + MapOutputStat mapOutputStat = null; + int responsePartition = -1; + // Read the shuffle header + String pathComponent = null; + try { + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + pathComponent = header.getMapId(); + srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition())); + + if (header.getCompressedLength() == 0) { + // Empty partitions are already accounted for + continue; + } - // Do some basic sanity verification - if (!verifySanity(compressedLength, decompressedLength, - responsePartition, srcAttemptId, pathComponent)) { - if (!isShutDown.get()) { - if (srcAttemptId == null) { - LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null"); - srcAttemptId = getNextRemainingAttempt(); + mapOutputStat = new MapOutputStat(srcAttemptId, + header.getUncompressedLength(), + header.getCompressedLength(), + header.getPartition()); + mapOutputStats.add(mapOutputStat); + responsePartition = header.getPartition(); + } catch (IllegalArgumentException e) { + // badIdErrs.increment(1); + if (!isShutDown.get()) { + LOG.warn("Invalid src id ", e); + // Don't know which one was bad, so consider all of them as bad + return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]); + } else { + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage()); + } + return null; } - assert (srcAttemptId != null); - return new InputAttemptIdentifier[]{srcAttemptId}; - } else { - if (isDebugEnabled) { - LOG.debug("Already shutdown. Ignoring verification failure."); + } + + // Do some basic sanity verification + if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, + responsePartition, mapOutputStat.srcAttemptId, pathComponent)) { + if (!isShutDown.get()) { + srcAttemptId = mapOutputStat.srcAttemptId; + if (srcAttemptId == null) { + LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null"); + srcAttemptId = getNextRemainingAttempt(); + } + assert (srcAttemptId != null); + return new InputAttemptIdentifier[]{srcAttemptId}; + } else { + if (isDebugEnabled) { + LOG.debug("Already shutdown. Ignoring verification failure."); + } + return null; } - return null; } - } - if (isDebugEnabled) { - LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength - + ", decomp len: " + decompressedLength); - } - - // TODO TEZ-957. handle IOException here when Broadcast has better error checking - if (srcAttemptId.isShared() && callback != null) { - // force disk if input is being shared - fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength, - compressedLength, srcAttemptId); - } else { - fetchedInput = inputManager.allocate(decompressedLength, - compressedLength, srcAttemptId); - } - // No concept of WAIT at the moment. - // // Check if we can shuffle *now* ... - // if (fetchedInput.getType() == FetchedInput.WAIT) { - // LOG.info("fetcher#" + id + - // " - MergerManager returned Status.WAIT ..."); - // //Not an error but wait to process data. - // return EMPTY_ATTEMPT_ID_ARRAY; - // } - - // Go! - if (isDebugEnabled) { - LOG.debug("fetcher" + " about to shuffle output of srcAttempt " - + fetchedInput.getInputAttemptIdentifier() + " decomp: " - + decompressedLength + " len: " + compressedLength + " to " - + fetchedInput.getType()); + if (isDebugEnabled) { + LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength + + ", decomp len: " + mapOutputStat.decompressedLength); + } } - if (fetchedInput.getType() == Type.MEMORY) { - ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), - input, (int) decompressedLength, (int) compressedLength, codec, - ifileReadAhead, ifileReadAheadLength, LOG, - fetchedInput.getInputAttemptIdentifier()); - } else if (fetchedInput.getType() == Type.DISK) { - ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), - (host +":" +port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier(), - ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); - } else { - throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + - fetchedInput); - } + for (MapOutputStat mapOutputStat : mapOutputStats) { + // Get the location for the map output - either in-memory or on-disk + srcAttemptId = mapOutputStat.srcAttemptId; + decompressedLength = mapOutputStat.decompressedLength; + compressedLength = mapOutputStat.compressedLength; + // TODO TEZ-957. handle IOException here when Broadcast has better error checking + if (srcAttemptId.isShared() && callback != null) { + // force disk if input is being shared + fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength, + compressedLength, srcAttemptId); + } else { + fetchedInput = inputManager.allocate(decompressedLength, + compressedLength, srcAttemptId); + } + // No concept of WAIT at the moment. + // // Check if we can shuffle *now* ... + // if (fetchedInput.getType() == FetchedInput.WAIT) { + // LOG.info("fetcher#" + id + + // " - MergerManager returned Status.WAIT ..."); + // //Not an error but wait to process data. + // return EMPTY_ATTEMPT_ID_ARRAY; + // } + + // Go! + if (isDebugEnabled) { + LOG.debug("fetcher" + " about to shuffle output of srcAttempt " + + fetchedInput.getInputAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + + fetchedInput.getType()); + } - // offer the fetched input for caching - if (srcAttemptId.isShared() && callback != null) { - // this has to be before the fetchSucceeded, because that goes across - // threads into the reader thread and can potentially shutdown this thread - // while it is still caching. - callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength); - } + if (fetchedInput.getType() == Type.MEMORY) { + ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), + input, (int) decompressedLength, (int) compressedLength, codec, + ifileReadAhead, ifileReadAheadLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString()); ++ fetchedInput.getInputAttemptIdentifier()); + } else if (fetchedInput.getType() == Type.DISK) { + ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), + (host + ":" + port), input, compressedLength, decompressedLength, LOG, - fetchedInput.getInputAttemptIdentifier().toString(), ++ fetchedInput.getInputAttemptIdentifier(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); + } else { + throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + + fetchedInput); + } - // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); - // Reset retryStartTime as map task make progress if retried before. - retryStartTime = 0; - fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, - compressedLength, decompressedLength, (endTime - startTime)); + // offer the fetched input for caching + if (srcAttemptId.isShared() && callback != null) { + // this has to be before the fetchSucceeded, because that goes across + // threads into the reader thread and can potentially shutdown this thread + // while it is still caching. + callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength); + } - // Note successful shuffle - srcAttemptsRemaining.remove(srcAttemptId.toString()); + // Inform the shuffle scheduler + long endTime = System.currentTimeMillis(); + // Reset retryStartTime as map task make progress if retried before. + retryStartTime = 0; + fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput, + compressedLength, decompressedLength, (endTime - startTime)); - // metrics.successFetch(); - return null; + // Note successful shuffle + // metrics.successFetch(); + } + srcAttemptsRemaining.remove(inputAttemptIdentifier.toString()); } catch (IOException ioe) { if (isShutDown.get()) { cleanupFetchedInput(fetchedInput); http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 1d644aa,caddbc8..710466f --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@@ -40,12 -41,14 +41,13 @@@ import com.google.protobuf.ByteString import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.http.BaseHttpConnection; -import org.apache.tez.http.HttpConnection; import org.apache.tez.http.HttpConnectionParams; -import org.apache.tez.http.SSLFactory; -import org.apache.tez.http.async.netty.AsyncHttpConnection; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; + import org.apache.tez.util.FastNumberFormat; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index a23ce72,b2ff51d..5661a6d --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@@ -639,13 -626,13 +642,13 @@@ public class ShuffleManager implements inputContext.notifyProgress(); boolean committed = false; - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { synchronized (completedInputSet) { - if (!completedInputSet.contains(inputIdentifier)) { + if (!completedInputSet.get(inputIdentifier)) { fetchedInput.commit(); committed = true; - ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, - fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); + fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes, + decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); // Processing counters for completed and commit fetches only. Need // additional counters for excessive fetches - which primarily comes http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 1bfd2a6,58ca1e2..4b7c7fd --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@@ -533,47 -464,69 +533,47 @@@ class FetcherOrderedGrouped extends Cal } return EMPTY_ATTEMPT_ID_ARRAY; } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + - ", decomp len: " + decompressedLength); - } - // Get the location for the map output - either in-memory or on-disk - try { - mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id); - } catch (IOException e) { - if (!stopped) { - // Kill the reduce attempt - ioErrs.increment(1); - scheduler.reportLocalError(e); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Already stopped. Ignoring error from merger.reserve"); - } + // Check if we can shuffle *now* ... + if (mapOutput.getType() == Type.WAIT) { + LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); + //Not an error but wait to process data. + return EMPTY_ATTEMPT_ID_ARRAY; } - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Check if we can shuffle *now* ... - if (mapOutput.getType() == Type.WAIT) { - LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - //Not an error but wait to process data. - return EMPTY_ATTEMPT_ID_ARRAY; - } - - // Go! - if (LOG.isDebugEnabled()) { - LOG.debug("fetcher#" + id + " about to shuffle output of map " + - mapOutput.getAttemptIdentifier() + " decomp: " + - decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); - } - if (mapOutput.getType() == Type.MEMORY) { - ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, - (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, - ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier()); - } else if (mapOutput.getType() == Type.DISK) { - ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), - input, compressedLength, decompressedLength, LOG, - mapOutput.getAttemptIdentifier(), - ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); - } else { - throw new IOException("Unknown mapOutput type while fetching shuffle data:" + - mapOutput.getType()); - } + // Go! + if (LOG.isDebugEnabled()) { + LOG.debug("fetcher#" + id + " about to shuffle output of map " + + mapOutput.getAttemptIdentifier() + " decomp: " + + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType()); + } - // Inform the shuffle scheduler - long endTime = System.currentTimeMillis(); - // Reset retryStartTime as map task make progress if retried before. - retryStartTime = 0; + if (mapOutput.getType() == Type.MEMORY) { + ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, + (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead, - ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString()); ++ ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier()); + } else if (mapOutput.getType() == Type.DISK) { + ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), + input, compressedLength, decompressedLength, LOG, - mapOutput.getAttemptIdentifier().toString(), ++ mapOutput.getAttemptIdentifier(), + ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum); + } else { + throw new IOException("Unknown mapOutput type while fetching shuffle data:" + + mapOutput.getType()); + } - scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, - endTime - startTime, mapOutput, false); - // Note successful shuffle - remaining.remove(srcAttemptId.toString()); - metrics.successFetch(); - return null; - } catch (IOException ioe) { + // Inform the shuffle scheduler + long endTime = System.currentTimeMillis(); + // Reset retryStartTime as map task make progress if retried before. + retryStartTime = 0; + + scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, + endTime - startTime, mapOutput, false); + // Note successful shuffle + metrics.successFetch(); + } + remaining.remove(inputAttemptIdentifier.toString()); + } catch(IOException ioe) { if (stopped) { if (LOG.isDebugEnabled()) { LOG.debug("Not reporting fetch failure for exception during data copy: [" http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ----------------------------------------------------------------------
