Upmerge TEZ-3334 branch with master
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/25643aab Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/25643aab Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/25643aab Branch: refs/heads/TEZ-3334 Commit: 25643aab1ffa4e6178b938a5fb2dea33d1c3c1c3 Parents: d77f9b7 a33d221 Author: Jonathan Eagles <[email protected]> Authored: Tue Dec 6 11:06:05 2016 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Tue Dec 6 11:06:05 2016 -0600 ---------------------------------------------------------------------- .travis.yml | 31 + BUILDING.txt | 8 +- CHANGES.txt | 162 +- build-tools/install-protobuf.sh | 22 + docs/pom.xml | 9 + pom.xml | 29 +- tez-api/findbugs-exclude.xml | 10 + .../java/org/apache/tez/client/TezClient.java | 186 +-- .../org/apache/tez/client/TezClientUtils.java | 69 +- .../org/apache/tez/client/TezYarnClient.java | 12 +- .../org/apache/tez/common/ProgressHelper.java | 89 ++ .../org/apache/tez/common/TezCommonUtils.java | 110 +- .../java/org/apache/tez/common/TezUtils.java | 3 - .../org/apache/tez/common/TezYARNUtils.java | 36 +- .../apache/tez/common/security/ACLManager.java | 27 +- .../tez/common/security/DAGAccessControls.java | 43 +- .../security/HistoryACLPolicyManager.java | 30 +- .../main/java/org/apache/tez/dag/api/DAG.java | 68 +- .../apache/tez/dag/api/DagTypeConverters.java | 30 +- .../tez/dag/api/EdgeManagerPluginOnDemand.java | 30 +- .../org/apache/tez/dag/api/HistoryLogLevel.java | 63 + .../apache/tez/dag/api/TezConfiguration.java | 89 +- .../org/apache/tez/dag/api/TezConstants.java | 12 + .../tez/runtime/api/AbstractLogicalInput.java | 5 + .../tez/runtime/api/MergedLogicalInput.java | 5 + .../runtime/api/ProgressFailedException.java | 46 + .../api/events/CompositeDataMovementEvent.java | 6 + .../CompositeRoutedDataMovementEvent.java | 126 ++ tez-api/src/main/proto/DAGApiRecords.proto | 8 + tez-api/src/main/proto/Events.proto | 8 + .../org/apache/tez/client/TestTezClient.java | 67 +- .../apache/tez/client/TestTezClientUtils.java | 18 +- .../apache/tez/common/TestTezCommonUtils.java | 95 ++ .../org/apache/tez/common/TestTezYARNUtils.java | 13 + .../tez/common/security/TestACLManager.java | 24 +- .../common/security/TestDAGAccessControls.java | 167 ++- .../java/org/apache/tez/dag/api/TestDAG.java | 76 + .../org/apache/tez/dag/api/TestDAGPlan.java | 25 +- .../org/apache/tez/dag/api/TestDAGVerify.java | 24 +- .../tez/dag/api/TestDagTypeConverters.java | 40 +- .../apache/tez/dag/api/TestHistoryLogLevel.java | 63 + .../org/apache/tez/common/AsyncDispatcher.java | 4 +- .../org/apache/tez/common/TezUtilsInternal.java | 6 +- .../common/io/NonSyncByteArrayInputStream.java | 99 ++ .../common/io/NonSyncByteArrayOutputStream.java | 113 ++ .../tez/common/io/NonSyncDataOutputStream.java | 57 + .../org/apache/tez/dag/records/TezDAGID.java | 5 +- .../org/apache/tez/common/DrainDispatcher.java | 123 ++ .../org/apache/tez/common/TestTezUtils.java | 6 +- .../org/apache/tez/dag/records/TestTezIds.java | 24 + tez-dag/findbugs-exclude.xml | 11 +- tez-dag/pom.xml | 4 +- .../java/org/apache/tez/client/LocalClient.java | 2 +- .../tez/dag/api/client/DAGClientHandler.java | 11 + .../tez/dag/api/client/DAGClientServer.java | 3 + ...DAGClientAMProtocolBlockingPBServerImpl.java | 7 + .../org/apache/tez/dag/app/DAGAppMaster.java | 122 +- .../dag/app/dag/impl/BroadcastEdgeManager.java | 4 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 65 +- .../dag/impl/OneToOneEdgeManagerOnDemand.java | 7 +- .../app/dag/impl/ScatterGatherEdgeManager.java | 6 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 3 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 85 +- .../tez/dag/app/rm/TezAMRMClientAsync.java | 17 +- .../dag/app/rm/YarnTaskSchedulerService.java | 71 +- .../apache/tez/dag/app/web/WebUIService.java | 14 +- .../tez/dag/history/HistoryEventHandler.java | 45 +- .../tez/dag/history/HistoryEventType.java | 59 +- .../events/VertexConfigurationDoneEvent.java | 1 + .../dag/history/recovery/RecoveryService.java | 3 +- .../apache/tez/dag/history/utils/DAGUtils.java | 19 +- .../dag/api/client/TestDAGClientHandler.java | 2 + .../apache/tez/dag/app/TestDAGAppMaster.java | 81 +- .../tez/dag/app/TestMockDAGAppMaster.java | 9 +- .../apache/tez/dag/app/dag/impl/TestCommit.java | 11 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 4 +- .../tez/dag/app/dag/impl/TestDAGRecovery.java | 2 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 119 +- .../tez/dag/app/rm/TestContainerReuse.java | 409 ++++- .../tez/dag/app/rm/TestTaskScheduler.java | 733 ++------- .../dag/app/rm/TestTaskSchedulerHelpers.java | 28 +- .../tez/dag/app/rm/node/TestAMNodeTracker.java | 2 +- .../dag/history/TestHistoryEventHandler.java | 204 +++ .../TestHistoryEventsProtoConversion.java | 7 +- .../history/recovery/TestRecoveryService.java | 390 ++++- .../org/apache/tez/test/EdgeManagerForTest.java | 2 +- .../apache/tez/examples/CartesianProduct.java | 236 +++ .../org/apache/tez/examples/JoinDataGen.java | 12 +- .../org/apache/tez/examples/JoinValidate.java | 6 +- .../apache/tez/mapreduce/client/YARNRunner.java | 10 +- .../tez/mapreduce/combine/MRCombiner.java | 24 +- .../common/MRInputAMSplitGenerator.java | 13 +- .../tez/mapreduce/hadoop/DeprecatedKeys.java | 4 + .../tez/mapreduce/hadoop/MRInputHelpers.java | 115 +- .../tez/mapreduce/hadoop/MRJobConfig.java | 6 + .../org/apache/tez/mapreduce/input/MRInput.java | 53 +- .../tez/mapreduce/input/MRInputLegacy.java | 5 + .../apache/tez/mapreduce/output/MROutput.java | 138 +- .../tez/mapreduce/output/MultiMROutput.java | 203 +++ .../mapreduce/processor/map/MapProcessor.java | 57 +- .../processor/reduce/ReduceProcessor.java | 35 +- .../src/main/proto/MRRuntimeProtos.proto | 1 + .../org/apache/tez/mapreduce/TezTestUtils.java | 81 + .../tez/mapreduce/combine/TestMRCombiner.java | 73 + .../common/TestMRInputAMSplitGenerator.java | 241 +++ .../common/TestMRInputSplitDistributor.java | 84 +- .../mapreduce/hadoop/TestDeprecatedKeys.java | 6 + .../tez/mapreduce/output/TestMultiMROutput.java | 193 +++ .../tez/mapreduce/processor/MapUtils.java | 9 +- .../processor/map/TestMapProcessor.java | 78 +- .../processor/reduce/TestReduceProcessor.java | 2 +- .../tez/history/parser/ATSFileParser.java | 10 +- .../datamodel/AdditionalInputOutputDetails.java | 2 +- .../apache/tez/history/parser/utils/Utils.java | 18 +- .../ats/acls/ATSHistoryACLPolicyManager.java | 16 +- .../ats/acls/TestATSHistoryWithACLs.java | 202 +-- .../ats/acls/ATSV15HistoryACLPolicyManager.java | 17 +- .../ats/ATSV15HistoryLoggingService.java | 154 +- .../dag/history/ats/acls/TestATSHistoryV15.java | 100 +- .../ats/TestATSV15HistoryLoggingService.java | 257 +++- .../logging/ats/ATSHistoryLoggingService.java | 171 ++- .../ats/TestATSHistoryLoggingService.java | 276 +++- .../ats/TestHistoryEventTimelineConversion.java | 34 + .../org/apache/tez/common/ProtoConverters.java | 22 + .../apache/tez/runtime/api/impl/EventType.java | 1 + .../apache/tez/runtime/api/impl/TezEvent.java | 14 + .../api/impl/TezProcessorContextImpl.java | 6 +- .../org/apache/tez/runtime/task/TezChild.java | 12 +- tez-runtime-library/findbugs-exclude.xml | 70 + tez-runtime-library/pom.xml | 2 + .../DestinationTaskInputsProperty.java | 92 ++ .../vertexmanager/FairEdgeConfiguration.java | 111 ++ .../vertexmanager/FairShuffleEdgeManager.java | 153 ++ .../vertexmanager/FairShuffleVertexManager.java | 631 ++++++++ .../vertexmanager/ShuffleVertexManager.java | 896 +++-------- .../vertexmanager/ShuffleVertexManagerBase.java | 823 ++++++++++ .../library/api/KeyValueWriterWithBasePath.java | 49 + .../CartesianProductCombination.java | 164 ++ .../CartesianProductConfig.java | 255 ++++ .../CartesianProductEdgeManager.java | 105 ++ .../CartesianProductEdgeManagerConfig.java | 64 + .../CartesianProductEdgeManagerPartitioned.java | 125 ++ .../CartesianProductEdgeManagerReal.java | 64 + ...artesianProductEdgeManagerUnpartitioned.java | 98 ++ .../CartesianProductFilter.java | 47 + .../CartesianProductFilterDescriptor.java | 28 + .../CartesianProductVertexManager.java | 158 ++ .../CartesianProductVertexManagerConfig.java | 75 + ...artesianProductVertexManagerPartitioned.java | 192 +++ .../CartesianProductVertexManagerReal.java | 50 + ...tesianProductVertexManagerUnpartitioned.java | 213 +++ .../tez/runtime/library/common/ConfigUtils.java | 10 + .../runtime/library/common/ValuesIterator.java | 4 +- .../common/readers/UnorderedKVReader.java | 15 + .../TezBytesWritableSerialization.java | 4 +- .../common/shuffle/MemoryFetchedInput.java | 4 +- .../library/common/shuffle/ShuffleUtils.java | 27 +- .../impl/ShuffleInputEventHandlerImpl.java | 58 +- .../common/shuffle/impl/ShuffleManager.java | 19 +- .../shuffle/orderedgrouped/InMemoryReader.java | 4 +- .../shuffle/orderedgrouped/InMemoryWriter.java | 4 +- .../shuffle/orderedgrouped/MapOutput.java | 10 +- .../common/shuffle/orderedgrouped/Shuffle.java | 10 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 58 +- .../orderedgrouped/ShuffleScheduler.java | 84 +- .../runtime/library/common/sort/impl/IFile.java | 9 + .../common/sort/impl/PipelinedSorter.java | 14 +- .../common/sort/impl/dflt/DefaultSorter.java | 10 +- .../writers/UnorderedPartitionedKVWriter.java | 17 +- .../runtime/library/conf/BaseConfigBuilder.java | 2 + .../conf/OrderedGroupedKVInputConfig.java | 16 + .../conf/OrderedPartitionedKVEdgeConfig.java | 7 + .../conf/OrderedPartitionedKVOutputConfig.java | 16 + .../library/conf/UnorderedKVEdgeConfig.java | 7 + .../library/conf/UnorderedKVInputConfig.java | 15 + .../library/conf/UnorderedKVOutputConfig.java | 16 + .../conf/UnorderedPartitionedKVEdgeConfig.java | 8 + .../UnorderedPartitionedKVOutputConfig.java | 16 + .../input/ConcatenatedMergedKeyValueInput.java | 19 +- .../input/ConcatenatedMergedKeyValuesInput.java | 20 +- .../library/input/OrderedGroupedKVInput.java | 18 + .../input/OrderedGroupedMergedKVInput.java | 9 +- .../runtime/library/input/UnorderedKVInput.java | 9 + .../output/OrderedPartitionedKVOutput.java | 8 +- .../library/output/UnorderedKVOutput.java | 3 +- .../output/UnorderedPartitionedKVOutput.java | 3 +- .../library/processor/SimpleProcessor.java | 14 +- .../library/processor/SleepProcessor.java | 20 +- .../main/proto/CartesianProductPayload.proto | 31 + .../src/main/proto/FairShufflePayloads.proto | 37 + .../TestFairShuffleVertexManager.java | 350 +++++ .../vertexmanager/TestShuffleVertexManager.java | 1411 ++---------------- .../TestShuffleVertexManagerBase.java | 1115 ++++++++++++++ .../TestShuffleVertexManagerUtils.java | 346 +++++ .../TestCartesianProductCombination.java | 110 ++ .../TestCartesianProductConfig.java | 106 ++ .../TestCartesianProductEdgeManager.java | 68 + ...tCartesianProductEdgeManagerPartitioned.java | 285 ++++ ...artesianProductEdgeManagerUnpartitioned.java | 241 +++ .../TestCartesianProductVertexManager.java | 162 ++ ...artesianProductVertexManagerPartitioned.java | 212 +++ ...tesianProductVertexManagerUnpartitioned.java | 222 +++ .../common/shuffle/TestShuffleUtils.java | 6 +- .../orderedgrouped/TestShuffleScheduler.java | 63 + .../library/common/sort/impl/TestIFile.java | 54 + .../TestOrderedGroupedMergedKVInputConfig.java | 7 +- .../TestOrderedPartitionedKVEdgeConfig.java | 9 +- .../TestOrderedPartitionedKVOutputConfig.java | 7 +- .../library/conf/TestUnorderedKVEdgeConfig.java | 7 +- .../conf/TestUnorderedKVInputConfig.java | 7 +- .../conf/TestUnorderedKVOutputConfig.java | 7 +- .../TestUnorderedPartitionedKVEdgeConfig.java | 7 +- .../TestUnorderedPartitionedKVOutputConfig.java | 7 +- .../TestIFile_concatenated_compressed.bin | Bin 0 -> 51913 bytes .../tez/mapreduce/examples/MRRSleepJob.java | 30 +- .../examples/MultipleCommitsExample.java | 12 +- .../processor/FilterByWordInputProcessor.java | 30 +- .../processor/FilterByWordOutputProcessor.java | 2 + .../tez/test/TestExceptionPropagation.java | 2 +- .../org/apache/tez/test/TestFaultTolerance.java | 74 +- .../java/org/apache/tez/test/TestTezJobs.java | 164 ++ .../tez/analyzer/plugins/AnalyzerDriver.java | 2 + .../analyzer/plugins/CriticalPathAnalyzer.java | 30 +- .../plugins/TaskAssignmentAnalyzer.java | 103 ++ .../tez/analyzer/plugins/TezAnalyzerBase.java | 21 +- .../org/apache/tez/analyzer/utils/SVGUtils.java | 5 +- tez-ui/pom.xml | 9 +- tez-ui/src/main/webapp/.travis.yml | 23 - tez-ui/src/main/webapp/app/adapters/timeline.js | 1 + .../webapp/app/components/dags-pagination-ui.js | 4 +- tez-ui/src/main/webapp/app/controllers/app.js | 4 +- tez-ui/src/main/webapp/app/routes/app.js | 15 +- .../src/main/webapp/app/routes/app/configs.js | 5 +- tez-ui/src/main/webapp/app/routes/app/dags.js | 2 +- tez-ui/src/main/webapp/app/routes/app/index.js | 27 +- tez-ui/src/main/webapp/app/routes/dags.js | 2 + .../src/main/webapp/app/templates/app/index.hbs | 47 +- .../templates/components/dags-pagination-ui.hbs | 10 +- tez-ui/src/main/webapp/package.json | 12 +- .../components/dags-pagination-ui-test.js | 30 +- .../webapp/tests/unit/adapters/timeline-test.js | 10 +- .../webapp/tests/unit/controllers/app-test.js | 18 + .../main/webapp/tests/unit/routes/app-test.js | 74 + .../tests/unit/routes/app/configs-test.js | 51 + .../webapp/tests/unit/routes/app/dags-test.js | 32 + .../webapp/tests/unit/routes/app/index-test.js | 47 + .../main/webapp/tests/unit/routes/dags-test.js | 23 +- 248 files changed, 14717 insertions(+), 4115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index eb2dc7d,e4d2dd3..76b83f5 --- a/pom.xml +++ b/pom.xml @@@ -56,10 -55,9 +56,10 @@@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <scm.url>scm:git:https://git-wip-us.apache.org/repos/asf/tez.git</scm.url> <build.time>${maven.build.timestamp}</build.time> - <frontend-maven-plugin.version>0.0.23</frontend-maven-plugin.version> + <frontend-maven-plugin.version>1.1</frontend-maven-plugin.version> <findbugs-maven-plugin.version>3.0.1</findbugs-maven-plugin.version> <javadoc-maven-plugin.version>2.9.1</javadoc-maven-plugin.version> + <shade-maven-plugin.version>2.4.3</shade-maven-plugin.version> </properties> <scm> <connection>${scm.url}</connection> http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 64a10d2,aa07233..6fa43e8 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@@ -260,13 -279,13 +261,14 @@@ public class ShuffleUtils * @param finalMergeEnabled * @param isLastEvent * @param pathComponent + * @param conf + * @param deflater * @return ByteBuffer * @throws IOException */ static ByteBuffer generateDMEPayload(boolean sendEmptyPartitionDetails, - int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, - int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf) + int numPhysicalOutputs, TezSpillRecord spillRecord, OutputContext context, - int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Deflater deflater) ++ int spillId, boolean finalMergeEnabled, boolean isLastEvent, String pathComponent, Configuration conf, Deflater deflater) throws IOException { DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto .newBuilder(); @@@ -388,9 -404,9 +391,9 @@@ * @throws IOException */ public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, - boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, - int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf) + boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, + int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Deflater deflater) ++ @Nullable long[] partitionStats, boolean reportDetailedPartitionStats, Configuration conf, Deflater deflater) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); @@@ -408,7 -424,7 +411,7 @@@ ByteBuffer payload = generateDMEPayload(sendEmptyPartitionDetails, numPhysicalOutputs, spillRecord, context, spillId, - finalMergeEnabled, isLastEvent, pathComponent, conf); - finalMergeEnabled, isLastEvent, pathComponent, deflater); ++ finalMergeEnabled, isLastEvent, pathComponent, conf, deflater); if (finalMergeEnabled || isLastEvent) { VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats, http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index e468a55,9b3aadb..5203851 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@@ -350,7 -354,7 +354,7 @@@ public class PipelinedSorter extends Ex ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats(), this.conf); - reportDetailedPartitionStats(), deflater); ++ reportDetailedPartitionStats(), this.conf, deflater); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@@ -673,7 -677,7 +677,7 @@@ ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, - reportDetailedPartitionStats(), this.conf); - reportDetailedPartitionStats(), deflater); ++ reportDetailedPartitionStats(), this.conf, deflater); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 21c40e9,b5c3071..8ff1c99 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@@ -1133,7 -1137,7 +1137,7 @@@ public final class DefaultSorter extend String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, - partitionStats, reportDetailedPartitionStats(), this.conf); - partitionStats, reportDetailedPartitionStats(), deflater); ++ partitionStats, reportDetailedPartitionStats(), this.conf, deflater); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 760daf5,0f38a29..e9d9652 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@@ -52,7 -52,7 +52,8 @@@ import org.apache.tez.common.TezCommonU import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.dag.api.TezConfiguration; + import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 6ebcac8,5f6a304..6b14f8d --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@@ -200,7 -204,7 +204,7 @@@ public class OrderedPartitionedKVOutpu ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf), getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), - sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf); - sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), deflater); ++ sorter.getPartitionStats(), sorter.reportDetailedPartitionStats(), this.conf, deflater); } return eventList; } http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/25643aab/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 03ddfa5,496468b..ec19f67 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@@ -165,7 -163,7 +165,7 @@@ public class TestShuffleUtils String pathComponent = "/attempt_x_y_0/file.out"; ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf); - physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); ++ physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@@ -204,7 -202,7 +204,7 @@@ //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf); - physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); ++ physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@@ -245,7 -243,7 +245,7 @@@ //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, spillId, new TezSpillRecord(indexFile, conf), - physicalOutputs, true, pathComponent, null, false, this.conf); - physicalOutputs, true, pathComponent, null, false, TezCommonUtils.newBestCompressionDeflater()); ++ physicalOutputs, true, pathComponent, null, false, this.conf, TezCommonUtils.newBestCompressionDeflater()); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent);
