Merge branch 'master' into TEZ-3334-MERGE1
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a496252e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a496252e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a496252e Branch: refs/heads/master Commit: a496252e8d527689b7530094780fb28ce25b6b09 Parents: 251ca1c 1176733 Author: Jonathan Eagles <[email protected]> Authored: Fri May 19 14:57:08 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri May 19 14:57:08 2017 -0500 ---------------------------------------------------------------------- docs/pom.xml | 9 + .../apache/tez/dag/api/TezConfiguration.java | 10 + .../tez/serviceplugins/api/TaskScheduler.java | 15 + .../dag/app/TaskCommunicatorContextImpl.java | 13 +- .../tez/dag/app/TaskCommunicatorManager.java | 9 +- .../tez/dag/app/TezTaskCommunicatorImpl.java | 4 +- .../dag/app/dag/TaskAttemptStateInternal.java | 1 + .../java/org/apache/tez/dag/app/dag/Vertex.java | 7 + .../event/TaskAttemptEventStartedRemotely.java | 33 +- .../dag/event/TaskAttemptEventSubmitted.java | 49 ++ .../dag/app/dag/event/TaskAttemptEventType.java | 1 + .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 101 +++- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 20 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 75 ++- .../app/rm/AMSchedulerEventTAStateUpdated.java | 42 ++ .../tez/dag/app/rm/AMSchedulerEventType.java | 1 + .../tez/dag/app/rm/TaskSchedulerManager.java | 20 + .../tez/dag/app/rm/TaskSchedulerWrapper.java | 4 + .../api/TaskCommunicatorContext.java | 22 +- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 230 ++++++-- .../tez/dag/app/dag/impl/TestTaskImpl.java | 9 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 19 +- .../TezTestServiceTaskCommunicatorImpl.java | 5 +- .../tez-yarn-timeline-history-with-fs/pom.xml | 5 + .../library/api/TezRuntimeConfiguration.java | 11 + .../CartesianProductCombination.java | 4 +- .../CartesianProductConfig.java | 45 +- .../CartesianProductEdgeManager.java | 11 +- .../CartesianProductEdgeManagerConfig.java | 67 --- .../CartesianProductEdgeManagerPartitioned.java | 31 +- .../CartesianProductEdgeManagerReal.java | 3 +- ...artesianProductEdgeManagerUnpartitioned.java | 125 ----- .../CartesianProductVertexManager.java | 64 ++- .../CartesianProductVertexManagerConfig.java | 77 --- ...artesianProductVertexManagerPartitioned.java | 38 +- .../CartesianProductVertexManagerReal.java | 3 +- ...tesianProductVertexManagerUnpartitioned.java | 438 --------------- .../FairCartesianProductEdgeManager.java | 174 ++++++ .../FairCartesianProductVertexManager.java | 551 +++++++++++++++++++ .../library/common/shuffle/ShuffleUtils.java | 2 + .../common/shuffle/impl/ShuffleManager.java | 34 +- .../shuffle/orderedgrouped/MergeManager.java | 5 +- .../orderedgrouped/ShuffleScheduler.java | 2 +- .../common/sort/impl/TezIndexRecord.java | 2 +- .../library/common/sort/impl/TezMerger.java | 77 ++- .../writers/UnorderedPartitionedKVWriter.java | 196 +++++-- .../partitioner/RoundRobinPartitioner.java | 30 + .../tez/runtime/library/utils/Grouper.java | 66 +-- .../main/proto/CartesianProductPayload.proto | 11 +- .../src/main/proto/ShufflePayloads.proto | 1 + .../TestCartesianProductCombination.java | 2 +- .../TestCartesianProductConfig.java | 37 +- .../TestCartesianProductEdgeManager.java | 8 +- .../TestCartesianProductEdgeManagerConfig.java | 53 -- ...tCartesianProductEdgeManagerPartitioned.java | 77 +-- ...artesianProductEdgeManagerUnpartitioned.java | 288 ---------- .../TestCartesianProductVertexManager.java | 2 +- ...TestCartesianProductVertexManagerConfig.java | 53 -- ...artesianProductVertexManagerPartitioned.java | 26 +- ...tesianProductVertexManagerUnpartitioned.java | 460 ---------------- .../TestFairCartesianProductEdgeManager.java | 245 +++++++++ .../TestFairCartesianProductVertexManager.java | 500 +++++++++++++++++ .../library/cartesianproduct/TestGrouper.java | 36 +- .../common/shuffle/TestShuffleUtils.java | 5 +- .../impl/TestShuffleInputEventHandlerImpl.java | 14 +- .../TestUnorderedPartitionedKVWriter.java | 117 +++- .../mapreduce/examples/CartesianProduct.java | 385 +++++++++++++ .../tez/mapreduce/examples/ExampleDriver.java | 2 + .../org/apache/tez/test/TestFaultTolerance.java | 5 +- .../java/org/apache/tez/test/TestOutput.java | 11 + .../java/org/apache/tez/test/TestTezJobs.java | 13 + tez-ui/src/main/webapp/app/models/vertex.js | 13 +- 72 files changed, 3124 insertions(+), 2000 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index bc3ca0e,8716b92..81fac38 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@@ -475,10 -475,15 +485,16 @@@ public class ShuffleManager implements if (inputHost.getNumPendingPartitions() > 0) { pendingHosts.add(inputHost); //add it to queue } + for(InputAttemptIdentifier input : pendingInputsOfOnePartition.getInputs()) { + ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier()); + if (eventInfo != null) { + eventInfo.scheduledForDownload = true; + } + } fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), - pendingInputsOfOnePartition.getPartition(), - pendingInputsOfOnePartition.getInputs()); + pendingInputsOfOnePartitionRange.getPartition(), + pendingInputsOfOnePartitionRange.getPartitionCount(), + pendingInputsOfOnePartitionRange.getInputs()); if (LOG.isDebugEnabled()) { LOG.debug("Created Fetcher for host: " + inputHost.getHost() + ", info: " + inputHost.getAdditionalInfo() http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java index e6accda,af52f90..c934f6c --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java @@@ -224,10 -220,13 +224,13 @@@ public class TestShuffleInputEventHandl dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + CompositeInputAttemptIdentifier expectedId2 = new CompositeInputAttemptIdentifier(1, 0, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1); verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0)); + // Let attemptNum 0 be scheduled. + shuffleManager.shuffleInfoEventsMap.get(expectedId2.getInputIdentifier()).scheduledForDownload = true; + //0--> 1 with spill id 1 (attemptNum 1). This should report exception dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); @@@ -253,10 -252,13 +256,13 @@@ Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1); handler.handleEvents(Collections.singletonList(dme)); - InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1, - PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1); + CompositeInputAttemptIdentifier expected = new CompositeInputAttemptIdentifier(1, 1, + PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1); verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0)); + // Let attemptNum 1 be scheduled. + shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true; + //Now send attemptNum 0. This should throw exception, because attempt #1 is already added dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0); handler.handleEvents(Collections.singletonList(dme)); http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 07feb20,6ea1562..27e7992 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@@ -162,11 -161,10 +163,12 @@@ public class TestUnorderedPartitionedKV ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); - int maxSingleBufferSizeBytes = 2047; + final int maxSingleBufferSizeBytes = 2047; + final long sizePerBuffer = maxSingleBufferSizeBytes - 64 - maxSingleBufferSizeBytes % 4; Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, false, maxSingleBufferSizeBytes); @@@ -718,13 -774,14 +784,17 @@@ ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); - OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId); + int dagId = 1; + String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, - shouldCompress, -1); + shouldCompress, maxSingleBufferSizeBytes); + conf.setInt( + TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT, + bufferMergePercent); + CompressionCodec codec = null; if (shouldCompress) { codec = new DefaultCodec();
