Repository: tez Updated Branches: refs/heads/branch-0.7 3e30e9498 -> 62e4b6ea1
TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/62e4b6ea Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/62e4b6ea Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/62e4b6ea Branch: refs/heads/branch-0.7 Commit: 62e4b6ea1cd4021791ae3b133e8e80c9a82456d6 Parents: 3e30e94 Author: Rajesh Balamohan <[email protected]> Authored: Mon Dec 14 07:50:22 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Mon Dec 14 07:50:22 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + pom.xml | 5 + tez-runtime-library/findbugs-exclude.xml | 9 + tez-runtime-library/pom.xml | 4 + .../vertexmanager/ShuffleVertexManager.java | 178 ++++++++++++++++--- .../library/api/TezRuntimeConfiguration.java | 9 + .../library/common/shuffle/ShuffleUtils.java | 37 +++- .../common/sort/impl/ExternalSorter.java | 27 +++ .../common/sort/impl/PipelinedSorter.java | 16 +- .../common/sort/impl/dflt/DefaultSorter.java | 23 ++- .../output/OrderedPartitionedKVOutput.java | 12 +- .../runtime/library/utils/DATA_RANGE_IN_MB.java | 49 +++++ .../src/main/proto/ShufflePayloads.proto | 1 + .../vertexmanager/TestShuffleVertexManager.java | 150 +++++++++++++++- .../common/shuffle/TestShuffleUtils.java | 6 +- .../common/sort/impl/TestPipelinedSorter.java | 17 ++ .../sort/impl/dflt/TestDefaultSorter.java | 34 ++++ .../TestOrderedPartitionedKVEdgeConfig.java | 4 + .../library/output/TestOnFileSortedOutput.java | 8 + 19 files changed, 546 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 60ea352..023be60 100644 --- a/CHANGES.txt +++ 
b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES + TEZ-2496. Consider scheduling tasks in ShuffleVertexManager based on the partition sizes from the source. TEZ-2995. Timeline primary filter should only be on callerId and not type. TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 169353f..bc8f487 100644 --- a/pom.xml +++ b/pom.xml @@ -181,6 +181,11 @@ <version>3.1.0</version> </dependency> <dependency> + <groupId>org.roaringbitmap</groupId> + <artifactId>RoaringBitmap</artifactId> + <version>0.4.9</version> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml index 45c194c..6b9e851 100644 --- a/tez-runtime-library/findbugs-exclude.xml +++ b/tez-runtime-library/findbugs-exclude.xml @@ -99,4 +99,13 @@ </Match> + <Match> + <Bug pattern="EI_EXPOSE_REP"/> + <Or> + <Class name="org.apache.tez.runtime.library.common.sort.impl.ExteralSorter" /> + <Method name="getPartitionStats"/> + <Field name="partitionStats"/> + </Or> + </Match> + </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/pom.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml index 9230576..a68ee88 100644 --- a/tez-runtime-library/pom.xml +++ b/tez-runtime-library/pom.xml @@ -26,6 +26,10 @@ <dependencies> <dependency> + <groupId>org.roaringbitmap</groupId> + 
<artifactId>RoaringBitmap</artifactId> + </dependency> + <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 01dc5a0..1950df2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -28,6 +28,9 @@ import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; +import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -59,12 +62,17 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexMan import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -136,7 +144,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int totalNumBipartiteSourceTasks = 0; int numBipartiteSourceTasksCompleted = 0; int numVertexManagerEventsReceived = 0; - 
List<Integer> pendingTasks = Lists.newLinkedList(); + List<PendingTaskInfo> pendingTasks = Lists.newLinkedList(); int totalTasksToSchedule = 0; private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false); @@ -150,6 +158,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin { long completedSourceTasksOutputSize = 0; List<VertexStateUpdate> pendingStateUpdates = Lists.newArrayList(); + private int[][] targetIndexes; + private int basePartitionRange; + private int remainderRangeForLastShuffler; + @VisibleForTesting + long[] stats; //approximate amount of data to be fetched + static class SourceVertexInfo { EdgeProperty edgeProperty; boolean vertexIsConfigured; @@ -172,10 +186,32 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } } + static class PendingTaskInfo { + private int index; + private long outputStats; + + public PendingTaskInfo(int index) { + this.index = index; + } + + public String toString() { + return "[index=" + index + ", outputStats=" + outputStats + "]"; + } + } + public ShuffleVertexManager(VertexManagerPluginContext context) { super(context); } + static int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) { + int startIndex = taskIndex * offSetPerTask; + int[] indices = new int[partitionRange]; + for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) { + indices[currentIndex] = (startIndex + currentIndex); + } + return indices; + } + public static class CustomShuffleEdgeManager extends EdgeManagerPluginOnDemand { int numSourceTaskOutputs; int numDestinationTasks; @@ -242,7 +278,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } else { partitionRange = remainderRangeForLastShuffler; } - + // all inputs from a source task are next to each other in original order int targetIndex = sourceTaskIndex * partitionRange @@ -273,14 +309,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { return EventRouteMetadata.create(1, new 
int[]{targetIndex}); } - private int[] createIndices(int partitionRange, int taskIndex, int offSetPerTask) { - int startIndex = taskIndex * offSetPerTask; - int[] indices = new int[partitionRange]; - for (int currentIndex = 0; currentIndex < partitionRange; ++currentIndex) { - indices[currentIndex] = (startIndex + currentIndex); - } - return indices; - } + @Override public void prepareForRouting() throws Exception { @@ -494,6 +523,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { schedulePendingTasks(); } + @Override public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName(); @@ -516,7 +546,25 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } schedulePendingTasks(); } - + + @VisibleForTesting + void parsePartitionStats(RoaringBitmap partitionStats) { + Preconditions.checkState(stats != null, "Stats should be initialized"); + Iterator<Integer> it = partitionStats.iterator(); + final DATA_RANGE_IN_MB[] RANGES = DATA_RANGE_IN_MB.values(); + final int RANGE_LEN = RANGES.length; + while (it.hasNext()) { + int pos = it.next(); + int index = ((pos) / RANGE_LEN); + int rangeIndex = ((pos) % RANGE_LEN); + //Add to aggregated stats and normalize to DATA_RANGE_IN_MB. 
+ if (RANGES[rangeIndex].getSizeInMB() > 0) { + stats[index] += RANGES[rangeIndex].getSizeInMB(); + } + } + } + + @Override public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { // currently events from multiple attempts of the same task can be ignored because @@ -544,6 +592,19 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } sourceTaskOutputSize = proto.getOutputSize(); + if (proto.hasPartitionStats()) { + try { + RoaringBitmap partitionStats = new RoaringBitmap(); + ByteString compressedPartitionStats = proto.getPartitionStats(); + byte[] rawData = TezCommonUtils.decompressByteStringToByteArray(compressedPartitionStats); + ByteArrayInputStream bin = new ByteArrayInputStream(rawData); + partitionStats.deserialize(new DataInputStream(bin)); + + parsePartitionStats(partitionStats); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + } srcInfo.numVMEventsReceived++; srcInfo.outputSize += sourceTaskOutputSize; completedSourceTasksOutputSize += sourceTaskOutputSize; @@ -558,13 +619,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + " total output size: " + completedSourceTasksOutputSize); } } - + + void updatePendingTasks() { pendingTasks.clear(); - for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) { - pendingTasks.add(i); + for (int i = 0; i < getContext().getVertexNumTasks(getContext().getVertexName()); ++i) { + pendingTasks.add(new PendingTaskInfo(i)); } totalTasksToSchedule = pendingTasks.size(); + if (stats == null) { + stats = new long[totalTasksToSchedule]; // TODO lost previous data + } } Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() { @@ -633,7 +698,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } // most shufflers will be assigned this range - int basePartitionRange = currentParallelism/desiredTaskParallelism; + basePartitionRange = currentParallelism/desiredTaskParallelism; if (basePartitionRange 
<= 1) { // nothing to do if range is equal 1 partition. shuffler does it by default @@ -641,7 +706,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } int numShufflersWithBaseRange = currentParallelism / basePartitionRange; - int remainderRangeForLastShuffler = currentParallelism % basePartitionRange; + remainderRangeForLastShuffler = currentParallelism % basePartitionRange; int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ? (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange); @@ -678,14 +743,30 @@ public class ShuffleVertexManager extends VertexManagerPlugin { oldEdgeProp.getEdgeSource(), oldEdgeProp.getEdgeDestination()); edgeProperties.put(vertex, newEdgeProp); } - + getContext().reconfigureVertex(finalTaskParallelism, null, edgeProperties); - updatePendingTasks(); + configureTargetMapping(finalTaskParallelism); } return true; } + void configureTargetMapping(int tasks) { + targetIndexes = new int[tasks][]; + for (int idx = 0; idx < tasks; ++idx) { + int partitionRange = basePartitionRange; + if (idx == (tasks - 1)) { + partitionRange = ((remainderRangeForLastShuffler > 0) + ? remainderRangeForLastShuffler : basePartitionRange); + } + // skip the basePartitionRange per destination task + targetIndexes[idx] = createIndices(partitionRange, idx, basePartitionRange); + if (LOG.isDebugEnabled()) { + LOG.debug("targetIdx[" + idx + "] to " + Arrays.toString(targetIndexes[idx])); + } + } + } + void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) { // determine parallelism before scheduling the first time // this is the latest we can wait before determining parallelism. 
@@ -702,11 +783,15 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } getContext().doneReconfiguringVertex(); } + if (totalNumBipartiteSourceTasks > 0) { + //Sort in case partition stats are available + sortPendingTasksBasedOnDataSize(); + } List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasksToSchedule); while(!pendingTasks.isEmpty() && numTasksToSchedule > 0) { numTasksToSchedule--; - Integer taskIndex = pendingTasks.get(0); + Integer taskIndex = pendingTasks.get(0).index; scheduledTasks.add(ScheduleTaskRequest.create(taskIndex, null)); pendingTasks.remove(0); } @@ -718,6 +803,59 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } } + private void sortPendingTasksBasedOnDataSize() { + //Get partition sizes from all source vertices + boolean statsUpdated = computePartitionSizes(); + + if (statsUpdated) { + //Order the pending tasks based on task size in reverse order + Collections.sort(pendingTasks, new Comparator<PendingTaskInfo>() { + @Override + public int compare(PendingTaskInfo left, PendingTaskInfo right) { + return (left.outputStats > right.outputStats) ? -1 : + ((left.outputStats == right.outputStats) ? 0 : 1); + } + }); + + if (LOG.isDebugEnabled()) { + for (PendingTaskInfo pendingTask : pendingTasks) { + LOG.debug("Pending task:" + pendingTask.toString()); + } + } + } + } + + /** + * Compute partition sizes in case statistics are available in vertex. + * + * @return boolean indicating whether stats are computed + */ + private synchronized boolean computePartitionSizes() { + boolean computedPartitionSizes = false; + for (PendingTaskInfo taskInfo : pendingTasks) { + int index = taskInfo.index; + if (targetIndexes != null) { //parallelism has changed. 
+ Preconditions.checkState(index < targetIndexes.length, + "index=" + index +", targetIndexes length=" + targetIndexes.length); + int[] mapping = targetIndexes[index]; + long totalStats = 0; + for (int i : mapping) { + totalStats += stats[i]; + } + if ((totalStats > 0) && (taskInfo.outputStats != totalStats)) { + computedPartitionSizes = true; + taskInfo.outputStats = totalStats; + } + } else { + if ((stats[index] > 0) && (stats[index] != taskInfo.outputStats)) { + computedPartitionSizes = true; + taskInfo.outputStats = stats[index]; + } + } + } + return computedPartitionSizes; + } + /** * Verify whether each of the source vertices have completed at least 1 task * @@ -865,7 +1003,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + slowStartMaxSrcCompletionFraction + " auto:" + enableAutoParallelism + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:" + minTaskParallelism); - + if (enableAutoParallelism) { getContext().vertexReconfigurationPlanned(); } http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index 3cfbf8e..440c9f4 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -134,6 +134,14 @@ public class TezRuntimeConfiguration { public static final int TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS_DEFAULT = 2; /** + * Report partition statistics (e.g better scheduling in ShuffleVertexManager). TEZ-2496 + * This can be enabled/disabled at vertex level. 
+ */ + public static final String TEZ_RUNTIME_REPORT_PARTITION_STATS = TEZ_RUNTIME_PREFIX + + "report.partition.stats"; + public static final boolean TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT = true; + + /** * Size of the buffer to use if not writing directly to disk. */ public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB = TEZ_RUNTIME_PREFIX + @@ -457,6 +465,7 @@ public class TezRuntimeConfiguration { tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION); tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION); + tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS); tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT); tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS); tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS); http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 1873485..8aca3af 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -29,11 +29,15 @@ import java.text.DecimalFormat; import java.util.BitSet; import java.util.List; +import javax.annotation.Nullable; import javax.crypto.SecretKey; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB; +import 
org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -406,12 +410,13 @@ public class ShuffleUtils { * @param spillRecord * @param numPhysicalOutputs * @param pathComponent + * @param partitionStats * @throws IOException */ public static void generateEventOnSpill(List<Event> eventList, boolean finalMergeEnabled, boolean isLastEvent, OutputContext context, int spillId, TezSpillRecord spillRecord, - int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent) - throws IOException { + int numPhysicalOutputs, boolean sendEmptyPartitionDetails, String pathComponent, + @Nullable long[] partitionStats) throws IOException { Preconditions.checkArgument(eventList != null, "EventList can't be null"); if (finalMergeEnabled) { @@ -439,6 +444,16 @@ public class ShuffleUtils { // up adding up to final outputsize. This is needed for auto-reduce parallelism to work // properly. vmBuilder.setOutputSize(outputSize); + + //set partition stats + if (partitionStats != null && partitionStats.length > 0) { + RoaringBitmap stats = getPartitionStatsForPhysicalOutput(partitionStats); + DataOutputBuffer dout = new DataOutputBuffer(); + stats.serialize(dout); + ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); + vmBuilder.setPartitionStats(partitionStatsBytes); + } + VertexManagerEvent vmEvent = VertexManagerEvent.create( context.getDestinationVertexName(), vmBuilder.build().toByteString().asReadOnlyByteBuffer()); eventList.add(vmEvent); @@ -450,6 +465,24 @@ public class ShuffleUtils { eventList.add(csdme); } + /** + * Data size for the destinations + * + * @param sizes for physical outputs + */ + public static RoaringBitmap getPartitionStatsForPhysicalOutput(long[] sizes) { + RoaringBitmap partitionStats = new RoaringBitmap(); + if (sizes == null || sizes.length == 0) { + return partitionStats; + } + final int RANGE_LEN = 
DATA_RANGE_IN_MB.values().length; + for (int i = 0; i < sizes.length; i++) { + int bucket = DATA_RANGE_IN_MB.getRange(sizes[i]).ordinal(); + int index = i * (RANGE_LEN); + partitionStats.add(index + bucket); + } + return partitionStats; + } /** http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index aba04e0..ac5acb8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -26,6 +26,7 @@ import java.util.Map; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; +import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -69,6 +70,7 @@ public abstract class ExternalSorter { public void close() throws IOException { spillFileIndexPaths.clear(); spillFilePaths.clear(); + reportStatistics(); } public abstract void flush() throws IOException; @@ -116,6 +118,8 @@ public abstract class ExternalSorter { protected Path finalIndexFile; protected int numSpills; + protected OutputStatisticsReporter statsReporter; + protected final long[] partitionStats; protected final boolean finalMergeEnabled; protected final boolean sendEmptyPartitionDetails; @@ -152,6 +156,10 @@ public abstract class ExternalSorter { this.outputContext = outputContext; this.conf = conf; this.partitions = numOutputs; + boolean reportPartitionStats = 
conf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_REPORT_PARTITION_STATS, + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT); + this.partitionStats = (reportPartitionStats) ? (new long[partitions]) : null; rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw(); @@ -241,6 +249,8 @@ public abstract class ExternalSorter { this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, this.partitions); this.partitioner = TezRuntimeUtils.instantiatePartitioner(this.conf); this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext); + + this.statsReporter = outputContext.getStatisticsReporter(); this.finalMergeEnabled = conf.getBoolean( TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT_DEFAULT); @@ -339,4 +349,21 @@ public abstract class ExternalSorter { public int getNumSpills() { return numSpills; } + + public long[] getPartitionStats() { + return partitionStats; + } + + protected boolean reportPartitionStats() { + return (partitionStats != null); + } + + protected synchronized void reportStatistics() { + // This works for non-started outputs since new counters will be created with an initial value of 0 + long outputSize = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); + statsReporter.reportDataSize(outputSize); + long outputRecords = outputContext.getCounters() + .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); + statsReporter.reportItemsProcessed(outputRecords); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 049087b..9708d7c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -271,7 +271,7 @@ public class PipelinedSorter extends ExternalSorter { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + (numSpills-1)); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), false, outputContext, (numSpills - 1), indexCacheList.get(numSpills - 1), partitions, sendEmptyPartitionDetails, - pathComponent); + pathComponent, partitionStats); outputContext.sendEvents(events); LOG.info(outputContext.getDestinationVertexName() + ": Added spill event for spill (final update=false), spillId=" + (numSpills - 1)); @@ -476,6 +476,9 @@ public class PipelinedSorter extends ExternalSorter { writer.getRawLength(), writer.getCompressedLength()); spillRec.putIndex(rec, i); + if (!isFinalMergeEnabled() && reportPartitionStats()) { + partitionStats[i] += writer.getCompressedLength(); + } } Path indexFilename = @@ -538,7 +541,7 @@ public class PipelinedSorter extends ExternalSorter { String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, outputContext, i, indexCacheList.get(i), partitions, - sendEmptyPartitionDetails, pathComponent); + sendEmptyPartitionDetails, pathComponent, partitionStats); LOG.info(outputContext.getDestinationVertexName() + ": Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i); } outputContext.sendEvents(events); @@ -564,6 +567,12 @@ public class PipelinedSorter extends ExternalSorter { + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" + indexFilename); } + TezSpillRecord spillRecord = new 
TezSpillRecord(finalIndexFile, conf); + if (reportPartitionStats()) { + for (int i = 0; i < spillRecord.size(); i++) { + partitionStats[i] += spillRecord.getIndex(i).getPartLength(); + } + } numShuffleChunks.setValue(numSpills); fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen()); return; @@ -638,6 +647,9 @@ public class PipelinedSorter extends ExternalSorter { writer.getRawLength(), writer.getCompressedLength()); spillRec.putIndex(rec, parts); + if (reportPartitionStats()) { + partitionStats[parts] += writer.getCompressedLength(); + } } numShuffleChunks.setValue(1); //final merge has happened. http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index ac90112..6c15a5d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -699,8 +699,6 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab } } - @Override - public void close() throws IOException { } protected class SpillThread extends Thread { @@ -883,6 +881,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab writer.getRawLength(), writer.getCompressedLength()); spillRec.putIndex(rec, i); + if (!isFinalMergeEnabled() && reportPartitionStats()) { + partitionStats[i] += writer.getCompressedLength(); + } writer = null; } finally { @@ -1079,7 +1080,8 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab String pathComponent = 
(outputContext.getUniqueIdentifier() + "_" + index); ShuffleUtils.generateEventOnSpill(events, isFinalMergeEnabled(), isLastEvent, - outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent); + outputContext, index, spillRecord, partitions, sendEmptyPartitionDetails, pathComponent, + partitionStats); LOG.info(outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index); @@ -1127,19 +1129,22 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } if (numSpills == 1) { //the spill is the final output + TezSpillRecord spillRecord = null; if (isFinalMergeEnabled()) { finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]); finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]); sameVolRename(filename[0], finalOutputFile); if (indexCacheList.size() == 0) { sameVolRename(spillFileIndexPaths.get(0), finalIndexFile); + spillRecord = new TezSpillRecord(finalIndexFile, conf); } else { - indexCacheList.get(0).writeToFile(finalIndexFile, conf); + spillRecord = indexCacheList.get(0); + spillRecord.writeToFile(finalIndexFile, conf); } } else { List<Event> events = Lists.newLinkedList(); //Since there is only one spill, spill record would be present in cache. 
- TezSpillRecord spillRecord = indexCacheList.get(0); + spillRecord = indexCacheList.get(0); Path indexPath = mapOutputFile.getSpillIndexFileForWrite(numSpills-1, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRecord.writeToFile(indexPath, conf); @@ -1147,6 +1152,11 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab fileOutputByteCounter.increment(rfs.getFileStatus(spillFilePaths.get(0)).getLen()); //No need to populate finalIndexFile, finalOutputFile etc when finalMerge is disabled } + if (spillRecord != null && reportPartitionStats()) { + for(int i=0; i < spillRecord.size(); i++) { + partitionStats[i] += spillRecord.getIndex(i).getPartLength(); + } + } numShuffleChunks.setValue(numSpills); return; } @@ -1276,6 +1286,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab writer.getRawLength(), writer.getCompressedLength()); spillRec.putIndex(rec, parts); + if (reportPartitionStats()) { + partitionStats[parts] += writer.getCompressedLength(); + } } numShuffleChunks.setValue(1); //final merge has happened spillRec.writeToFile(finalIndexFile, conf); http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java index 2b4c0f4..e418c1b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java @@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; import 
org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezUtils; -import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; @@ -191,13 +190,6 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { returnEvents = generateEmptyEvents(); } - // This works for non-started outputs since new counters will be created with an initial value of 0 - long outputSize = getContext().getCounters().findCounter(TaskCounter.OUTPUT_BYTES).getValue(); - getContext().getStatisticsReporter().reportDataSize(outputSize); - long outputRecords = getContext().getCounters() - .findCounter(TaskCounter.OUTPUT_RECORDS).getValue(); - getContext().getStatisticsReporter().reportItemsProcessed(outputRecords); - return returnEvents; } @@ -207,7 +199,8 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { boolean isLastEvent = true; ShuffleUtils.generateEventOnSpill(eventList, finalMergeEnabled, isLastEvent, getContext(), 0, new TezSpillRecord(sorter.getFinalIndexFile(), conf), - getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier()); + getNumPhysicalOutputs(), sendEmptyPartitionDetails, getContext().getUniqueIdentifier(), + sorter.getPartitionStats()); } return eventList; } @@ -228,6 +221,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput { confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_PERCENT); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB); + confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS); confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); 
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java new file mode 100644 index 0000000..126f04e --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/DATA_RANGE_IN_MB.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.library.utils; + +import org.apache.commons.math3.util.FastMath; + +public enum DATA_RANGE_IN_MB { + THOUSAND(1000), HUNDRED(100), TEN(10), ONE(1), ZERO(0); + + private final int sizeInMB; + + private DATA_RANGE_IN_MB(int sizeInMB) { + this.sizeInMB = sizeInMB; + } + + public final int getSizeInMB() { + return sizeInMB; + } + + static long ceil(long a, long b) { + return (a + (b - 1)) / b; + } + + public static final DATA_RANGE_IN_MB getRange(long sizeInBytes) { + long sizeInMB = ceil(sizeInBytes, (1024l * 1024l)); + for (DATA_RANGE_IN_MB range : values()) { + if (sizeInMB >= range.sizeInMB) { + return range; + } + } + return ZERO; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/main/proto/ShufflePayloads.proto ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto index f7b482d..9b0fc16 100644 --- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto +++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto @@ -43,6 +43,7 @@ message InputInformationEventPayloadProto { message VertexManagerEventPayloadProto { optional int64 output_size = 1; + optional bytes partition_stats = 2; } message ShuffleEdgeManagerConfigPayloadProto { http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 9a9ff27..9d53ebc 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -18,10 +18,14 @@ package org.apache.tez.dag.library.vertexmanager; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.EdgeManagerPlugin; import org.apache.tez.dag.api.EdgeManagerPluginContext; @@ -48,11 +52,13 @@ import org.apache.tez.runtime.api.VertexIdentifier; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.roaringbitmap.RoaringBitmap; import java.io.IOException; import java.nio.ByteBuffer; @@ -301,6 +307,43 @@ public class TestShuffleVertexManager { Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize); /** + * Test partition stats + */ + scheduledTasks.clear(); + //{5,9,12,18} in bitmap + long[] sizes = new long[]{(0l), (1000l * 1000l), + (1010 * 1000l * 1000l), (50 * 1000l * 1000l)}; + vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex"); + + manager = createManager(conf, mockContext, 0.01f, 0.75f); + manager.onVertexStarted(emptyCompletions); + Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); + 
vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); //0 MB bucket + Assert.assertEquals(1, manager.stats[1]); //1 MB bucket + Assert.assertEquals(100, manager.stats[2]); //100 MB bucket + Assert.assertEquals(10, manager.stats[3]); //10 MB bucket + + // sending again from a different attempt of the same task has no impact + TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); //0 MB bucket + Assert.assertEquals(1, manager.stats[1]); //1 MB bucket + Assert.assertEquals(100, manager.stats[2]); //100 MB bucket + Assert.assertEquals(10, manager.stats[3]); //10 MB bucket + + /** * Test for TEZ-978 * Delay determining parallelism until enough data has been received. */ @@ -511,6 +554,7 @@ public class TestShuffleVertexManager { String mockManagedVertexId = "Vertex4"; VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); + when(mockContext.getVertexStatistics(any(String.class))).thenReturn(mock(VertexStatistics.class)); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); @@ -631,7 +675,7 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); // source vertex have some tasks. 
min, max == 0 - manager = createManager(conf, mockContext, 0.f, 0.f); + manager = createManager(conf, mockContext, 0.0f, 0.0f); manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.totalTasksToSchedule == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); @@ -926,7 +970,6 @@ public class TestShuffleVertexManager { throws IOException { ByteBuffer payload = null; if (sizes != null) { - /* RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes); DataOutputBuffer dout = new DataOutputBuffer(); partitionStats.serialize(dout); @@ -938,7 +981,6 @@ public class TestShuffleVertexManager { .setPartitionStats(partitionStatsBytes) .build().toByteString() .asReadOnlyByteBuffer(); - */ } else { payload = VertexManagerEventPayloadProto.newBuilder() @@ -954,6 +996,108 @@ public class TestShuffleVertexManager { } @Test(timeout = 5000) + public void testSchedulingWithPartitionStats() throws IOException { + Configuration conf = new Configuration(); + conf.setBoolean( + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, + true); + conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L); + ShuffleVertexManager manager = null; + + HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>(); + String r1 = "R1"; + EdgeProperty eProp1 = EdgeProperty.create( + EdgeProperty.DataMovementType.SCATTER_GATHER, + EdgeProperty.DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, + OutputDescriptor.create("out"), + InputDescriptor.create("in")); + String m2 = "M2"; + EdgeProperty eProp2 = EdgeProperty.create( + EdgeProperty.DataMovementType.BROADCAST, + EdgeProperty.DataSourceType.PERSISTED, + SchedulingType.SEQUENTIAL, + OutputDescriptor.create("out"), + InputDescriptor.create("in")); + String m3 = "M3"; + EdgeProperty eProp3 = EdgeProperty.create( + EdgeProperty.DataMovementType.BROADCAST, + EdgeProperty.DataSourceType.PERSISTED, + 
SchedulingType.SEQUENTIAL, + OutputDescriptor.create("out"), + InputDescriptor.create("in")); + + final String mockManagedVertexId = "R2"; + + mockInputVertices.put(r1, eProp1); + mockInputVertices.put(m2, eProp2); + mockInputVertices.put(m3, eProp3); + + VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); + when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); + when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); + when(mockContext.getVertexNumTasks(r1)).thenReturn(3); + when(mockContext.getVertexNumTasks(m2)).thenReturn(3); + when(mockContext.getVertexNumTasks(m3)).thenReturn(3); + + final List<Integer> scheduledTasks = Lists.newLinkedList(); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + Object[] args = invocation.getArguments(); + scheduledTasks.clear(); + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { + scheduledTasks.add(task.getTaskIndex()); + } + return null; + }}).when(mockContext).scheduleTasks(anyList()); + + // check initialization + manager = createManager(conf, mockContext, 0.001f, 0.001f); + manager.onVertexStarted(emptyCompletions); + Assert.assertTrue(manager.bipartiteSources == 1); + + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); + + Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled + Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + + //Send an event for r1. 
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); + + //Tasks should be scheduled in task 2, 0, 1 order + long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)}; + VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1); + manager.onVertexManagerEventReceived(vmEvent); //send VM event + + //stats from another vertex (more of empty stats) + sizes = new long[]{(0l), (0l), (0l)}; + vmEvent = getVertexManagerEvent(sizes, 1060000000, r1); + manager.onVertexManagerEventReceived(vmEvent); //send VM event + + //Send an event for m2. + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); + + //Send an event for m3. + manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled + Assert.assertTrue(scheduledTasks.size() == 3); + + //Order of scheduling should be 2,0,1 based on the available partition statistics + Assert.assertTrue(scheduledTasks.get(0) == 2); + Assert.assertTrue(scheduledTasks.get(1) == 0); + Assert.assertTrue(scheduledTasks.get(2) == 1); + } + + @Test(timeout = 5000) public void test_Tez1649_with_mixed_edges() { Configuration conf = new Configuration(); conf.setBoolean( http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 2e264f6..9f9cd59 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -148,7 +148,7 @@ public class TestShuffleUtils { int physicalOutputs = 10; String pathComponent = "/attempt_x_y_0/file.out"; ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent); + spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); Assert.assertTrue(events.size() == 1); Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent); @@ -186,7 +186,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent); + spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); @@ -226,7 +226,7 @@ public class TestShuffleUtils { //normal code path where we do final merge all the time ShuffleUtils.generateEventOnSpill(events, finalMergeDisabled, isLastEvent, outputContext, - spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent); + spillId, new TezSpillRecord(indexFile, conf), physicalOutputs, true, pathComponent, null); Assert.assertTrue(events.size() == 2); //one for VM Assert.assertTrue(events.get(0) instanceof VertexManagerEvent); 
http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 4aa53eb..129f4d1 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -19,6 +19,7 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -131,6 +132,14 @@ public class TestPipelinedSorter { } @Test + public void testWithoutPartitionStats() throws IOException { + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, false); + //# partition, # of keys, size per key, InitialMem, blockSize + basicTest(1, 0, 0, (10 * 1024l * 1024l), 3 << 20); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true); + } + + @Test public void testWithEmptyData() throws IOException { conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 5); //# partition, # of keys, size per key, InitialMem, blockSize @@ -328,6 +337,13 @@ public class TestPipelinedSorter { writeData(sorter, numKeys, keySize); + //partition stats; + boolean partitionStats = conf.getBoolean(TezRuntimeConfiguration + 
.TEZ_RUNTIME_REPORT_PARTITION_STATS, TezRuntimeConfiguration + .TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT); + if (partitionStats) { + assertTrue(sorter.getPartitionStats() != null); + } verifyCounters(sorter, outputContext); Path outputFile = sorter.finalOutputFile; @@ -471,6 +487,7 @@ public class TestPipelinedSorter { (ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID); doReturn(execContext).when(outputContext).getExecutionContext(); + doReturn(mock(OutputStatisticsReporter.class)).when(outputContext).getStatisticsReporter(); doReturn(counters).when(outputContext).getCounters(); doReturn(appId).when(outputContext).getApplicationId(); doReturn(1).when(outputContext).getDAGAttemptNumber(); http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index ecc44a7..4022525 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -53,6 +53,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -307,6 +308,38 @@ public class TestDefaultSorter { } } + void testPartitionStats(boolean 
withStats) throws IOException { + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, withStats); + OutputContext context = createTezOutputContext(); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + DefaultSorter sorter = new DefaultSorter(context, conf, 1, handler.getMemoryAssigned()); + + writeData(sorter, 1000, 10); + assertTrue(sorter.getNumSpills() == 1); + verifyCounters(sorter, context); + + if (withStats) { + assertTrue(sorter.getPartitionStats() != null); + } else { + assertTrue(sorter.getPartitionStats() == null); + } + } + + @Test(timeout = 60000) + public void testWithPartitionStats() throws IOException { + testPartitionStats(true); + } + + @Test(timeout = 60000) + public void testWithoutPartitionStats() throws IOException { + testPartitionStats(false); + } + @Test(timeout = 60000) @SuppressWarnings("unchecked") public void testWithSingleSpillWithFinalMergeDisabled() throws IOException { @@ -418,6 +451,7 @@ public class TestDefaultSorter { OutputContext context = mock(OutputContext.class); ExecutionContext execContext = new ExecutionContextImpl("localhost"); + doReturn(mock(OutputStatisticsReporter.class)).when(context).getStatisticsReporter(); doReturn(execContext).when(context).getExecutionContext(); doReturn(counters).when(context).getCounters(); doReturn(workingDirs).when(context).getWorkDirs(); http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java index c72dd52..f57731c 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfig.java @@ -127,6 +127,7 @@ public class TestOrderedPartitionedKVEdgeConfig { additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, "1111"); additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, "0.22f"); additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, "CustomSorter"); + additionalConfs.put(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, "true"); additionalConfs.put("file.shouldExist", "file"); OrderedPartitionedKVEdgeConfig.Builder builder = OrderedPartitionedKVEdgeConfig @@ -163,6 +164,9 @@ public class TestOrderedPartitionedKVEdgeConfig { assertEquals("io", outputConf.get("io.shouldExist")); assertEquals("file", outputConf.get("file.shouldExist")); assertEquals("fs", outputConf.get("fs.shouldExist")); + assertEquals(true, outputConf.getBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_REPORT_PARTITION_STATS, + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS_DEFAULT)); assertEquals(3, inputConf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 0)); http://git-wip-us.apache.org/repos/asf/tez/blob/62e4b6ea/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 19eb18a..8942f4b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -35,6 +35,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.OutputStatisticsReporter; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; @@ -166,6 +167,7 @@ public class TestOnFileSortedOutput { private void startSortedOutput(int partitions) throws Exception { OutputContext context = createTezOutputContext(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, true); conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); doReturn(payLoad).when(context).getUserPayload(); @@ -292,6 +294,12 @@ public class TestOnFileSortedOutput { .parseFrom( ByteString.copyFrom(((CompositeDataMovementEvent) eventList.get(1)).getUserPayload())); + ShuffleUserPayloads.VertexManagerEventPayloadProto + vmPayload = ShuffleUserPayloads.VertexManagerEventPayloadProto + .parseFrom( + ByteString.copyFrom(((VertexManagerEvent) eventList.get(0)).getUserPayload())); + + assertTrue(vmPayload.hasPartitionStats()); assertEquals(HOST, payload.getHost()); assertEquals(PORT, payload.getPort()); assertEquals(UniqueID, payload.getPathComponent());
