Repository: tez Updated Branches: refs/heads/master cc33410d8 -> 80ba12b2a
TEZ-3216. Add support for more precise partition stats in VertexManagerEvent. Contributed by Ming Ma. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/80ba12b2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/80ba12b2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/80ba12b2 Branch: refs/heads/master Commit: 80ba12b2ad03ccb860aafc53a46c447aaa242d0d Parents: cc33410 Author: Siddharth Seth <[email protected]> Authored: Thu Jun 16 15:54:01 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Thu Jun 16 15:54:01 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../library/api/TezRuntimeConfiguration.java | 79 +++++++++++++++++- .../library/common/shuffle/ShuffleUtils.java | 85 +++++++++++++++----- .../common/sort/impl/ExternalSorter.java | 19 +++-- .../common/sort/impl/PipelinedSorter.java | 10 ++- .../common/sort/impl/dflt/DefaultSorter.java | 2 +- .../writers/UnorderedPartitionedKVWriter.java | 72 ++++++++--------- .../output/OrderedPartitionedKVOutput.java | 2 +- .../library/output/UnorderedKVOutput.java | 1 + .../output/UnorderedPartitionedKVOutput.java | 1 + .../src/main/proto/ShufflePayloads.proto | 7 ++ .../common/shuffle/TestShuffleUtils.java | 15 ++-- .../common/sort/impl/TestPipelinedSorter.java | 10 ++- .../TestUnorderedPartitionedKVWriter.java | 81 +++++++++++++------ .../TestOrderedPartitionedKVEdgeConfig.java | 10 ++- .../library/output/TestOnFileSortedOutput.java | 54 +++++++++---- 16 files changed, 321 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1e1803d..5f90539 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3216. 
Add support for more precise partition stats in VertexManagerEvent. TEZ-3296. Tez job can hang if two vertices at the same root distance have different task requirements TEZ-3294. DAG.createDag() does not clear local state on repeat calls. TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce. http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 08f76f2..4d24bfb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -175,11 +175,16 @@ public class TezRuntimeConfiguration { /** * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496 * This can be enabled/disabled at vertex level. + * {@link org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats} + * defines the list of values that can be specified. + * TODO TEZ-3303: ShuffleVertexManager doesn't consume precise stats + * yet, so do not set the value to "precise" when ShuffleVertexManager is used. 
*/ - @ConfigurationProperty(type = "boolean") - public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX + - "report.partition.stats"; - public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true; + @ConfigurationProperty + public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = + TEZ_RUNTIME_PREFIX + "report.partition.stats"; + public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = + ReportPartitionStats.MEMORY_OPTIMIZED.getType(); /** * Size of the buffer to use if not writing directly to disk. @@ -635,4 +640,70 @@ public class TezRuntimeConfiguration { public static Map<String, String> getOtherConfigDefaults() { return Collections.unmodifiableMap(otherConfMap); } + + public enum ReportPartitionStats { + @Deprecated + /** + * Don't report partition stats. It is the same as NONE. + * It is defined to maintain backward compatibility given + * Configuration {@link TezRuntimeConfiguration#TEZ_RUNTIME_REPORT_PARTITION_STATS} used + * to be boolean type. + */ + DISABLED("false"), + + @Deprecated + /** + * Report partition stats. It is the same as MEMORY_OPTIMIZED. + * It is defined to maintain backward compatibility given + * Configuration {@link TezRuntimeConfiguration#TEZ_RUNTIME_REPORT_PARTITION_STATS} used + * to be boolean type. + */ + ENABLED("true"), + + /** + * Don't report partition stats. + */ + NONE("none"), + + /** + * Report partition stats with less precision to reduce + * memory and CPU overhead + */ + MEMORY_OPTIMIZED("memory_optimized"), + + /** + * Report precise partition stats in MB. 
+ */ + PRECISE("precise"); + + private final String type; + + private ReportPartitionStats(String type) { + this.type = type; + } + + public final String getType() { + return type; + } + + public boolean isEnabled() { + return !equals(ReportPartitionStats.DISABLED) && + !equals(ReportPartitionStats.NONE); + } + + public boolean isPrecise() { + return equals(ReportPartitionStats.PRECISE); + } + + public static ReportPartitionStats fromString(String type) { + if (type != null) { + for (ReportPartitionStats b : ReportPartitionStats.values()) { + if (type.equalsIgnoreCase(b.type)) { + return b; + } + } + } + throw new IllegalArgumentException("Invalid type " + type); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index ae646ea..d74e447 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import javax.crypto.SecretKey; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; @@ -67,11 +68,13 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; +import 
org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedPartitionStatsProto; public class ShuffleUtils { private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class); public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle"; + private static final long MB = 1024l * 1024l; //Shared by multiple threads private static volatile SSLFactory sslFactory; @@ -400,7 +403,8 @@ public class ShuffleUtils { public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, - @Nullable long[] partitionStats) throws IOException { + @Nullable long[] partitionStats, boolean reportDetailedPartitionStats) + throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); context.notifyProgress(); @@ -420,34 +424,50 @@ public class ShuffleUtils { finalMergeEnabled, isLastEvent, pathComponent); if (finalMergeEnabled || isLastEvent) { - ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = - ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); - - long outputSize = context.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + VertexManagerEvent vmEvent = generateVMEvent(context, partitionStats, + reportDetailedPartitionStats); + eventList.add(vmEvent); + } - //Set this information only when required. In pipelined shuffle, multiple events would end - // up adding up to final outputsize. This is needed for auto-reduce parallelism to work - // properly. 
- vmBuilder.setOutputSize(outputSize); + CompositeDataMovementEvent csdme = + CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload); + eventList.add(csdme); + } - //set partition stats - if (partitionStats != null && partitionStats.length > 0) { - RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats); + public static VertexManagerEvent generateVMEvent(OutputContext context, + long[] sizePerPartition, boolean reportDetailedPartitionStats) + throws IOException { + ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = + ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); + + long outputSize = context.getCounters(). + findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + + // Set this information only when required. In pipelined shuffle, + // multiple events would end up adding up to final output size. + // This is needed for auto-reduce parallelism to work properly. + vmBuilder.setOutputSize(outputSize); + + //set partition stats + if (sizePerPartition != null && sizePerPartition.length > 0) { + if (reportDetailedPartitionStats) { + vmBuilder.setDetailedPartitionStats( + getDetailedPartitionStatsForPhysicalOutput(sizePerPartition)); + } else { + RoaringBitmap stats = getPartitionStatsForPhysicalOutput( + sizePerPartition); DataOutputBuffer dout = new DataOutputBuffer(); stats.serialize(dout); - ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); + ByteString partitionStatsBytes = + TezCommonUtils.compressByteArrayToByteString(dout.getData()); vmBuilder.setPartitionStats(partitionStatsBytes); } - - VertexManagerEvent vmEvent = VertexManagerEvent.create( - context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer()); - eventList.add(vmEvent); } - - CompositeDataMovementEvent csdme = - CompositeDataMovementEvent.create(0, numPhysicalOutputs, payload); - eventList.add(csdme); + VertexManagerEvent vmEvent = VertexManagerEvent.create( + 
context.getDestinationVertexName(), + vmBuilder.build().toByteString().asReadOnlyByteBuffer()); + return vmEvent; } /** @@ -469,6 +489,29 @@ public class ShuffleUtils { return partitionStats; } + static long ceil(long a, long b) { + return (a + (b - 1)) / b; + } + + /** + * Detailed partition stats + * + * @param sizes actual partition sizes + */ + public static DetailedPartitionStatsProto + getDetailedPartitionStatsForPhysicalOutput(long[] sizes) { + DetailedPartitionStatsProto.Builder builder = + DetailedPartitionStatsProto.newBuilder(); + for (int i=0; i<sizes.length; i++) { + // Round the size up. So 1 byte -> the value of sizeInMB == 1 + // Throws IllegalArgumentException if value is greater than + // Integer.MAX_VALUE. That should be ok given Integer.MAX_VALUE * MB + // means PB. + int sizeInMb = Ints.checkedCast(ceil(sizes[i], MB)); + builder.addSizeInMb(sizeInMb); + } + return builder.build(); + } /** * Log individual fetch complete event. http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index 7a2dc68..b6fe457 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -53,6 +53,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import 
org.apache.tez.runtime.library.common.ConfigUtils; import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.combine.Combiner; @@ -159,16 +160,19 @@ public abstract class ExternalSorter { protected final TezCounter numAdditionalSpills; // Number of files offered via shuffle-handler to consumers. protected final TezCounter numShuffleChunks; + // How partition stats should be reported. + final ReportPartitionStats reportPartitionStats; public ExternalSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException { this.outputContext = outputContext; this.conf = conf; this.partitions = numOutputs; - boolean reportPartitionStats = conf.getBoolean(TezRuntimeConfiguration - .TEZ_RUNTIME_REPORT_PARTITION_STATS, - TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT); - this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null; + reportPartitionStats = ReportPartitionStats.fromString( + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); + partitionStats = reportPartitionStats.isEnabled() ? 
+ (new long[partitions]) : null; cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT, TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT); @@ -202,7 +206,8 @@ public abstract class ExternalSorter { + ", valueSerializerClass=" + valSerializer + ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf) + ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) - + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + + ", serialization=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY) + + ", reportPartitionStats=" + reportPartitionStats); // counters mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES); @@ -412,4 +417,8 @@ public abstract class ExternalSorter { .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); statsReporter.reportItemsProcessed(outputRecords); } + + public boolean reportDetailedPartitionStats() { + return reportPartitionStats.isPrecise(); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 5695bde..897d7d7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -347,9 +347,10 @@ public class PipelinedSorter extends ExternalSorter { private void sendPipelinedShuffleEvents() throws IOException{ List<Event> events = Lists.newLinkedList(); String pathComponent = 
(outputContext.getUniqueIdentifier() + "_" + (numSpills-1)); - ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, - (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, - pathComponent, partitionStats); + ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, + outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), + partitions, sendEmptyPartitionDetails, pathComponent, partitionStats, + reportDetailedPartitionStats()); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@ -671,7 +672,8 @@ public class PipelinedSorter extends ExternalSorter { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, - sendEmptyPartitionDetails, pathComponent, partitionStats); + sendEmptyPartitionDetails, pathComponent, partitionStats, + reportDetailedPartitionStats()); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index a6a60c2..69bfdb8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -1133,7 +1133,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab String pathComponent = (outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, - partitionStats); + partitionStats, reportDetailedPartitionStats()); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 76075bb..152096c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -56,8 +56,8 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; -import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.IOInterruptedException; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.Constants; import 
org.apache.tez.runtime.library.common.sort.impl.IFile; @@ -65,9 +65,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto; -import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,6 +148,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit private final Condition spillInProgress = spillLock.newCondition(); private final boolean pipelinedShuffle; + // How partition stats should be reported. + final ReportPartitionStats reportPartitionStats; private final long indexFileSizeEstimate; @@ -208,7 +208,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit .build()); spillExecutor = MoreExecutors.listeningDecorator(executor); numRecordsPerPartition = new int[numPartitions]; - sizePerPartition = new long[numPartitions]; + reportPartitionStats = ReportPartitionStats.fromString( + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); + sizePerPartition = (reportPartitionStats.isEnabled()) ? 
+ new long[numPartitions] : null; outputLargeRecordsCounter = outputContext.getCounters().findCounter( TaskCounter.OUTPUT_LARGE_RECORDS); @@ -233,7 +237,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit + ", sizePerBuffer=" + sizePerBuffer + ", skipBuffers=" + skipBuffers + ", pipelinedShuffle=" + pipelinedShuffle - + ", numPartitions=" + numPartitions); + + ", numPartitions=" + numPartitions + + ", reportPartitionStats=" + reportPartitionStats); } private void computeNumBuffersAndSize(int bufferLimit) { @@ -364,10 +369,16 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } } + private boolean reportPartitionStats() { + return (sizePerPartition != null); + } + private void updateGlobalStats(WrappedBuffer buffer) { for (int i = 0; i < numPartitions; i++) { numRecordsPerPartition[i] += buffer.recordsPerPartition[i]; - sizePerPartition[i] += buffer.sizePerPartition[i]; + if (reportPartitionStats()) { + sizePerPartition[i] += buffer.sizePerPartition[i]; + } } } @@ -529,7 +540,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (outputRecordsCounter.getValue() == 0) { emptyPartitions.set(0); } - sizePerPartition[0] = rawLen; + if (reportPartitionStats()) { + sizePerPartition[0] = rawLen; + } cleanupCurrentBuffer(); outputBytesWithOverheadCounter.increment(rawLen); @@ -575,37 +588,13 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit return emptyPartitions; } - private Event generateVMEvent() throws IOException { - return generateVMEvent(this.sizePerPartition); + public boolean reportDetailedPartitionStats() { + return reportPartitionStats.isPrecise(); } - private Event generateVMEvent(long[] sizePerPartition) throws IOException { - ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = - ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder(); - - long outputSize = outputContext.getCounters(). 
- findCounter(TaskCounter.OUTPUT_BYTES).getValue(); - - // Set this information only when required. In pipelined shuffle, - // multiple events would end up adding up to final output size. - // This is needed for auto-reduce parallelism to work properly. - vmBuilder.setOutputSize(outputSize); - - //set partition stats - if (sizePerPartition != null && sizePerPartition.length > 0) { - RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput( - sizePerPartition); - DataOutputBuffer dout = new DataOutputBuffer(); - stats.serialize(dout); - ByteString partitionStatsBytes = - TezCommonUtils.compressByteArrayToByteString(dout.getData()); - vmBuilder.setPartitionStats(partitionStatsBytes); - } - - VertexManagerEvent vmEvent = VertexManagerEvent.create( - outputContext.getDestinationVertexName(), - vmBuilder.build().toByteString().asReadOnlyByteBuffer()); - return vmEvent; + private Event generateVMEvent() throws IOException { + return ShuffleUtils.generateVMEvent(outputContext, this.sizePerPartition, + this.reportDetailedPartitionStats()); } private Event generateDMEvent() throws IOException { @@ -667,7 +656,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit if (currentBuffer.nextPosition == 0) { if (pipelinedShuffle) { List<Event> eventList = Lists.newLinkedList(); - eventList.add(generateVMEvent(new long[numPartitions])); + eventList.add(ShuffleUtils.generateVMEvent(outputContext, + reportPartitionStats() ? new long[numPartitions] : null, + reportDetailedPartitionStats())); //Send final event with all empty partitions and null path component. 
BitSet emptyPartitions = new BitSet(numPartitions); emptyPartitions.flip(0, numPartitions); @@ -844,7 +835,9 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit writer.append(key, value); outputLargeRecordsCounter.increment(1); numRecordsPerPartition[i]++; - sizePerPartition[i] += writer.getRawLength(); + if (reportPartitionStats()) { + sizePerPartition[i] += writer.getRawLength(); + } writer.close(); additionalSpillBytesWritternCounter.increment(writer.getCompressedLength()); TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(), @@ -985,7 +978,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit try { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber); if (isFinalUpdate) { - eventList.add(generateVMEvent(sizePerPartition)); + eventList.add(ShuffleUtils.generateVMEvent(outputContext, + sizePerPartition, reportDetailedPartitionStats())); } Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate, pathComponent, emptyPartitions); http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index c0b0760..9a3d778 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -200,7 +200,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, new 
TezSpillRecord(sorter.getFinalIndexFile(), conf), getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), - sorter.getPartitionStats()); + sorter.getPartitionStats(), sorter.reportDetailedPartitionStats()); } return eventList; } http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java index 879c2e0..4f74f7d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java @@ -172,6 +172,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java index 90c0ed4..c4b3b22 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java +++ 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java @@ -145,6 +145,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH); confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS); } // TODO Maybe add helper methods to extract keys http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/main/proto/ShufflePayloads.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto index 9b0fc16..f78cbac 100644 --- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto +++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto @@ -41,9 +41,16 @@ message InputInformationEventPayloadProto { optional int32 partition_range = 1; } +// DetailedPartitionStatsProto represents size of a list of partitions. +// It is more accurate than the partition_stats. 
+message DetailedPartitionStatsProto { + repeated int32 size_in_mb = 1; +} + message VertexManagerEventPayloadProto { optional int64 output_size = 1; optional bytes partition_stats = 2; + optional DetailedPartitionStatsProto detailed_partition_stats = 3; } message ShuffleEdgeManagerConfigPayloadProto { http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index c542030..4233f5d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -161,8 +161,9 @@ public class TestShuffleUtils { int spillId = 0; int physicalOutputs = 10; String pathComponent = "/attempt_x_y_0/file.out"; - ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); + ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, + outputContext, spillId, new TezSpillRecord(indexFile, conf), + physicalOutputs, true, pathComponent, null, false); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -199,8 +200,9 @@ public class TestShuffleUtils { String pathComponent = "/attempt_x_y_0/file.out"; //normal code path where we do final merge all the time - ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); + 
ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, + outputContext, spillId, new TezSpillRecord(indexFile, conf), + physicalOutputs, true, pathComponent, null, false); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@ -239,8 +241,9 @@ public class TestShuffleUtils { String pathComponent = "/attempt_x_y_0/file.out"; //normal code path where we do final merge all the time - ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); + ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, + outputContext, spillId, new TezSpillRecord(indexFile, conf), + physicalOutputs, true, pathComponent, null, false); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 70819e5..80e7b14 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -40,6 +40,7 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -404,10 +405,11 @@ public class TestPipelinedSorter { writeData(sorter, numKeys, keySize); //partition stats; - boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration - .TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration - .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT); - if (partitionStats) { + ReportPartitionStats partitionStats = + ReportPartitionStats.fromString(conf.get( + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); + if (partitionStats.isEnabled()) { assertTrue(sorter.getPartitionStats() != null); } http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 9d2b615..41b2b97 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import 
org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; @@ -115,14 +116,28 @@ public class TestUnorderedPartitionedKVWriter { private static FileSystem localFs; private boolean shouldCompress; + private ReportPartitionStats reportPartitionStats; - public TestUnorderedPartitionedKVWriter(boolean shouldCompress) { + public TestUnorderedPartitionedKVWriter(boolean shouldCompress, + ReportPartitionStats reportPartitionStats) { this.shouldCompress = shouldCompress; + this.reportPartitionStats = reportPartitionStats; } - @Parameters + @SuppressWarnings("deprecation") + @Parameterized.Parameters(name = "test[{0}, {1}, {2}]") public static Collection<Object[]> data() { - Object[][] data = new Object[][] { { false }, { true } }; + Object[][] data = new Object[][] { + { false, ReportPartitionStats.DISABLED }, + { false, ReportPartitionStats.ENABLED }, + { false, ReportPartitionStats.NONE }, + { false, ReportPartitionStats.MEMORY_OPTIMIZED }, + { false, ReportPartitionStats.PRECISE }, + { true, ReportPartitionStats.DISABLED }, + { true, ReportPartitionStats.ENABLED }, + { true, ReportPartitionStats.NONE }, + { true, ReportPartitionStats.MEMORY_OPTIMIZED }, + { true, ReportPartitionStats.PRECISE }}; return Arrays.asList(data); } @@ -415,36 +430,54 @@ public class TestUnorderedPartitionedKVWriter { assertEquals(0, expectedValues.size()); } - private long[] getPartitionStats( - VertexManagerEvent vme) throws IOException { + private int[] getPartitionStats(VertexManagerEvent vme) throws IOException { RoaringBitmap partitionStats = new RoaringBitmap(); ShuffleUserPayloads.VertexManagerEventPayloadProto payload = ShuffleUserPayloads.VertexManagerEventPayloadProto .parseFrom(ByteString.copyFrom(vme.getUserPayload())); - 
assertTrue(payload.hasPartitionStats()); - ByteString compressedPartitionStats = payload.getPartitionStats(); - byte[] rawData = TezCommonUtils.decompressByteStringToByteArray( - compressedPartitionStats); - ByteArrayInputStream bin = new ByteArrayInputStream(rawData); - partitionStats.deserialize(new DataInputStream(bin)); - long[] stats = new long[partitionStats.getCardinality()]; - Iterator<Integer> it = partitionStats.iterator(); - final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values(); - final int RANGE_LEN = RANGES.length; - while (it.hasNext()) { - int pos = it.next(); - int index = ((pos) / RANGE_LEN); - int rangeIndex = ((pos) % RANGE_LEN); - if (RANGES[rangeIndex].getSizeInMB() > 0) { - stats[index] += RANGES[rangeIndex].getSizeInMB(); + if (!reportPartitionStats.isEnabled()) { + assertFalse(payload.hasPartitionStats()); + assertFalse(payload.hasDetailedPartitionStats()); + return null; + } + if (reportPartitionStats.isPrecise()) { + assertTrue(payload.hasDetailedPartitionStats()); + List<Integer> sizeInMBList = + payload.getDetailedPartitionStats().getSizeInMbList(); + int[] stats = new int[sizeInMBList.size()]; + for (int i=0; i<sizeInMBList.size(); i++) { + stats[i] += sizeInMBList.get(i); } + return stats; + } else { + assertTrue(payload.hasPartitionStats()); + ByteString compressedPartitionStats = payload.getPartitionStats(); + byte[] rawData = TezCommonUtils.decompressByteStringToByteArray( + compressedPartitionStats); + ByteArrayInputStream bin = new ByteArrayInputStream(rawData); + partitionStats.deserialize(new DataInputStream(bin)); + int[] stats = new int[partitionStats.getCardinality()]; + Iterator<Integer> it = partitionStats.iterator(); + final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values(); + final int RANGE_LEN = RANGES.length; + while (it.hasNext()) { + int pos = it.next(); + int index = ((pos) / RANGE_LEN); + int rangeIndex = ((pos) % RANGE_LEN); + if (RANGES[rangeIndex].getSizeInMB() > 0) { + stats[index] += 
RANGES[rangeIndex].getSizeInMB(); + } + } + return stats; } - return stats; } private void verifyPartitionStats(VertexManagerEvent vme, BitSet expectedPartitionsWithData) throws IOException { - long[] stats = getPartitionStats(vme); + int[] stats = getPartitionStats(vme); + if (stats == null) { + return; + } for (int i = 0; i < stats.length; i++) { // The stats should be greater than zero if and only if // the partition has data @@ -922,6 +955,8 @@ public class TestUnorderedPartitionedKVWriter { conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, DefaultCodec.class.getName()); } + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + reportPartitionStats.getType()); return conf; } http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java index fabf52d..9d6ca50 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.junit.Test; public class TestOrderedPartitionedKVEdgeConfig { @@ -132,7 +133,8 @@ public class TestOrderedPartitionedKVEdgeConfig { 
additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f"); additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter"); - additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true"); + additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + ReportPartitionStats.MEMORY_OPTIMIZED.getType()); additionalConfs.put("file.shouldExist", "file"); OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig @@ -173,9 +175,11 @@ public class TestOrderedPartitionedKVEdgeConfig { assertEquals("io", outputConf.get("io.shouldExist")); assertEquals("file", outputConf.get("file.shouldExist")); assertEquals("fs", outputConf.get("fs.shouldExist")); - assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration - .TEZ_RUNTIME_REPORT_PARTITION_STATS, + ReportPartitionStats partitionStats = + ReportPartitionStats.fromString(outputConf.get( + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); + assertEquals(true, partitionStats.isEnabled()); assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); http://git-wip-us.apache.org/repos/asf/tez/blob/80ba12b2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 8942f4b..93c4f92 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -38,6 +38,7 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.SorterImpl; @@ -98,6 +99,7 @@ public class TestOnFileSortedOutput { private boolean sendEmptyPartitionViaEvent; //Partition index for which data should not be written to. private int emptyPartitionIdx; + private ReportPartitionStats reportPartitionStats; /** * Constructor @@ -107,13 +109,14 @@ public class TestOnFileSortedOutput { * @param sorterThreads number of threads needed for sorter (required only for pipelined sorter) * @param emptyPartitionIdx for which data should not be generated */ - public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, SorterImpl sorterImpl, - int sorterThreads, int emptyPartitionIdx) throws IOException { + public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, + SorterImpl sorterImpl, int sorterThreads, int emptyPartitionIdx, + ReportPartitionStats reportPartitionStats) throws IOException { this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent; this.emptyPartitionIdx = emptyPartitionIdx; this.sorterImpl = sorterImpl; this.sorterThreads = sorterThreads; - + this.reportPartitionStats = reportPartitionStats; conf = new Configuration(); workingDir = new Path(".", this.getClass().getName()); @@ -135,7 +138,8 @@ public class TestOnFileSortedOutput { 
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED, sendEmptyPartitionViaEvent); - + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + reportPartitionStats.getType()); outputSize.set(0); numRecords.set(0); fs.mkdirs(workingDir); @@ -147,27 +151,39 @@ public class TestOnFileSortedOutput { fs.delete(workingDir, true); } - @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}]") + @SuppressWarnings("deprecation") + @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3}, {4}]") public static Collection<Object[]> getParameters() { Collection<Object[]> parameters = new ArrayList<Object[]>(); - //empty_partition_via_events_enabled, noOfSortThreads, partitionToBeEmpty - parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1 }); - parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0 }); - parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1 }); - parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0 }); + //empty_partition_via_events_enabled, noOfSortThreads, + // partitionToBeEmpty, reportPartitionStats + parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, -1, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { false, SorterImpl.LEGACY, 1, 0, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, -1, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.LEGACY, 1, 0, + ReportPartitionStats.PRECISE }); //Pipelined sorter - parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1 }); - parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0 }); - parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1 }); - parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0 }); - + parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, -1, + 
ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { false, SorterImpl.PIPELINED, 2, 0, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, -1, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0, + ReportPartitionStats.ENABLED }); + parameters.add(new Object[] { true, SorterImpl.PIPELINED, 2, 0, + ReportPartitionStats.PRECISE }); return parameters; } private void startSortedOutput(int partitions) throws Exception { OutputContext context = createTezOutputContext(); - conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); doReturn(payLoad).when(context).getUserPayload(); @@ -299,7 +315,11 @@ public class TestOnFileSortedOutput { .parseFrom( ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload())); - assertTrue(vmPayload.hasPartitionStats()); + if (reportPartitionStats.isPrecise()) { + assertTrue(vmPayload.hasDetailedPartitionStats()); + } else { + assertTrue(vmPayload.hasPartitionStats()); + } assertEquals(HOST, payload.getHost()); assertEquals(PORT, payload.getPort()); assertEquals(UniqueID, payload.getPathComponent());
